reuvenlax commented on code in PR #24147:
URL: https://github.com/apache/beam/pull/24147#discussion_r1029773805


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java:
##########
@@ -72,128 +65,86 @@ static void clearSchemaCache() throws ExecutionException, 
InterruptedException {
   @Override
   public MessageConverter<T> getMessageConverter(
       DestinationT destination, DatasetService datasetService) throws 
Exception {
-    return new MessageConverter<T>() {
-      @Nullable TableSchema tableSchema;
-      TableRowToStorageApiProto.SchemaInformation schemaInformation;
-      Descriptor descriptor;
-      long descriptorHash;
+    return new TableRowConverter(destination, datasetService);
+  }
 
-      {
-        tableSchema = getSchema(destination);
-        TableReference tableReference = 
getTable(destination).getTableReference();
-        if (tableSchema == null) {
-          // If the table already exists, then try and fetch the schema from 
the existing
-          // table.
-          tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
-          if (tableSchema == null) {
-            if (createDisposition == CreateDisposition.CREATE_NEVER) {
-              throw new RuntimeException(
-                  "BigQuery table "
-                      + tableReference
-                      + " not found. If you wanted to "
-                      + "automatically create the table, set the create 
disposition to CREATE_IF_NEEDED and specify a "
-                      + "schema.");
-            } else {
-              throw new RuntimeException(
-                  "Schema must be set for table "
-                      + tableReference
-                      + " when writing TableRows using Storage API and "
-                      + "using a create disposition of CREATE_IF_NEEDED.");
-            }
-          }
-        } else {
-          // Make sure we register this schema with the cache, unless there's 
already a more
-          // up-to-date schema.
-          tableSchema =
-              MoreObjects.firstNonNull(
-                  SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), 
tableSchema);
-        }
-        schemaInformation =
-            
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
-        descriptor = 
TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
-        descriptorHash = 
BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-      }
+  class TableRowConverter implements MessageConverter<T> {
+    final @Nullable TableSchema tableSchema;
+    final com.google.cloud.bigquery.storage.v1.TableSchema protoTableSchema;
+    final TableRowToStorageApiProto.SchemaInformation schemaInformation;
+    final Descriptor descriptor;
 
-      @Override
-      public DescriptorWrapper getSchemaDescriptor() {
-        synchronized (this) {
-          return new DescriptorWrapper(descriptor, descriptorHash);
-        }
-      }
+    TableRowConverter(
+        TableSchema tableSchema,

Review Comment:
   this.tableSchema - the JSON table schema (the Beam API is written in terms of this schema, and it is usually what users give us).

   this.protoTableSchema - the result of translating the JSON schema into the proto TableSchema.

   this.schemaInformation - extra information computed about the schema to allow easy conversion of a JSON row -> proto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to