ahmedabu98 commented on code in PR #30081:
URL: https://github.com/apache/beam/pull/30081#discussion_r1489846433
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java:
##########
@@ -291,6 +287,10 @@ private synchronized boolean readNextRecord() throws IOException {
* 1.0
/ totalRowsInCurrentResponse;
+ SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema);
+
+ current = parseFn.apply(schemaAndRecord);
+
Review Comment:
nit: see previous comment
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -680,62 +697,92 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
// parallelize properly. We also ensure that the files are written if a threshold number of
// records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by
// GroupIntoBatches.
- return input
- .apply(
- GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
- .withByteSize(byteSize)
- .withMaxBufferingDuration(maxBufferingDuration)
- .withShardedKey())
- .setCoder(
- KvCoder.of(
- org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
- IterableCoder.of(elementCoder)))
- .apply(
- "StripShardId",
- MapElements.via(
- new SimpleFunction<
- KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
- KV<DestinationT, Iterable<ElementT>>>() {
- @Override
- public KV<DestinationT, Iterable<ElementT>> apply(
- KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
- input) {
- return KV.of(input.getKey().getKey(), input.getValue());
- }
- }))
- .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
- .apply(
- "WriteGroupedRecords",
- ParDo.of(
- new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
- tempFilePrefix, maxFileSize, rowWriterFactory))
- .withSideInputs(tempFilePrefix))
+ TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
+ PCollectionTuple writeResults =
+ input
+ .apply(
+ GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
+ .withByteSize(byteSize)
+ .withMaxBufferingDuration(maxBufferingDuration)
+ .withShardedKey())
+ .setCoder(
+ KvCoder.of(
+ org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+ IterableCoder.of(elementCoder)))
+ .apply(
+ "StripShardId",
+ MapElements.via(
+ new SimpleFunction<
+ KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
+ KV<DestinationT, Iterable<ElementT>>>() {
+ @Override
+ public KV<DestinationT, Iterable<ElementT>> apply(
+ KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
+ input) {
+ return KV.of(input.getKey().getKey(), input.getValue());
+ }
+ }))
+ .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+ .apply(
+ "WriteGroupedRecords",
+ ParDo.of(
+ new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
+ tempFilePrefix,
+ maxFileSize,
+ rowWriterFactory,
+ badRecordRouter,
+ successfulResultsTag,
+ elementCoder))
+ .withSideInputs(tempFilePrefix)
+ .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG)));
+ badRecordErrorHandler.addErrorCollection(
+ writeResults.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
+
+ return writeResults
+ .get(successfulResultsTag)
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}
private PCollection<Result<DestinationT>> writeShardedRecords(
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
PCollectionView<String> tempFilePrefix) {
- return shardedRecords
- .apply("GroupByDestination", GroupByKey.create())
- .apply(
- "StripShardId",
- MapElements.via(
- new SimpleFunction<
- KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
- KV<DestinationT, Iterable<ElementT>>>() {
- @Override
- public KV<DestinationT, Iterable<ElementT>> apply(
- KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
- return KV.of(input.getKey().getKey(), input.getValue());
- }
- }))
- .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
- .apply(
- "WriteGroupedRecords",
- ParDo.of(
- new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory))
- .withSideInputs(tempFilePrefix))
+ TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
+ PCollectionTuple writeResults =
+ shardedRecords
+ .apply("GroupByDestination", GroupByKey.create())
+ .apply(
+ "StripShardId",
+ MapElements.via(
+ new SimpleFunction<
+ KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
+ KV<DestinationT, Iterable<ElementT>>>() {
+ @Override
+ public KV<DestinationT, Iterable<ElementT>> apply(
+ KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
+ return KV.of(input.getKey().getKey(), input.getValue());
+ }
+ }))
+ .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
+ .apply(
+ "WriteGroupedRecords",
+ ParDo.of(
+ new WriteGroupedRecordsToFiles<>(
+ tempFilePrefix,
+ maxFileSize,
+ rowWriterFactory,
+ badRecordRouter,
+ successfulResultsTag,
+ elementCoder))
+ .withSideInputs(tempFilePrefix)
+ .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG)));
+
+ badRecordErrorHandler.addErrorCollection(
+ writeResults
+ .get(BAD_RECORD_TAG)
+ .setCoder(BadRecord.getCoder(shardedRecords.getPipeline())));
Review Comment:
QQ what happens if this `BadRecord.getCoder()` method throws an error here?
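For illustration, one way this could be made to fail loudly at graph construction time (just a sketch, assuming `BadRecord.getCoder()` surfaces failures as an unchecked exception; `badRecordCoder` is a hypothetical local):
```java
// Hypothetical sketch: resolve the BadRecord coder up front so a failure surfaces at
// pipeline construction time with a clear message, instead of bubbling out of the
// addErrorCollection() call below.
Coder<BadRecord> badRecordCoder;
try {
  badRecordCoder = BadRecord.getCoder(shardedRecords.getPipeline());
} catch (RuntimeException e) {
  throw new IllegalStateException("Unable to infer a coder for BadRecord", e);
}
badRecordErrorHandler.addErrorCollection(
    writeResults.get(BAD_RECORD_TAG).setCoder(badRecordCoder));
```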
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1428,7 +1452,7 @@ private PCollection<T> expandForDirectRead(
PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions bqOptions) {
ValueProvider<TableReference> tableProvider = getTableProvider();
Pipeline p = input.getPipeline();
- if (tableProvider != null) {
+ if (tableProvider != null && getBadRecordRouter() instanceof ThrowingBadRecordRouter) {
Review Comment:
P.S. same with `getWithTemplateCompatibility` below
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1776,45 +1839,102 @@ private PCollection<T> createPCollectionForDirectReadWithStreamBundle(
TupleTag<List<ReadStream>> listReadStreamsTag,
PCollectionView<ReadSession> readSessionView,
PCollectionView<String> tableSchemaView) {
- PCollection<T> rows =
+ TupleTag<T> rowTag = new TupleTag<>();
+ PCollectionTuple resultTuple =
tuple
.get(listReadStreamsTag)
.apply(Reshuffle.viaRandomKey())
.apply(
ParDo.of(
new DoFn<List<ReadStream>, T>() {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
+ public void processElement(
+ ProcessContext c, MultiOutputReceiver outputReceiver)
+ throws Exception {
ReadSession readSession =
c.sideInput(readSessionView);
TableSchema tableSchema =
BigQueryHelpers.fromJsonString(
c.sideInput(tableSchemaView),
TableSchema.class);
List<ReadStream> streamBundle = c.element();
+ ErrorHandlingParseFn errorHandlingParseFn =
+ new ErrorHandlingParseFn(getParseFn());
+
BigQueryStorageStreamBundleSource<T> streamSource =
BigQueryStorageStreamBundleSource.create(
readSession,
streamBundle,
tableSchema,
- getParseFn(),
+ errorHandlingParseFn,
outputCoder,
getBigQueryServices(),
1L);
- // Read all of the data from the stream. In the event that this work
- // item fails and is rescheduled, the same rows will be returned in
- // the same order.
- BoundedReader<T> reader =
- streamSource.createReader(c.getPipelineOptions());
- for (boolean more = reader.start(); more; more = reader.advance()) {
- c.output(reader.getCurrent());
- }
+ readStreamSource(
+ c.getPipelineOptions(),
+ rowTag,
+ outputReceiver,
+ streamSource,
+ errorHandlingParseFn);
}
})
- .withSideInputs(readSessionView, tableSchemaView))
- .setCoder(outputCoder);
+ .withSideInputs(readSessionView, tableSchemaView)
+ .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG)));
- return rows;
+ getBadRecordErrorHandler()
+ .addErrorCollection(
+ resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(tuple.getPipeline())));
+
+ return resultTuple.get(rowTag).setCoder(outputCoder);
+ }
+
+ public void readStreamSource(
+ PipelineOptions options,
+ TupleTag<T> rowTag,
+ MultiOutputReceiver outputReceiver,
+ BoundedSource<T> streamSource,
+ ErrorHandlingParseFn errorHandlingParseFn)
+ throws Exception {
+ // Read all the data from the stream. In the event that this work
+ // item fails and is rescheduled, the same rows will be returned in
+ // the same order.
+ BoundedSource.BoundedReader<T> reader = streamSource.createReader(options);
+
+ try {
+ if (reader.start()) {
+ outputReceiver.get(rowTag).output(reader.getCurrent());
+ } else {
+ return;
+ }
+ } catch (ParseException e) {
+ GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+ getBadRecordRouter()
+ .route(
+ outputReceiver,
+ record,
+ AvroCoder.of(record.getSchema()),
+ (Exception) e.getCause(),
+ "Unable to parse record reading from BigQuery");
+ }
+
+ while (true) {
+ try {
+ if (reader.advance()) {
+ outputReceiver.get(rowTag).output(reader.getCurrent());
+ } else {
+ return;
+ }
+ } catch (ParseException e) {
+ GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+ getBadRecordRouter()
+ .route(
+ outputReceiver,
+ record,
+ AvroCoder.of(record.getSchema()),
+ (Exception) e.getCause(),
+ "Unable to parse record reading from BigQuery");
+ }
+ }
Review Comment:
Are we able to combine this in a for loop with:
```java
for (boolean more = reader.start(); more; more = reader.advance()) {
```
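Not blocking, but roughly what I had in mind if a single loop is feasible (untested sketch; it reuses the names from the diff above — `reader`, `rowTag`, `outputReceiver`, `errorHandlingParseFn` — and uses a `started` flag instead of a plain for-header, since `start()`/`advance()` are the calls that can throw):
```java
// Untested sketch: one loop covering both start() and advance(), with the ParseException
// routing applied once. The started flag records whether start() has been called, so a
// parse failure in start() still falls through to advance() on the next iteration.
boolean started = false;
while (true) {
  boolean more;
  try {
    more = started ? reader.advance() : reader.start();
    started = true;
  } catch (ParseException e) {
    started = true; // the bad record was consumed; keep advancing past it
    GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
    getBadRecordRouter()
        .route(
            outputReceiver,
            record,
            AvroCoder.of(record.getSchema()),
            (Exception) e.getCause(),
            "Unable to parse record reading from BigQuery");
    continue;
  }
  if (!more) {
    return;
  }
  outputReceiver.get(rowTag).output(reader.getCurrent());
}
```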
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -159,9 +178,20 @@ public void processElement(
.toMessage(element.getValue(), rowMutationInformation)
.withTimestamp(timestamp);
o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
- } catch (TableRowToStorageApiProto.SchemaConversionException e) {
- TableRow tableRow = messageConverter.toTableRow(element.getValue());
- o.get(failedWritesTag).output(new BigQueryStorageApiInsertError(tableRow, e.toString()));
+ } catch (TableRowToStorageApiProto.SchemaConversionException conversionException) {
+ TableRow tableRow;
+ try {
+ tableRow = messageConverter.toTableRow(element.getValue());
+ } catch (Exception e) {
+ badRecordRouter.route(
+ o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");
Review Comment:
> Unable to convert value to StorageWriteApiPayload

nit: the description could be a little more accurate, like "Unable to convert failed element to TableRow".
The conversion to StorageWriteApiPayload already failed above; this exception happens when we fail to convert the user element to a TableRow.
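e.g. roughly (just the wording, sketched as a suggestion against the `route(...)` call above):
```suggestion
badRecordRouter.route(
    o, element, elementCoder, e, "Unable to convert failed element to TableRow");
```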
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1428,7 +1452,7 @@ private PCollection<T> expandForDirectRead(
PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions bqOptions) {
ValueProvider<TableReference> tableProvider = getTableProvider();
Pipeline p = input.getPipeline();
- if (tableProvider != null) {
+ if (tableProvider != null && getBadRecordRouter() instanceof ThrowingBadRecordRouter) {
Review Comment:
So if someone reads from a table but configures a different BadRecordRouter, the connector will use a different code path?
Might be worth documenting this somewhere or logging it at INFO level.
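If we go the logging route, something like this sketch near the check could work (assuming the existing class-level `LOG`; the message wording here is only illustrative):
```java
// Illustrative sketch only: log at INFO when a non-throwing BadRecordRouter changes
// which read code path is taken for a table-based read.
if (tableProvider != null && !(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
  LOG.info(
      "A non-throwing BadRecordRouter is configured; reading from a table will use a "
          + "different code path than the default direct table read.");
}
```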
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -355,6 +351,11 @@ private boolean readNextRecord() throws IOException {
// progress made in the current Stream gives us the overall StreamBundle progress.
fractionOfStreamBundleConsumed =
(currentStreamBundleIndex + fractionOfCurrentStreamConsumed) / source.streamBundle.size();
+
+ SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema);
+
+ current = parseFn.apply(schemaAndRecord);
+
Review Comment:
nit: does this move result in any functional change? maybe revert
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java:
##########
@@ -234,17 +245,28 @@ public void processElement(
try {
writer.write(element.getValue());
- } catch (Exception e) {
- // Discard write result and close the write.
+ } catch (BigQueryRowSerializationException e) {
try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
+ badRecordRouter.route(
+ outputReceiver, element, coder, e, "Unable to Write BQ Record to File");
+ } catch (Exception e2) {
+ cleanupWriter(writer, e2);
}
- throw e;
+ } catch (IOException e) {
Review Comment:
This used to catch `Exception`. Is there a reason we're narrowing it down to
`IOException`? Ideally we'd want to cleanup writer on any exception right?
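For illustration, something like this would keep the broad cleanup (sketch only; it assumes `cleanupWriter` closes the writer and rethrows, which is what the diff above suggests):
```java
try {
  writer.write(element.getValue());
} catch (BigQueryRowSerializationException e) {
  try {
    badRecordRouter.route(
        outputReceiver, element, coder, e, "Unable to Write BQ Record to File");
  } catch (Exception e2) {
    cleanupWriter(writer, e2);
  }
} catch (Exception e) {
  // Broad catch kept so the writer is cleaned up on any failure, not just IOException
  // (cleanupWriter is assumed to close the writer and rethrow without masking the cause).
  cleanupWriter(writer, e);
}
```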
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java:
##########
@@ -234,17 +245,28 @@ public void processElement(
try {
writer.write(element.getValue());
- } catch (Exception e) {
- // Discard write result and close the write.
+ } catch (BigQueryRowSerializationException e) {
try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
+ badRecordRouter.route(
+ outputReceiver, element, coder, e, "Unable to Write BQ Record to File");
Review Comment:
nit:
```suggestion
outputReceiver, element, coder, e, "Unable to convert element to TableRow");
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -159,9 +178,20 @@ public void processElement(
.toMessage(element.getValue(), rowMutationInformation)
.withTimestamp(timestamp);
o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
- } catch (TableRowToStorageApiProto.SchemaConversionException e) {
- TableRow tableRow = messageConverter.toTableRow(element.getValue());
- o.get(failedWritesTag).output(new BigQueryStorageApiInsertError(tableRow, e.toString()));
+ } catch (TableRowToStorageApiProto.SchemaConversionException conversionException) {
+ TableRow tableRow;
+ try {
+ tableRow = messageConverter.toTableRow(element.getValue());
+ } catch (Exception e) {
+ badRecordRouter.route(
+ o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");
+ return;
+ }
+ o.get(failedWritesTag)
+ .output(new BigQueryStorageApiInsertError(tableRow, conversionException.toString()));
+ } catch (Exception e) {
+ badRecordRouter.route(
+ o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");
Review Comment:
nit: more accurate to say "Unable to output failed TableRow to dead-letter queue"