johnjcasey commented on code in PR #30081:
URL: https://github.com/apache/beam/pull/30081#discussion_r1496071335


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1452,28 +1454,65 @@ private PCollection<T> expandForDirectRead(
         PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions 
bqOptions) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
-      if (tableProvider != null && getBadRecordRouter() instanceof 
ThrowingBadRecordRouter) {
-        // No job ID is required. Read directly from BigQuery storage.
-        PCollection<T> rows =
-            p.apply(
-                org.apache.beam.sdk.io.Read.from(
-                    BigQueryStorageTableSource.create(
-                        tableProvider,
-                        getFormat(),
-                        getSelectedFields(),
-                        getRowRestriction(),
-                        getParseFn(),
-                        outputCoder,
-                        getBigQueryServices(),
-                        getProjectionPushdownApplied())));
-        if (beamSchema != null) {
-          rows.setSchema(
-              beamSchema,
-              getTypeDescriptor(),
-              getToBeamRowFn().apply(beamSchema),
-              getFromBeamRowFn().apply(beamSchema));
+      if (tableProvider != null) {
+        // ThrowingBadRecordRouter is the default value, and is what is used 
if the user hasn't
+        // specified any particular error handling.
+        if (getBadRecordRouter() instanceof ThrowingBadRecordRouter) {
+          // No job ID is required. Read directly from BigQuery storage.
+          PCollection<T> rows =
+              p.apply(
+                  org.apache.beam.sdk.io.Read.from(
+                      BigQueryStorageTableSource.create(
+                          tableProvider,
+                          getFormat(),
+                          getSelectedFields(),
+                          getRowRestriction(),
+                          getParseFn(),
+                          outputCoder,
+                          getBigQueryServices(),
+                          getProjectionPushdownApplied())));
+          if (beamSchema != null) {
+            rows.setSchema(
+                beamSchema,
+                getTypeDescriptor(),
+                getToBeamRowFn().apply(beamSchema),
+                getFromBeamRowFn().apply(beamSchema));
+          }
+          return rows;
+        } else {
+          // We need to manually execute the table source, so as to be able to 
capture exceptions
+          // to pipe to error handling
+          BigQueryStorageTableSource<T> source =
+              BigQueryStorageTableSource.create(
+                  tableProvider,
+                  getFormat(),
+                  getSelectedFields(),
+                  getRowRestriction(),
+                  getParseFn(),
+                  outputCoder,
+                  getBigQueryServices(),
+                  getProjectionPushdownApplied());
+          List<? extends BoundedSource<T>> sources;
+          try {
+            sources = source.split(0, input.getPipeline().getOptions());

Review Comment:
   I initially picked 0 just to verify that this pattern would work. I'm not sure
how best to choose the desired bundle size (the first argument to `split`) here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to