johnjcasey commented on code in PR #30081:
URL: https://github.com/apache/beam/pull/30081#discussion_r1496071335
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1452,28 +1454,65 @@ private PCollection<T> expandForDirectRead(
PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions
bqOptions) {
ValueProvider<TableReference> tableProvider = getTableProvider();
Pipeline p = input.getPipeline();
- if (tableProvider != null && getBadRecordRouter() instanceof
ThrowingBadRecordRouter) {
- // No job ID is required. Read directly from BigQuery storage.
- PCollection<T> rows =
- p.apply(
- org.apache.beam.sdk.io.Read.from(
- BigQueryStorageTableSource.create(
- tableProvider,
- getFormat(),
- getSelectedFields(),
- getRowRestriction(),
- getParseFn(),
- outputCoder,
- getBigQueryServices(),
- getProjectionPushdownApplied())));
- if (beamSchema != null) {
- rows.setSchema(
- beamSchema,
- getTypeDescriptor(),
- getToBeamRowFn().apply(beamSchema),
- getFromBeamRowFn().apply(beamSchema));
+ if (tableProvider != null) {
+ // ThrowingBadRecordRouter is the default value, and is what is used
if the user hasn't
+ // specified any particular error handling.
+ if (getBadRecordRouter() instanceof ThrowingBadRecordRouter) {
+ // No job ID is required. Read directly from BigQuery storage.
+ PCollection<T> rows =
+ p.apply(
+ org.apache.beam.sdk.io.Read.from(
+ BigQueryStorageTableSource.create(
+ tableProvider,
+ getFormat(),
+ getSelectedFields(),
+ getRowRestriction(),
+ getParseFn(),
+ outputCoder,
+ getBigQueryServices(),
+ getProjectionPushdownApplied())));
+ if (beamSchema != null) {
+ rows.setSchema(
+ beamSchema,
+ getTypeDescriptor(),
+ getToBeamRowFn().apply(beamSchema),
+ getFromBeamRowFn().apply(beamSchema));
+ }
+ return rows;
+ } else {
+ // We need to manually execute the table source, so as to be able to
capture exceptions
+ // to pipe to error handling
+ BigQueryStorageTableSource<T> source =
+ BigQueryStorageTableSource.create(
+ tableProvider,
+ getFormat(),
+ getSelectedFields(),
+ getRowRestriction(),
+ getParseFn(),
+ outputCoder,
+ getBigQueryServices(),
+ getProjectionPushdownApplied());
+ List<? extends BoundedSource<T>> sources;
+ try {
+ sources = source.split(0, input.getPipeline().getOptions());
Review Comment:
I initially picked 0 just to verify that this pattern would work. I'm not sure
how best to choose the desired bundle size to pass to `split` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]