This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch revert-28274-i-28080/java/BigQueryIO/datumReaderFactory in repository https://gitbox.apache.org/repos/asf/beam.git
commit cdfffd80097fd022fafd9512c2ed27ac14a15621 Author: Yi Hu <[email protected]> AuthorDate: Tue Sep 19 11:12:12 2023 -0400 Revert "Remove TableSchema to JSON conversion. (#28274)" This reverts commit 7e830593e61ba1fbff16411b5825bfb4aea53ba2. --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 40 +++++++++++++++++----- .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 18 ++++++---- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3c006d24d03..58d76931244 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -50,6 +49,7 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; import java.io.IOException; +import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.util.Collections; import java.util.List; @@ -132,10 +132,13 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -646,19 +649,29 @@ public class BigQueryIO { BigQueryUtils.tableRowFromBeamRow()); } + private static class TableSchemaFunction + implements Serializable, Function<@Nullable String, @Nullable TableSchema> { + @Override + public @Nullable TableSchema apply(@Nullable String input) { + return BigQueryHelpers.fromJsonString(input, TableSchema.class); + } + } + @VisibleForTesting static class GenericDatumTransformer<T> implements DatumReader<T> { private final SerializableFunction<SchemaAndRecord, T> parseFn; - private final TableSchema tableSchema; + private final Supplier<TableSchema> tableSchema; private GenericDatumReader<T> reader; private org.apache.avro.Schema writerSchema; public GenericDatumTransformer( SerializableFunction<SchemaAndRecord, T> parseFn, - TableSchema tableSchema, + String tableSchema, org.apache.avro.Schema writer) { this.parseFn = parseFn; - this.tableSchema = tableSchema; + this.tableSchema = + Suppliers.memoize( + Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema))); this.writerSchema = writer; this.reader = new GenericDatumReader<>(this.writerSchema); } @@ -676,7 +689,7 @@ public class BigQueryIO { @Override public T read(T reuse, Decoder in) throws IOException { GenericRecord record = (GenericRecord) this.reader.read(reuse, in); - return parseFn.apply(new SchemaAndRecord(record, this.tableSchema)); + return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get())); } } @@ -708,9 +721,16 @@ public class BigQueryIO { .setDatumReaderFactory( (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>) input -> { - TableSchema safeInput = checkStateNotNull(input); - return (AvroSource.DatumReaderFactory<T>) - (writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer); + try { + String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input); + return (AvroSource.DatumReaderFactory<T>) + (writer, reader) -> + new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer); + } catch (IOException e) { + LOG.warn( + String.format("Error while converting table schema %s to JSON!", input), e); + return null; + } }) // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed. .setParseFn(parseFn) @@ -3366,7 +3386,9 @@ public class BigQueryIO { @SuppressWarnings({"unchecked", "nullness"}) Descriptors.Descriptor descriptor = (Descriptors.Descriptor) - checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null); + org.apache.beam.sdk.util.Preconditions.checkStateNotNull( + writeProtoClass.getMethod("getDescriptor")) + .invoke(null); TableSchema tableSchema = TableRowToStorageApiProto.protoSchemaToTableSchema( TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java index e274a8ac68e..bc75ba8bd9b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; @@ -144,11 +143,18 @@ public class BigQueryIOReadTest implements Serializable { private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>> datumReaderFactoryFn = - input -> - (AvroSource.DatumReaderFactory<TableRow>) - (writer, reader) -> - new BigQueryIO.GenericDatumTransformer<>( - BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer); + (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>) + input -> { + try { + String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input); + return (AvroSource.DatumReaderFactory<TableRow>) + (writer, reader) -> + new BigQueryIO.GenericDatumTransformer<>( + BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer); + } catch (IOException e) { + return null; + } + }; private static class MyData implements Serializable { private String name;
