This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ce71caf [BEAM-14124] Add display data to BQ storage reads. new 57c8647 Merge pull request #17115 from ibzib/bq-display-data ce71caf is described below commit ce71cafc4801474ba7791a194200e4599b0b9e33 Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Thu Mar 17 13:12:35 2022 -0700 [BEAM-14124] Add display data to BQ storage reads. Add display data for "Selected fields" and "Projection pushdown applied". I also want to add one for "Number of fields pushed down", but that will be a little more involved so I'll do it in a separate PR. --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 26 ++++++++++- .../gcp/bigquery/BigQueryStorageTableSource.java | 52 +++++++++++++++++----- .../io/gcp/bigquery/BigQueryIOStorageReadTest.java | 5 +++ 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 9786d90..c510db7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -590,6 +590,7 @@ public class BigQueryIO { .setMethod(TypedRead.Method.DEFAULT) .setUseAvroLogicalTypes(false) .setFormat(DataFormat.AVRO) + .setProjectionPushdownApplied(false) .build(); } @@ -805,6 +806,8 @@ public class BigQueryIO { abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn); abstract Builder<T> setUseAvroLogicalTypes(Boolean useAvroLogicalTypes); + + abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied); } abstract @Nullable ValueProvider<String> getJsonTableRef(); @@ -853,6 +856,8 @@ public class BigQueryIO { abstract Boolean getUseAvroLogicalTypes(); + abstract boolean getProjectionPushdownApplied(); + /** * An enumeration type for the priority of a query. * @@ -1229,7 +1234,8 @@ public class BigQueryIO { getRowRestriction(), getParseFn(), outputCoder, - getBigQueryServices()))); + getBigQueryServices(), + getProjectionPushdownApplied()))); } checkArgument( @@ -1430,6 +1436,10 @@ public class BigQueryIO { DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider())) .withLabel("Table")) .addIfNotNull(DisplayData.item("query", getQuery()).withLabel("Query")) + .addIfNotDefault( + DisplayData.item("projectionPushdownApplied", getProjectionPushdownApplied()) + .withLabel("Projection Pushdown Applied"), + false) .addIfNotNull( DisplayData.item("flattenResults", getFlattenResults()) .withLabel("Flatten Query Results")) @@ -1438,6 +1448,13 @@ public class BigQueryIO { .withLabel("Use Legacy SQL Dialect")) .addIfNotDefault( DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true); + + ValueProvider<List<String>> selectedFieldsProvider = getSelectedFields(); + if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) { + builder.add( + DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get())) + .withLabel("Selected Fields")); + } } /** Ensures that methods of the from() / fromQuery() family are called at most once. */ @@ -1623,6 +1640,11 @@ public class BigQueryIO { return toBuilder().setUseAvroLogicalTypes(true).build(); } + @VisibleForTesting + TypedRead<T> withProjectionPushdownApplied() { + return toBuilder().setProjectionPushdownApplied(true).build(); + } + @Override public boolean supportsProjectionPushdown() { // We can't do projection pushdown when a query is set. The query may project certain fields @@ -1643,7 +1665,7 @@ public class BigQueryIO { outputFields.keySet()); ImmutableList<String> fields = ImmutableList.copyOf(fieldAccessDescriptor.fieldNamesAccessed()); - return withSelectedFields(fields); + return withSelectedFields(fields).withProjectionPushdownApplied(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java index 2a14cd5..c53cab3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java @@ -45,6 +45,11 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T> private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageTableSource.class); + private final ValueProvider<TableReference> tableReferenceProvider; + private final boolean projectionPushdownApplied; + + private transient AtomicReference<Table> cachedTable; + public static <T> BigQueryStorageTableSource<T> create( ValueProvider<TableReference> tableRefProvider, DataFormat format, @@ -52,9 +57,17 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T> @Nullable ValueProvider<String> rowRestriction, SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> outputCoder, - BigQueryServices bqServices) { + BigQueryServices bqServices, + boolean projectionPushdownApplied) { return new BigQueryStorageTableSource<>( - tableRefProvider, format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices); + tableRefProvider, + format, + selectedFields, + rowRestriction, + parseFn, + outputCoder, + bqServices, + projectionPushdownApplied); } public static <T> BigQueryStorageTableSource<T> create( @@ -65,13 +78,16 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T> Coder<T> outputCoder, BigQueryServices bqServices) { return new BigQueryStorageTableSource<>( - tableRefProvider, null, selectedFields, rowRestriction, parseFn, outputCoder, bqServices); + tableRefProvider, + null, + selectedFields, + rowRestriction, + parseFn, + outputCoder, + bqServices, + false); } - private final ValueProvider<TableReference> tableReferenceProvider; - - private transient AtomicReference<Table> cachedTable; - private BigQueryStorageTableSource( ValueProvider<TableReference> tableRefProvider, DataFormat format, @@ -79,9 +95,11 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T> @Nullable ValueProvider<String> rowRestriction, SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> outputCoder, - BigQueryServices bqServices) { + BigQueryServices bqServices, + boolean projectionPushdownApplied) { super(format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices); this.tableReferenceProvider = checkNotNull(tableRefProvider, "tableRefProvider"); + this.projectionPushdownApplied = projectionPushdownApplied; cachedTable = new AtomicReference<>(); } @@ -93,9 +111,21 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T> @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.addIfNotNull( - DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider)) - .withLabel("Table")); + builder + .addIfNotNull( + DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider)) + .withLabel("Table")) + .addIfNotDefault( + DisplayData.item("projectionPushdownApplied", projectionPushdownApplied) + .withLabel("Projection Pushdown Applied"), + false); + + if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) { + builder.add( + DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get())) + .withLabel("Selected Fields")); + } + // Note: This transform does not set launchesBigQueryJobs because it doesn't launch // BigQuery jobs, but instead uses the storage api to directly read the table. } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java index 8f8ab86..e9a98a5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java @@ -279,9 +279,13 @@ public class BigQueryIOStorageReadTest { BigQueryIO.read(new TableRowParser()) .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) + .withSelectedFields(ImmutableList.of("foo", "bar")) + .withProjectionPushdownApplied() .from(tableSpec); DisplayData displayData = DisplayData.from(typedRead); assertThat(displayData, hasDisplayItem("table", tableSpec)); + assertThat(displayData, hasDisplayItem("selectedFields", "foo, bar")); + assertThat(displayData, hasDisplayItem("projectionPushdownApplied", true)); } @Test @@ -2097,6 +2101,7 @@ public class BigQueryIOStorageReadTest { TypedRead<Row> pushdownRead = (TypedRead<Row>) pushdownT; assertEquals(Method.DIRECT_READ, pushdownRead.getMethod()); assertThat(pushdownRead.getSelectedFields().get(), Matchers.containsInAnyOrder("foo")); + assertTrue(pushdownRead.getProjectionPushdownApplied()); } @Test