johnjcasey commented on code in PR #30081:
URL: https://github.com/apache/beam/pull/30081#discussion_r1491367627
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1776,45 +1839,102 @@ private PCollection<T> createPCollectionForDirectReadWithStreamBundle(
         TupleTag<List<ReadStream>> listReadStreamsTag,
         PCollectionView<ReadSession> readSessionView,
         PCollectionView<String> tableSchemaView) {
-      PCollection<T> rows =
+      TupleTag<T> rowTag = new TupleTag<>();
+      PCollectionTuple resultTuple =
           tuple
               .get(listReadStreamsTag)
               .apply(Reshuffle.viaRandomKey())
               .apply(
                   ParDo.of(
                           new DoFn<List<ReadStream>, T>() {
                             @ProcessElement
-                            public void processElement(ProcessContext c) throws Exception {
+                            public void processElement(
+                                ProcessContext c, MultiOutputReceiver outputReceiver)
+                                throws Exception {
                               ReadSession readSession = c.sideInput(readSessionView);
                               TableSchema tableSchema =
                                   BigQueryHelpers.fromJsonString(
                                       c.sideInput(tableSchemaView), TableSchema.class);
                               List<ReadStream> streamBundle = c.element();
+                              ErrorHandlingParseFn errorHandlingParseFn =
+                                  new ErrorHandlingParseFn(getParseFn());
+
                               BigQueryStorageStreamBundleSource<T> streamSource =
                                   BigQueryStorageStreamBundleSource.create(
                                       readSession,
                                       streamBundle,
                                       tableSchema,
-                                      getParseFn(),
+                                      errorHandlingParseFn,
                                       outputCoder,
                                       getBigQueryServices(),
                                       1L);
-                              // Read all of the data from the stream. In the event that this work
-                              // item fails and is rescheduled, the same rows will be returned in
-                              // the same order.
-                              BoundedReader<T> reader =
-                                  streamSource.createReader(c.getPipelineOptions());
-                              for (boolean more = reader.start(); more; more = reader.advance()) {
-                                c.output(reader.getCurrent());
-                              }
+                              readStreamSource(
+                                  c.getPipelineOptions(),
+                                  rowTag,
+                                  outputReceiver,
+                                  streamSource,
+                                  errorHandlingParseFn);
                             }
                           })
-                      .withSideInputs(readSessionView, tableSchemaView))
-              .setCoder(outputCoder);
+                      .withSideInputs(readSessionView, tableSchemaView)
+                      .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG)));
-      return rows;
+      getBadRecordErrorHandler()
+          .addErrorCollection(
+              resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(tuple.getPipeline())));
+
+      return resultTuple.get(rowTag).setCoder(outputCoder);
+    }
+
+    public void readStreamSource(
+        PipelineOptions options,
+        TupleTag<T> rowTag,
+        MultiOutputReceiver outputReceiver,
+        BoundedSource<T> streamSource,
+        ErrorHandlingParseFn errorHandlingParseFn)
+        throws Exception {
+      // Read all the data from the stream. In the event that this work
+      // item fails and is rescheduled, the same rows will be returned in
+      // the same order.
+      BoundedSource.BoundedReader<T> reader = streamSource.createReader(options);
+
+      try {
+        if (reader.start()) {
+          outputReceiver.get(rowTag).output(reader.getCurrent());
+        } else {
+          return;
+        }
+      } catch (ParseException e) {
+        GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+        getBadRecordRouter()
+            .route(
+                outputReceiver,
+                record,
+                AvroCoder.of(record.getSchema()),
+                (Exception) e.getCause(),
+                "Unable to parse record reading from BigQuery");
+      }
+
+      while (true) {
+        try {
+          if (reader.advance()) {
+            outputReceiver.get(rowTag).output(reader.getCurrent());
+          } else {
+            return;
+          }
+        } catch (ParseException e) {
+          GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+          getBadRecordRouter()
+              .route(
+                  outputReceiver,
+                  record,
+                  AvroCoder.of(record.getSchema()),
+                  (Exception) e.getCause(),
+                  "Unable to parse record reading from BigQuery");
+        }
+      }
Review Comment:
Unfortunately not. The `.start()` and `.advance()` calls themselves throw the exceptions we want to capture.
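
For context, the reason the try/catch sits around `reader.start()`/`reader.advance()` is that the parse fn runs inside the reader while it decodes each row, so a bad row surfaces as an exception thrown from those calls rather than at a call site we control. Below is a minimal sketch of the wrapper pattern this relies on; the generic signature, field layout, and the `ParseException` class here are assumptions reconstructed from the hunk above, not the PR's exact code:

```java
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;

// Hypothetical unchecked wrapper; the hunk only shows that e.getCause()
// carries the user's original exception.
class ParseException extends RuntimeException {
  ParseException(Exception cause) {
    super(cause);
  }
}

class ErrorHandlingParseFn<T> implements SerializableFunction<SchemaAndRecord, T> {
  private final SerializableFunction<SchemaAndRecord, T> parseFn;

  // Last element handed to the parse fn; read back by the caller when a
  // ParseException escapes reader.start()/reader.advance().
  private SchemaAndRecord schemaAndRecord;

  ErrorHandlingParseFn(SerializableFunction<SchemaAndRecord, T> parseFn) {
    this.parseFn = parseFn;
  }

  @Override
  public T apply(SchemaAndRecord input) {
    schemaAndRecord = input;
    try {
      return parseFn.apply(input);
    } catch (Exception e) {
      // The reader invokes apply() while decoding, so this propagates out
      // of start()/advance() on the BoundedReader as a ParseException.
      throw new ParseException(e);
    }
  }

  SchemaAndRecord getSchemaAndRecord() {
    return schemaAndRecord;
  }
}
```

Because the wrapper keeps a handle on the last `SchemaAndRecord` it saw, the catch blocks in `readStreamSource` can recover the failing Avro record and route it to `BAD_RECORD_TAG` instead of failing the bundle.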