reuvenlax commented on code in PR #24147:
URL: https://github.com/apache/beam/pull/24147#discussion_r1032744564


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java:
##########
@@ -72,128 +65,86 @@ static void clearSchemaCache() throws ExecutionException, InterruptedException {
   @Override
   public MessageConverter<T> getMessageConverter(
       DestinationT destination, DatasetService datasetService) throws Exception {
-    return new MessageConverter<T>() {
-      @Nullable TableSchema tableSchema;
-      TableRowToStorageApiProto.SchemaInformation schemaInformation;
-      Descriptor descriptor;
-      long descriptorHash;
+    return new TableRowConverter(destination, datasetService);
+  }
 
-      {
-        tableSchema = getSchema(destination);
-        TableReference tableReference = getTable(destination).getTableReference();
-        if (tableSchema == null) {
-          // If the table already exists, then try and fetch the schema from the existing
-          // table.
-          tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
-          if (tableSchema == null) {
-            if (createDisposition == CreateDisposition.CREATE_NEVER) {
-              throw new RuntimeException(
-                  "BigQuery table "
-                      + tableReference
-                      + " not found. If you wanted to "
-                      + "automatically create the table, set the create 
disposition to CREATE_IF_NEEDED and specify a "
-                      + "schema.");
-            } else {
-              throw new RuntimeException(
-                  "Schema must be set for table "
-                      + tableReference
-                      + " when writing TableRows using Storage API and "
-                      + "using a create disposition of CREATE_IF_NEEDED.");
-            }
-          }
-        } else {
-          // Make sure we register this schema with the cache, unless there's already a more
-          // up-to-date schema.
-          tableSchema =
-              MoreObjects.firstNonNull(
-                  SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
-        }
-        schemaInformation =
-            TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
-        descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
-        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-      }
+  class TableRowConverter implements MessageConverter<T> {
+    final @Nullable TableSchema tableSchema;
+    final com.google.cloud.bigquery.storage.v1.TableSchema protoTableSchema;
+    final TableRowToStorageApiProto.SchemaInformation schemaInformation;
+    final Descriptor descriptor;
 
-      @Override
-      public DescriptorWrapper getSchemaDescriptor() {
-        synchronized (this) {
-          return new DescriptorWrapper(descriptor, descriptorHash);
-        }
-      }
+    TableRowConverter(
+        TableSchema tableSchema,
+        TableRowToStorageApiProto.SchemaInformation schemaInformation,
+        Descriptor descriptor) {
+      this.tableSchema = tableSchema;
+      this.protoTableSchema = TableRowToStorageApiProto.schemaToProtoTableSchema(tableSchema);
+      this.schemaInformation = schemaInformation;
+      this.descriptor = descriptor;
+    }
 
-      @Override
-      public void refreshSchema(long expectedHash) throws Exception {
-        // When a table is updated, all streams writing to that table will try to refresh the
-        // schema. Since we don't want them all querying the table for the schema, keep track of
-        // the expected hash and return if it already matches.
-        synchronized (this) {
-          if (expectedHash == descriptorHash) {
-            return;
+    TableRowConverter(DestinationT destination, DatasetService datasetService) throws Exception {
+      TableSchema localTableSchema = getSchema(destination);
+      TableReference tableReference = getTable(destination).getTableReference();
+      if (localTableSchema == null) {
+        // If the table already exists, then try and fetch the schema from the existing
+        // table.
+        localTableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
+        if (localTableSchema == null) {
+          if (createDisposition == CreateDisposition.CREATE_NEVER) {
+            throw new RuntimeException(
+                "BigQuery table "
+                    + tableReference
+                    + " not found. If you wanted to "
+                    + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a "
+                    + "schema.");
+          } else {
+            throw new RuntimeException(
+                "Schema must be set for table "
+                    + tableReference
+                    + " when writing TableRows using Storage API and "
+                    + "using a create disposition of CREATE_IF_NEEDED.");
           }
         }
-        refreshSchemaInternal();
+      } else {
+        // Make sure we register this schema with the cache, unless there's already a more
+        // up-to-date schema.
+        localTableSchema =
+            MoreObjects.firstNonNull(
+                SCHEMA_CACHE.putSchemaIfAbsent(tableReference, localTableSchema), localTableSchema);
       }
+      this.tableSchema = localTableSchema;
+      this.protoTableSchema = TableRowToStorageApiProto.schemaToProtoTableSchema(tableSchema);
+      schemaInformation =
+          TableRowToStorageApiProto.SchemaInformation.fromTableSchema(protoTableSchema);
+      descriptor =
+          TableRowToStorageApiProto.getDescriptorFromTableSchema(
+              Preconditions.checkStateNotNull(tableSchema), true);
+    }
 
-      public void refreshSchemaInternal() throws Exception {
-        TableReference tableReference = getTable(destination).getTableReference();
-        SCHEMA_CACHE.refreshSchema(tableReference, datasetService);
-        TableSchema newSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
-        if (newSchema == null) {
-          throw new RuntimeException("BigQuery table " + tableReference + " not found");
-        }
-        synchronized (this) {
-          tableSchema = newSchema;
-          schemaInformation =
-              TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
-          descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
-          long newHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-          if (descriptorHash != newHash) {
-            LOG.info(
-                "Refreshed table "
-                    + BigQueryHelpers.toTableSpec(tableReference)
-                    + " has a new schema.");
-          }
-          descriptorHash = newHash;
-        }
-      }
+    @Override
+    public com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema() {
+      return protoTableSchema;
+    }
 
-      @Override
-      public TableRow toTableRow(T element) {
-        return formatFunction.apply(element);
-      }
+    @Override
+    public TableRow toTableRow(T element) {
+      return formatFunction.apply(element);
+    }
 
-      @Override
-      public StorageApiWritePayload toMessage(T element) throws Exception {
-        int attempt = 0;
-        do {
-          TableRowToStorageApiProto.SchemaInformation localSchemaInformation;
-          Descriptor localDescriptor;
-          long localDescriptorHash;
-          synchronized (this) {
-            localSchemaInformation = schemaInformation;
-            localDescriptor = descriptor;
-            localDescriptorHash = descriptorHash;
-          }
-          try {
-            Message msg =
-                TableRowToStorageApiProto.messageFromTableRow(
-                    localSchemaInformation,
-                    localDescriptor,
-                    formatFunction.apply(element),
-                    ignoreUnknownValues);
-            return new AutoValue_StorageApiWritePayload(msg.toByteArray(), localDescriptorHash);
-          } catch (SchemaTooNarrowException e) {
-            if (!autoSchemaUpdates || attempt > schemaUpdateRetries) {
-              throw e;
-            }
-            // The input record has fields not found in the schema, and ignoreUnknownValues=false.
-            // It's possible that the user has updated the target table with a wider schema. Try
-            // to read the target's table schema to see if that is the case.
-            refreshSchemaInternal();
-            ++attempt;
-          }
-        } while (true);
-      }
-    };
-  }
+    @Override
+    public StorageApiWritePayload toMessage(T element) throws Exception {
+      return toMessage(formatFunction.apply(element), true);
+    }
+
+    public StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired)
+        throws Exception {
+      boolean ignore = ignoreUnknownValues || autoSchemaUpdates;

Review Comment:
   It does not imply that; it is only ignoring unknown values at the prior
   stage so that it can send them on to the writing stage. However, this
   actually belongs in the follow-on PR (since in this PR there is not yet
   any handling of new schemas), so I'll remove it for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
