This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch revert-28274-i-28080/java/BigQueryIO/datumReaderFactory
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cdfffd80097fd022fafd9512c2ed27ac14a15621
Author: Yi Hu <[email protected]>
AuthorDate: Tue Sep 19 11:12:12 2023 -0400

    Revert "Remove TableSchema to JSON conversion. (#28274)"
    
    This reverts commit 7e830593e61ba1fbff16411b5825bfb4aea53ba2.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 40 +++++++++++++++++-----
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java    | 18 ++++++----
 2 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3c006d24d03..58d76931244 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 import static 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
@@ -50,6 +49,7 @@ import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.Message;
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.List;
@@ -132,10 +132,13 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -646,19 +649,29 @@ public class BigQueryIO {
             BigQueryUtils.tableRowFromBeamRow());
   }
 
+  private static class TableSchemaFunction
+      implements Serializable, Function<@Nullable String, @Nullable 
TableSchema> {
+    @Override
+    public @Nullable TableSchema apply(@Nullable String input) {
+      return BigQueryHelpers.fromJsonString(input, TableSchema.class);
+    }
+  }
+
   @VisibleForTesting
   static class GenericDatumTransformer<T> implements DatumReader<T> {
     private final SerializableFunction<SchemaAndRecord, T> parseFn;
-    private final TableSchema tableSchema;
+    private final Supplier<TableSchema> tableSchema;
     private GenericDatumReader<T> reader;
     private org.apache.avro.Schema writerSchema;
 
     public GenericDatumTransformer(
         SerializableFunction<SchemaAndRecord, T> parseFn,
-        TableSchema tableSchema,
+        String tableSchema,
         org.apache.avro.Schema writer) {
       this.parseFn = parseFn;
-      this.tableSchema = tableSchema;
+      this.tableSchema =
+          Suppliers.memoize(
+              Suppliers.compose(new TableSchemaFunction(), 
Suppliers.ofInstance(tableSchema)));
       this.writerSchema = writer;
       this.reader = new GenericDatumReader<>(this.writerSchema);
     }
@@ -676,7 +689,7 @@ public class BigQueryIO {
     @Override
     public T read(T reuse, Decoder in) throws IOException {
       GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
-      return parseFn.apply(new SchemaAndRecord(record, this.tableSchema));
+      return parseFn.apply(new SchemaAndRecord(record, 
this.tableSchema.get()));
     }
   }
 
@@ -708,9 +721,16 @@ public class BigQueryIO {
         .setDatumReaderFactory(
             (SerializableFunction<TableSchema, 
AvroSource.DatumReaderFactory<T>>)
                 input -> {
-                  TableSchema safeInput = checkStateNotNull(input);
-                  return (AvroSource.DatumReaderFactory<T>)
-                      (writer, reader) -> new 
GenericDatumTransformer<>(parseFn, safeInput, writer);
+                  try {
+                    String jsonTableSchema = 
BigQueryIO.JSON_FACTORY.toString(input);
+                    return (AvroSource.DatumReaderFactory<T>)
+                        (writer, reader) ->
+                            new GenericDatumTransformer<>(parseFn, 
jsonTableSchema, writer);
+                  } catch (IOException e) {
+                    LOG.warn(
+                        String.format("Error while converting table schema %s 
to JSON!", input), e);
+                    return null;
+                  }
                 })
         // TODO: Remove setParseFn once 
https://github.com/apache/beam/issues/21076 is fixed.
         .setParseFn(parseFn)
@@ -3366,7 +3386,9 @@ public class BigQueryIO {
             @SuppressWarnings({"unchecked", "nullness"})
             Descriptors.Descriptor descriptor =
                 (Descriptors.Descriptor)
-                    
checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null);
+                    org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
+                            writeProtoClass.getMethod("getDescriptor"))
+                        .invoke(null);
             TableSchema tableSchema =
                 TableRowToStorageApiProto.protoSchemaToTableSchema(
                     
TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index e274a8ac68e..bc75ba8bd9b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
@@ -144,11 +143,18 @@ public class BigQueryIOReadTest implements Serializable {
 
   private SerializableFunction<TableSchema, 
AvroSource.DatumReaderFactory<TableRow>>
       datumReaderFactoryFn =
-          input ->
-              (AvroSource.DatumReaderFactory<TableRow>)
-                  (writer, reader) ->
-                      new BigQueryIO.GenericDatumTransformer<>(
-                          BigQueryIO.TableRowParser.INSTANCE, 
checkStateNotNull(input), writer);
+          (SerializableFunction<TableSchema, 
AvroSource.DatumReaderFactory<TableRow>>)
+              input -> {
+                try {
+                  String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
+                  return (AvroSource.DatumReaderFactory<TableRow>)
+                      (writer, reader) ->
+                          new BigQueryIO.GenericDatumTransformer<>(
+                              BigQueryIO.TableRowParser.INSTANCE, jsonSchema, 
writer);
+                } catch (IOException e) {
+                  return null;
+                }
+              };
 
   private static class MyData implements Serializable {
     private String name;

Reply via email to