gemini-code-assist[bot] commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3030884717


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -81,47 +129,442 @@ public StorageApiConvertMessages(
  public PCollectionTuple expand(PCollection<KV<DestinationT, ElementT>> input) {
     String operationName = input.getName() + "/" + getName();
 
+    @SuppressWarnings({
+      "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+    })
+    ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn =
+        new ConvertMessagesDoFn<>(
+            dynamicDestinations,
+            bqServices,
+            operationName,
+            failedWritesTag,
+            successfulWritesTag,
+            patchTableSchemaTag,
+            elementsWaitingForSchemaTag,
+            rowMutationFn,
+            badRecordRouter,
+            input.getCoder());
+
     PCollectionTuple result =
         input.apply(
             "Convert to message",
-            ParDo.of(
-                    new ConvertMessagesDoFn<>(
-                        dynamicDestinations,
-                        bqServices,
-                        operationName,
-                        failedWritesTag,
-                        successfulWritesTag,
-                        rowMutationFn,
-                        badRecordRouter,
-                        input.getCoder()))
+            ParDo.of(convertMessagesDoFn)
                 .withOutputTags(
                     successfulWritesTag,
-                    TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG)))
+                    TupleTagList.of(
+                        ImmutableList.of(
+                            failedWritesTag,
+                            BAD_RECORD_TAG,
+                            patchTableSchemaTag,
+                            elementsWaitingForSchemaTag)))
                 .withSideInputs(dynamicDestinations.getSideInputs()));
    result.get(successfulWritesTag).setCoder(successCoder);
    result.get(failedWritesTag).setCoder(errorCoder);
    result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
-    return result;
+    result
+        .get(patchTableSchemaTag)
+        .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)));
+    result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, elementCoder));
+
+    final int numShards = 1;
+    // Throttle the stream to the patch-table function so that only a single update per table per
+    // second gets processed. The combiner merges incremental schemas, so we won't miss any updates.
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
+        result
+            .get(patchTableSchemaTag)
+            .apply(
+                "rewindow",
+                Window.<KV<DestinationT, TableSchema>>configure()
+                    .triggering(
+                        Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(Duration.standardSeconds(1))))
+                    .discardingFiredPanes())
+            .apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
+            .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
+            .apply(
+                "Patch table schema",
+                ParDo.of(
+                    new PatchTableSchemaDoFn<>(
+                        operationName, bqServices, dynamicDestinations, numShards)))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
+            .apply(
+                Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
+                    .triggering(DefaultTrigger.of()));
+
+    // Any elements that are waiting for a schema update are sent to this stateful DoFn to be
+    // buffered.
+    // Note: we currently do not provide the DynamicDestinations object access to the side input
+    // in this path. This is because side inputs are not currently available from timer callbacks.
+    // Since side inputs are generally used for getSchema, and in this case we read the schema
+    // from the table, this is unlikely to be a problem.
+
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements =
+        result
+            .get(elementsWaitingForSchemaTag)
+            .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)));
+
+    PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> waitingElementsList =
+        PCollectionList.of(shardedWaitingElements).and(tablesPatched);
+    PCollectionTuple retryResult =
+        waitingElementsList
+            .apply("Buffered flatten", Flatten.pCollections())
+            .apply(
+                "bufferElements",
+                ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, convertMessagesDoFn))
+                    .withOutputTags(
+                        successfulWritesTag,
+                        TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG))));
+    retryResult.get(successfulWritesTag).setCoder(successCoder);
+    retryResult.get(failedWritesTag).setCoder(errorCoder);
+    retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
+
+    // Flatten successes and failures from both the regular transform and the retry transform.
+    PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
+        PCollectionList.of(result.get(successfulWritesTag))
+            .and(retryResult.get(successfulWritesTag))
+            .apply("flattenSuccesses", Flatten.pCollections());
+    PCollection<BigQueryStorageApiInsertError> allFailures =
+        PCollectionList.of(result.get(failedWritesTag))
+            .and(retryResult.get(failedWritesTag))
+            .apply("flattenFailures", Flatten.pCollections());
+    return PCollectionTuple.of(successfulWritesTag, allSuccesses).and(failedWritesTag, allFailures);
+  }
+
+  public static class SchemaUpdateHoldingFn<DestinationT extends @NonNull Object, ElementT>
+      extends DoFn<
+          KV<ShardedKey<DestinationT>, @Nullable ElementT>,
+          KV<DestinationT, StorageApiWritePayload>> {
+    private static final Duration POLL_DURATION = Duration.standardMinutes(2);
+
+    @StateId("bufferedElements")
+    private final StateSpec<BagState<TimestampedValue<ElementT>>> bufferedSpec;
+
+    @StateId("minBufferedTimestamp")
+    private final StateSpec<CombiningState<Long, long[], Long>> minBufferedTsSpec;
+
+    @StateId("timerTimestamp")
+    private final StateSpec<ValueState<Long>> timerTsSpec;
+
+    @TimerId("pollTimer")
+    private final TimerSpec pollTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    // Noop timer used only for watermark holds.
+    @TimerId("holdTimer")
+    private final TimerSpec holdTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn;
+
+    public SchemaUpdateHoldingFn(
+        Coder<ElementT> elementCoder,
+        ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn) {
+      this.convertMessagesDoFn = convertMessagesDoFn;
+      this.bufferedSpec = StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(elementCoder));
+      this.timerTsSpec = StateSpecs.value();
+
+      Combine.BinaryCombineLongFn minCombineFn =
+          new Combine.BinaryCombineLongFn() {
+            @Override
+            public long identity() {
+              return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+            }
+
+            @Override
+            public long apply(long left, long right) {
+              return Math.min(left, right);
+            }
+          };
+      this.minBufferedTsSpec = StateSpecs.combining(minCombineFn);
+    }
+
+    @Teardown
+    public void onTeardown() {
+      convertMessagesDoFn.onTeardown();
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<ShardedKey<DestinationT>, @Nullable ElementT> element,
+        @Timestamp Instant timestamp,
+        @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+        @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+        @StateId("timerTimestamp") ValueState<Long> timerTs,
+        @TimerId("pollTimer") Timer pollTimer,
+        @TimerId("holdTimer") Timer holdTimer,
+        ProcessContext context,
+        BoundedWindow window,
+        MultiOutputReceiver o)
+        throws Exception {
+      convertMessagesDoFn.dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+
+      minBufferedTimestamp.readLater();
+      timerTs.readLater();
+      ElementT value = element.getValue();
+      boolean needsNewTimer = false;
+      if (value != null) {
+        System.err.println("BUFFERING ELEMENT");

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This debug print should be removed or replaced with a proper logger call. 
`System.err` is not appropriate for production library code as it bypasses 
logging configurations.
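   
   A minimal sketch of the logger-based alternative, assuming the SLF4J API already used across the Beam codebase (the class, field, and message names below are illustrative, not part of this PR):
   
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   class SchemaUpdateHoldingFnLoggingSketch {
     // One static logger per class, following the usual SLF4J pattern.
     private static final Logger LOG =
         LoggerFactory.getLogger(SchemaUpdateHoldingFnLoggingSketch.class);
   
     void bufferElement(Object key) {
       // Replaces System.err.println("BUFFERING ELEMENT"). Debug-level output is
       // dropped in production unless the logging configuration enables it, and the
       // parameterized form avoids string concatenation when the level is disabled.
       LOG.debug("Buffering element for key {}", key);
     }
   }
   ```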



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -81,47 +129,442 @@ public StorageApiConvertMessages(
  public PCollectionTuple expand(PCollection<KV<DestinationT, ElementT>> input) {
     String operationName = input.getName() + "/" + getName();
 
+    @SuppressWarnings({
+      "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+    })
+    ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn =
+        new ConvertMessagesDoFn<>(
+            dynamicDestinations,
+            bqServices,
+            operationName,
+            failedWritesTag,
+            successfulWritesTag,
+            patchTableSchemaTag,
+            elementsWaitingForSchemaTag,
+            rowMutationFn,
+            badRecordRouter,
+            input.getCoder());
+
     PCollectionTuple result =
         input.apply(
             "Convert to message",
-            ParDo.of(
-                    new ConvertMessagesDoFn<>(
-                        dynamicDestinations,
-                        bqServices,
-                        operationName,
-                        failedWritesTag,
-                        successfulWritesTag,
-                        rowMutationFn,
-                        badRecordRouter,
-                        input.getCoder()))
+            ParDo.of(convertMessagesDoFn)
                 .withOutputTags(
                     successfulWritesTag,
-                    TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG)))
+                    TupleTagList.of(
+                        ImmutableList.of(
+                            failedWritesTag,
+                            BAD_RECORD_TAG,
+                            patchTableSchemaTag,
+                            elementsWaitingForSchemaTag)))
                 .withSideInputs(dynamicDestinations.getSideInputs()));
    result.get(successfulWritesTag).setCoder(successCoder);
    result.get(failedWritesTag).setCoder(errorCoder);
    result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
-    return result;
+    result
+        .get(patchTableSchemaTag)
+        .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)));
+    result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, elementCoder));
+
+    final int numShards = 1;
+    // Throttle the stream to the patch-table function so that only a single update per table per
+    // second gets processed. The combiner merges incremental schemas, so we won't miss any updates.
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
+        result
+            .get(patchTableSchemaTag)
+            .apply(
+                "rewindow",
+                Window.<KV<DestinationT, TableSchema>>configure()
+                    .triggering(
+                        Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(Duration.standardSeconds(1))))
+                    .discardingFiredPanes())
+            .apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
+            .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
+            .apply(
+                "Patch table schema",
+                ParDo.of(
+                    new PatchTableSchemaDoFn<>(
+                        operationName, bqServices, dynamicDestinations, numShards)))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
+            .apply(
+                Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
+                    .triggering(DefaultTrigger.of()));
+
+    // Any elements that are waiting for a schema update are sent to this stateful DoFn to be
+    // buffered.
+    // Note: we currently do not provide the DynamicDestinations object access to the side input
+    // in this path. This is because side inputs are not currently available from timer callbacks.
+    // Since side inputs are generally used for getSchema, and in this case we read the schema
+    // from the table, this is unlikely to be a problem.
+
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements =
+        result
+            .get(elementsWaitingForSchemaTag)
+            .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)));
+
+    PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> waitingElementsList =
+        PCollectionList.of(shardedWaitingElements).and(tablesPatched);
+    PCollectionTuple retryResult =
+        waitingElementsList
+            .apply("Buffered flatten", Flatten.pCollections())
+            .apply(
+                "bufferElements",
+                ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, convertMessagesDoFn))
+                    .withOutputTags(
+                        successfulWritesTag,
+                        TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG))));
+    retryResult.get(successfulWritesTag).setCoder(successCoder);
+    retryResult.get(failedWritesTag).setCoder(errorCoder);
+    retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
+
+    // Flatten successes and failures from both the regular transform and the retry transform.
+    PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
+        PCollectionList.of(result.get(successfulWritesTag))
+            .and(retryResult.get(successfulWritesTag))
+            .apply("flattenSuccesses", Flatten.pCollections());
+    PCollection<BigQueryStorageApiInsertError> allFailures =
+        PCollectionList.of(result.get(failedWritesTag))
+            .and(retryResult.get(failedWritesTag))
+            .apply("flattenFailures", Flatten.pCollections());
+    return PCollectionTuple.of(successfulWritesTag, allSuccesses).and(failedWritesTag, allFailures);
+  }
+
+  public static class SchemaUpdateHoldingFn<DestinationT extends @NonNull Object, ElementT>
+      extends DoFn<
+          KV<ShardedKey<DestinationT>, @Nullable ElementT>,
+          KV<DestinationT, StorageApiWritePayload>> {
+    private static final Duration POLL_DURATION = Duration.standardMinutes(2);
+
+    @StateId("bufferedElements")
+    private final StateSpec<BagState<TimestampedValue<ElementT>>> bufferedSpec;
+
+    @StateId("minBufferedTimestamp")
+    private final StateSpec<CombiningState<Long, long[], Long>> minBufferedTsSpec;
+
+    @StateId("timerTimestamp")
+    private final StateSpec<ValueState<Long>> timerTsSpec;
+
+    @TimerId("pollTimer")
+    private final TimerSpec pollTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    // Noop timer used only for watermark holds.
+    @TimerId("holdTimer")
+    private final TimerSpec holdTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn;
+
+    public SchemaUpdateHoldingFn(
+        Coder<ElementT> elementCoder,
+        ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn) {
+      this.convertMessagesDoFn = convertMessagesDoFn;
+      this.bufferedSpec = StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(elementCoder));
+      this.timerTsSpec = StateSpecs.value();
+
+      Combine.BinaryCombineLongFn minCombineFn =
+          new Combine.BinaryCombineLongFn() {
+            @Override
+            public long identity() {
+              return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+            }
+
+            @Override
+            public long apply(long left, long right) {
+              return Math.min(left, right);
+            }
+          };
+      this.minBufferedTsSpec = StateSpecs.combining(minCombineFn);
+    }
+
+    @Teardown
+    public void onTeardown() {
+      convertMessagesDoFn.onTeardown();
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<ShardedKey<DestinationT>, @Nullable ElementT> element,
+        @Timestamp Instant timestamp,
+        @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+        @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+        @StateId("timerTimestamp") ValueState<Long> timerTs,
+        @TimerId("pollTimer") Timer pollTimer,
+        @TimerId("holdTimer") Timer holdTimer,
+        ProcessContext context,
+        BoundedWindow window,
+        MultiOutputReceiver o)
+        throws Exception {
+      convertMessagesDoFn.dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+
+      minBufferedTimestamp.readLater();
+      timerTs.readLater();
+      ElementT value = element.getValue();
+      boolean needsNewTimer = false;
+      if (value != null) {
+        System.err.println("BUFFERING ELEMENT");
+        // Buffer the element.
+        bag.add(TimestampedValue.of(value, timestamp));
+        minBufferedTimestamp.add(timestamp.getMillis());
+
+        needsNewTimer = (timerTs.read() == null);
+      } else {
+        // This means that the table schema was recently updated. Try to flush the pending elements.
+        if (tryFlushBuffer(
+            element.getKey().getKey(),
+            context.getPipelineOptions(),
+            bag,
+            minBufferedTimestamp,
+            holdTimer,
+            window,
+            o)) {
+          // Nothing in buffer; clear the timer.
+          pollTimer.clear();
+          timerTs.clear();
+        } else {
+          // We just scanned the buffer, so bump the timer.
+          needsNewTimer = true;
+        }
+      }
+
+      if (needsNewTimer) {
+        Instant newTimerTs = pollTimer.getCurrentRelativeTime().plus(POLL_DURATION);
+        pollTimer.set(newTimerTs);
+        timerTs.write(newTimerTs.getMillis());
+      }
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      // This is safe because a watermark hold will always be set using timer.withOutputTimestamp.
+      return Duration.millis(Long.MAX_VALUE);
+    }
+
+    @OnTimer("holdTimer")
+    public void onHoldTimer() {
+      // noop
+    }
+
+    @OnTimer("pollTimer")
+    public void onPollTimer(
+        @Key ShardedKey<DestinationT> key,
+        PipelineOptions pipelineOptions,
+        @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+        @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+        @StateId("timerTimestamp") ValueState<Long> timerTs,
+        @TimerId("pollTimer") Timer pollTimer,
+        @TimerId("holdTimer") Timer holdTimer,
+        BoundedWindow window,
+        MultiOutputReceiver o)
+        throws Exception {
+      if (tryFlushBuffer(
+          key.getKey(), pipelineOptions, bag, minBufferedTimestamp, holdTimer, window, o)) {
+        timerTs.clear();
+      } else {
+        Instant newTimerTs = pollTimer.getCurrentRelativeTime().plus(POLL_DURATION);
+        pollTimer.set(newTimerTs);
+        timerTs.write(newTimerTs.getMillis());
+      }
+    }
+
+    public boolean tryFlushBuffer(
+        DestinationT destination,
+        PipelineOptions pipelineOptions,
+        @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+        @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+        @TimerId("holdTimer") Timer holdTimer,
+        BoundedWindow window,
+        MultiOutputReceiver o)
+        throws Exception {
+      System.err.println("FLUSHING BUFFER " + Iterables.size(bag.read()));
+      // Force an update of the MessageConverter schema.
+      MessageConverter<ElementT> messageConverter =
+          convertMessagesDoFn.messageConverters.get(
+              destination,
+              convertMessagesDoFn.dynamicDestinations,
+              convertMessagesDoFn.getDatasetService(pipelineOptions));
+      messageConverter.updateSchemaFromTable();
+
+      List<TimestampedValue<ElementT>> stillWaiting = Lists.newArrayList();
+      minBufferedTimestamp.clear();
+
+      Iterable<TimestampedValue<KV<DestinationT, ElementT>>> bagElements =
+          Iterables.transform(
+              bag.read(),
+              e -> TimestampedValue.of(KV.of(destination, e.getValue()), e.getTimestamp()));
+
+      TableRowToStorageApiProto.ErrorCollector errorCollector =
+          UpgradeTableSchema.newErrorCollector();
+      Iterable<TimestampedValue<KV<DestinationT, ElementT>>> unProcessed =
+          convertMessagesDoFn.handleProcessElements(
+              messageConverter, bagElements, o, errorCollector);
+      if (!errorCollector.isEmpty()) {
+        System.err.println("GOT ERRORS " + messageConverter.getTableSchema());
+        unProcessed.forEach(
+            tv -> {
+              stillWaiting.add(TimestampedValue.of(tv.getValue().getValue(), tv.getTimestamp()));
+              minBufferedTimestamp.add(tv.getTimestamp().getMillis());
+            });
+      }
+
+      bag.clear();
+      stillWaiting.forEach(bag::add);
+
+      // Use a dummy timer to hold the watermark to the minimum buffered timestamp.
+      if (minBufferedTimestamp.read() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        holdTimer.clear();
+      } else {
+        Instant windowEnd = window.maxTimestamp();
+        holdTimer
+            .withOutputTimestamp(Instant.ofEpochMilli(minBufferedTimestamp.read()))
+            .set(windowEnd);
+      }
+      return stillWaiting.isEmpty();
+    }
   }
 
+  public static class PatchTableSchemaDoFn<DestinationT extends @NonNull Object, ElementT>
+      extends DoFn<KV<DestinationT, TableSchema>, KV<ShardedKey<DestinationT>, ElementT>> {
+    private final BigQueryServices bqServices;
+    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
+    private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
+    private transient @Nullable DatasetService datasetServiceInternal = null;
+    final int numShards;
+
+    public PatchTableSchemaDoFn(
+        String operationName,
+        BigQueryServices bqServices,
+        StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
+        int numShards) {
+      this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
+      this.bqServices = bqServices;
+      this.dynamicDestinations = dynamicDestinations;
+      this.numShards = numShards;
+    }
+
+    private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+      if (datasetServiceInternal == null) {
+        datasetServiceInternal =
+            bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+      }
+      return datasetServiceInternal;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<DestinationT, TableSchema> element,
+        OutputReceiver<KV<ShardedKey<DestinationT>, @Nullable ElementT>> o,
+        ProcessContext context,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+      DestinationT destination = element.getKey();
+      TableSchema tableSchemaDiff = element.getValue();
+
+      MessageConverter<ElementT> messageConverter =
+          messageConverters.get(
+              destination, dynamicDestinations, getDatasetService(pipelineOptions));
+      messageConverter.updateSchemaFromTable();
+
+      while (true) {
+        TableSchema baseSchema = messageConverter.getTableSchema();
+        TableSchema updatedSchema = UpgradeTableSchema.mergeSchemas(baseSchema, tableSchemaDiff);
+        if (baseSchema.equals(updatedSchema)) {
+          return;
+        }
+        // Check first to see if the schema still needs updating.
+        BackOff backoff =
+            new ExponentialBackOff.Builder()
+                .setMaxElapsedTimeMillis((int) TimeUnit.MINUTES.toMillis(5))
+                .build();
+        boolean schemaOutOfDate = false;
+        do {
+          try {
+            System.err.println("TRYING TO PATCH TO " + updatedSchema);
+
+            getDatasetService(pipelineOptions)
+                .patchTableSchema(
+                    dynamicDestinations.getTable(destination).getTableReference(),
+                    TableRowToStorageApiProto.protoSchemaToTableSchema(updatedSchema));
+            System.err.println("DONE PATCHING TO " + element.getValue());
+            // Indicate that we've patched this schema.
+            for (int i = 0; i < numShards; ++i) {
+              // Since the holding transform is sharded, we must broadcast to all shards.
+              // NB: this means that reducing shard count across an update is unsafe, as it
+              // might result in lost messages.
+              o.output(KV.of(AssignShardFn.getShardedKey(destination, i, numShards), null));
+            }
+            return;
+          } catch (IOException e) {
+            ApiErrorExtractor errorExtractor = new ApiErrorExtractor();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   `ApiErrorExtractor` is stateless and can be instantiated once outside of the 
retry loop to avoid unnecessary object allocation in each iteration.
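   
   A sketch of that hoisting, assuming the extractor is only consulted inside the catch block (the field name and the `itemNotFound` check are illustrative; the PR may inspect different error kinds):
   
   ```java
   import com.google.cloud.hadoop.util.ApiErrorExtractor;
   import java.io.IOException;
   
   class PatchRetrySketch {
     // Stateless, so a single shared instance serves every retry iteration.
     private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();
   
     boolean shouldRetry(IOException e) {
       // Reuses the shared extractor instead of allocating one per loop iteration.
       return !ERROR_EXTRACTOR.itemNotFound(e);
     }
   }
   ```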



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -100,8 +144,37 @@ abstract static class SchemaConversionException extends Exception {
   }
 
  public static class SchemaTooNarrowException extends SchemaConversionException {
-    SchemaTooNarrowException(String msg) {
+    private final String missingField;
+    private final boolean isRepeated;
+    private final boolean isStruct;
+
+    SchemaTooNarrowException(
+        String missingField, String msg, boolean isRepeated, boolean isStruct) {
       super(msg);
+      this.missingField = missingField;
+      this.isRepeated = isRepeated;
+      this.isStruct = isStruct;
+      ;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   There is a redundant semicolon here.
   
   ```suggestion
         this.isStruct = isStruct;
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -163,43 +613,150 @@ public void processElement(
         @Timestamp Instant timestamp,
         MultiOutputReceiver o)
         throws Exception {
+      System.err.println("CONVERTING MESSAGE " + element.getValue());
+      DestinationT destination = element.getKey();
+
       dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+      // Should we do this across the entire bundle instead? Unfortunately that doesn't work,
+      // because we can't access side inputs in finishBundle.
       MessageConverter<ElementT> messageConverter =
           messageConverters.get(
-              element.getKey(), dynamicDestinations, getDatasetService(pipelineOptions));
+              destination, dynamicDestinations, getDatasetService(pipelineOptions));
+      TableRowToStorageApiProto.ErrorCollector errorCollector =
+          UpgradeTableSchema.newErrorCollector();
+      Iterable<TimestampedValue<KV<DestinationT, ElementT>>> unProcessed =
+          handleProcessElements(
+              messageConverter,
+              ImmutableList.of(TimestampedValue.of(element, timestamp)),
+              o,
+              errorCollector);
+      if (!errorCollector.isEmpty()) {
+        System.err.println("GOT ERRORS " + errorCollector.getExceptions());
 
-      RowMutationInformation rowMutationInformation = null;
-      if (rowMutationFn != null) {
-        rowMutationInformation =
-            Preconditions.checkStateNotNull(rowMutationFn).apply(element.getValue());
+        // Track all errors. Generate schema-update message in finishBundle.
+        BufferedCollectorInformation bufferedCollectorInformation =
+            collectors.computeIfAbsent(
+                destination,
+                d -> new BufferedCollectorInformation(messageConverter.getTableSchema()));
+        bufferedCollectorInformation.addCollector(errorCollector, timestamp);
+
+        // Forward the message to the buffering stage to wait for the schema to be updated.
+        unProcessed.forEach(
+            tv ->
+                o.get(retryElementsWaitingForSchemaTag)
+                    .outputWithTimestamp(tv.getValue(), tv.getTimestamp()));
       }
-      try {
-        StorageApiWritePayload payload =
-            messageConverter
-                .toMessage(element.getValue(), rowMutationInformation)
-                .withTimestamp(timestamp);
-        o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
-      } catch (TableRowToStorageApiProto.SchemaConversionException conversionException) {
-        TableRow failsafeTableRow;
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) throws Exception {
+      if (!collectors.isEmpty()) {
+        for (Map.Entry<DestinationT, BufferedCollectorInformation> e : collectors.entrySet()) {
+          if (e.getValue().collector != null) {
+            c.output(
+                patchTableSchemaTag,
+                KV.of(
+                    e.getKey(),
+                    UpgradeTableSchema.getIncrementalSchema(
+                        e.getValue().collector, e.getValue().schema)),
+                e.getValue().timestamp,
+                GlobalWindow.INSTANCE);
+          }
+        }
+      }
+      collectors.clear();
+      ;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   There is a redundant semicolon here.
   
   ```suggestion
         collectors.clear();
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -425,18 +521,38 @@ public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
                   })
               .put(
                   TableFieldSchema.Type.STRING,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> Preconditions.checkArgumentNotNull(value).toString())
               .put(
                   TableFieldSchema.Type.JSON,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> Preconditions.checkArgumentNotNull(value).toString())
               .put(
                   TableFieldSchema.Type.GEOGRAPHY,
-                  (schemaInformation, value) ->
-                      Preconditions.checkArgumentNotNull(value).toString())
+                  (fullName, value) -> Preconditions.checkArgumentNotNull(value).toString())
               .build();
 
+  static final HashFunction SCHEMA_HASH_FUNCTION = Hashing.goodFastHash(32);
+
+  public static byte[] tableSchemaHash(TableSchema tableSchema) {
+    return tableSchemaHash("", tableSchema.getFieldsList()).asBytes();
+  }
+
+  public static HashCode tableSchemaHash(String prefix, List<TableFieldSchema> fields) {
+    List<HashCode> hashCodes = Lists.newArrayList();
+    for (TableFieldSchema tableFieldSchema : fields) {
+      String name =
+          prefix.isEmpty()
+              ? tableFieldSchema.getName()
+              : String.join(".", prefix, tableFieldSchema.getName());
+      hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `Charset.defaultCharset()` can lead to inconsistent results across 
different environments. It is safer to use a fixed charset like 
`StandardCharsets.UTF_8` for hashing.
   
   ```suggestion
         hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), StandardCharsets.UTF_8));
   ```
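   
   If this suggestion is applied, the file also needs the matching import, assuming it is not already present:
   
   ```java
   import java.nio.charset.StandardCharsets;
   ```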



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -81,47 +129,442 @@ public StorageApiConvertMessages(
  public PCollectionTuple expand(PCollection<KV<DestinationT, ElementT>> input) {
     String operationName = input.getName() + "/" + getName();
 
+    @SuppressWarnings({
+      "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+    })
+    ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn =
+        new ConvertMessagesDoFn<>(
+            dynamicDestinations,
+            bqServices,
+            operationName,
+            failedWritesTag,
+            successfulWritesTag,
+            patchTableSchemaTag,
+            elementsWaitingForSchemaTag,
+            rowMutationFn,
+            badRecordRouter,
+            input.getCoder());
+
     PCollectionTuple result =
         input.apply(
             "Convert to message",
-            ParDo.of(
-                    new ConvertMessagesDoFn<>(
-                        dynamicDestinations,
-                        bqServices,
-                        operationName,
-                        failedWritesTag,
-                        successfulWritesTag,
-                        rowMutationFn,
-                        badRecordRouter,
-                        input.getCoder()))
+            ParDo.of(convertMessagesDoFn)
                 .withOutputTags(
                     successfulWritesTag,
-                    TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG)))
+                    TupleTagList.of(
+                        ImmutableList.of(
+                            failedWritesTag,
+                            BAD_RECORD_TAG,
+                            patchTableSchemaTag,
+                            elementsWaitingForSchemaTag)))
                 .withSideInputs(dynamicDestinations.getSideInputs()));
    result.get(successfulWritesTag).setCoder(successCoder);
    result.get(failedWritesTag).setCoder(errorCoder);
    result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
-    return result;
+    result
+        .get(patchTableSchemaTag)
+        .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)));
+    result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, elementCoder));
+
+    final int numShards = 1;
+    // Throttle the stream to the patch-table function so that only a single update per table per
+    // second gets processed. The combiner merges incremental schemas, so we won't miss any updates.
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
+        result
+            .get(patchTableSchemaTag)
+            .apply(
+                "rewindow",
+                Window.<KV<DestinationT, TableSchema>>configure()
+                    .triggering(
+                        Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(Duration.standardSeconds(1))))
+                    .discardingFiredPanes())
+            .apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
+            .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
+            .apply(
+                "Patch table schema",
+                ParDo.of(
+                    new PatchTableSchemaDoFn<>(
+                        operationName, bqServices, dynamicDestinations, numShards)))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
+            .apply(
+                Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
+                    .triggering(DefaultTrigger.of()));
+
+    // Any elements that are waiting for a schema update are sent to this stateful DoFn to be
+    // buffered.
+    // Note: we currently do not provide the DynamicDestinations object access to the side input
+    // in this path. This is because side inputs are not currently available from timer callbacks.
+    // Since side inputs are generally used for getSchema, and in this case we read the schema
+    // from the table, this is unlikely to be a problem.
+
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements =
+        result
+            .get(elementsWaitingForSchemaTag)
+            .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)));
+
+    PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> waitingElementsList =
+        PCollectionList.of(shardedWaitingElements).and(tablesPatched);
+    PCollectionTuple retryResult =
+        waitingElementsList
+            .apply("Buffered flatten", Flatten.pCollections())
+            .apply(
+                "bufferElements",
+                ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, convertMessagesDoFn))
+                    .withOutputTags(
+                        successfulWritesTag,
+                        TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG))));
+    retryResult.get(successfulWritesTag).setCoder(successCoder);
+    retryResult.get(failedWritesTag).setCoder(errorCoder);
+    retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
+
+    // Flatten successes and failures from both the regular transform and the retry transform.
+    PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
+        PCollectionList.of(result.get(successfulWritesTag))
+            .and(retryResult.get(successfulWritesTag))
+            .apply("flattenSuccesses", Flatten.pCollections());
+    PCollection<BigQueryStorageApiInsertError> allFailures =
+        PCollectionList.of(result.get(failedWritesTag))
+            .and(retryResult.get(failedWritesTag))
+            .apply("flattenFailures", Flatten.pCollections());
+    return PCollectionTuple.of(successfulWritesTag, allSuccesses).and(failedWritesTag, allFailures);
+  }
+
+  public static class SchemaUpdateHoldingFn<DestinationT extends @NonNull Object, ElementT>
+      extends DoFn<
+          KV<ShardedKey<DestinationT>, @Nullable ElementT>,
+          KV<DestinationT, StorageApiWritePayload>> {
+    private static final Duration POLL_DURATION = Duration.standardMinutes(2);
+
+    @StateId("bufferedElements")
+    private final StateSpec<BagState<TimestampedValue<ElementT>>> bufferedSpec;
+
+    @StateId("minBufferedTimestamp")
+    private final StateSpec<CombiningState<Long, long[], Long>> minBufferedTsSpec;
+
+    @StateId("timerTimestamp")
+    private final StateSpec<ValueState<Long>> timerTsSpec;
+
+    @TimerId("pollTimer")
+    private final TimerSpec pollTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    // Noop timer used only for watermark holds.
+    @TimerId("holdTimer")
+    private final TimerSpec holdTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private final ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn;
+
+    public SchemaUpdateHoldingFn(
+        Coder<ElementT> elementCoder,
+        ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn) {
+      this.convertMessagesDoFn = convertMessagesDoFn;
+      this.bufferedSpec = StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(elementCoder));
+      this.timerTsSpec = StateSpecs.value();
+
+      Combine.BinaryCombineLongFn minCombineFn =
+          new Combine.BinaryCombineLongFn() {
+            @Override
+            public long identity() {
+              return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+            }
+
+            @Override
+            public long apply(long left, long right) {
+              return Math.min(left, right);
+            }
+          };
+      this.minBufferedTsSpec = StateSpecs.combining(minCombineFn);
+    }
+
+    @Teardown
+    public void onTeardown() {
+      convertMessagesDoFn.onTeardown();
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<ShardedKey<DestinationT>, @Nullable ElementT> element,
+        @Timestamp Instant timestamp,
+        @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+        @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+        @StateId("timerTimestamp") ValueState<Long> timerTs,
+        @TimerId("pollTimer") Timer pollTimer,
+        @TimerId("holdTimer") Timer holdTimer,
+        ProcessContext context,
+        BoundedWindow window,
+        MultiOutputReceiver o)
+        throws Exception {
+      convertMessagesDoFn.dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+
+      minBufferedTimestamp.readLater();
+      timerTs.readLater();
+      ElementT value = element.getValue();
+      boolean needsNewTimer = false;
+      if (value != null) {
+        System.err.println("BUFFERING ELEMENT");
+        // Buffer the element.
+        bag.add(TimestampedValue.of(value, timestamp));
+        minBufferedTimestamp.add(timestamp.getMillis());
+
+        needsNewTimer = (timerTs.read() == null);
+      } else {
+        // This means that the table schema was recently updated. Try to flush the pending elements.
+        if (tryFlushBuffer(
+            element.getKey().getKey(),
+            context.getPipelineOptions(),
+            bag,
+            minBufferedTimestamp,
+            holdTimer,
+            window,
+            o)) {
+          // Nothing in buffer; clear the timer.
+          pollTimer.clear();
+          timerTs.clear();
+        } else {
+          // We just scanned the buffer, so bump the timer.
+          needsNewTimer = true;
+        }
+      }
+
+      if (needsNewTimer) {
+        Instant newTimerTs = pollTimer.getCurrentRelativeTime().plus(POLL_DURATION);
+        pollTimer.set(newTimerTs);
+        timerTs.write(newTimerTs.getMillis());
+      }
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      // This is safe because a watermark hold will always be set using timer.withOutputTimestamp.
+      return Duration.millis(Long.MAX_VALUE);
+    }
+
+    @OnTimer("holdTimer")
+    public void onHoldTimer() {
+      // noop
+    }
+
+    @OnTimer("pollTimer")
+    public void onPollTimer(
+        @Key ShardedKey<DestinationT> key,
+        PipelineOptions pipelineOptions,
+        @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+        @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+        @StateId("timerTimestamp") ValueState<Long> timerTs,
+        @TimerId("pollTimer") Timer pollTimer,
+        @TimerId("holdTimer") Timer holdTimer,
+        BoundedWindow window,
+        MultiOutputReceiver o)
+        throws Exception {
+      if (tryFlushBuffer(
+          key.getKey(), pipelineOptions, bag, minBufferedTimestamp, holdTimer, window, o)) {
+        timerTs.clear();
+      } else {
+        Instant newTimerTs = pollTimer.getCurrentRelativeTime().plus(POLL_DURATION);
+        pollTimer.set(newTimerTs);
+        timerTs.write(newTimerTs.getMillis());
+      }
+    }
+
+    public boolean tryFlushBuffer(
+        DestinationT destination,
+        PipelineOptions pipelineOptions,
+        @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+        @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+        @TimerId("holdTimer") Timer holdTimer,
+        BoundedWindow window,
+        MultiOutputReceiver o)
+        throws Exception {
+      System.err.println("FLUSHING BUFFER " + Iterables.size(bag.read()));
+      // Force an update of the MessageConverter schema.
+      MessageConverter<ElementT> messageConverter =
+          convertMessagesDoFn.messageConverters.get(
+              destination,
+              convertMessagesDoFn.dynamicDestinations,
+              convertMessagesDoFn.getDatasetService(pipelineOptions));
+      messageConverter.updateSchemaFromTable();
+
+      List<TimestampedValue<ElementT>> stillWaiting = Lists.newArrayList();
+      minBufferedTimestamp.clear();
+
+      Iterable<TimestampedValue<KV<DestinationT, ElementT>>> bagElements =
+          Iterables.transform(
+              bag.read(),
+              e -> TimestampedValue.of(KV.of(destination, e.getValue()), e.getTimestamp()));
+
+      TableRowToStorageApiProto.ErrorCollector errorCollector =
+          UpgradeTableSchema.newErrorCollector();
+      Iterable<TimestampedValue<KV<DestinationT, ElementT>>> unProcessed =
+          convertMessagesDoFn.handleProcessElements(
+              messageConverter, bagElements, o, errorCollector);
+      if (!errorCollector.isEmpty()) {
+        System.err.println("GOT ERRORS " + messageConverter.getTableSchema());
+        unProcessed.forEach(
+            tv -> {
+              stillWaiting.add(TimestampedValue.of(tv.getValue().getValue(), tv.getTimestamp()));
+              minBufferedTimestamp.add(tv.getTimestamp().getMillis());
+            });
+      }
+
+      bag.clear();
+      stillWaiting.forEach(bag::add);
+
+      // Use a dummy timer to hold the watermark to the minimum buffered timestamp.
+      if (minBufferedTimestamp.read() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        holdTimer.clear();
+      } else {
+        Instant windowEnd = window.maxTimestamp();
+        holdTimer
+            .withOutputTimestamp(Instant.ofEpochMilli(minBufferedTimestamp.read()))
+            .set(windowEnd);
+      }
+      return stillWaiting.isEmpty();
+    }
   }
 
+  public static class PatchTableSchemaDoFn<DestinationT extends @NonNull Object, ElementT>
+      extends DoFn<KV<DestinationT, TableSchema>, KV<ShardedKey<DestinationT>, ElementT>> {
+    private final BigQueryServices bqServices;
+    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
+    private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
+    private transient @Nullable DatasetService datasetServiceInternal = null;
+    final int numShards;
+
+    public PatchTableSchemaDoFn(
+        String operationName,
+        BigQueryServices bqServices,
+        StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
+        int numShards) {
+      this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
+      this.bqServices = bqServices;
+      this.dynamicDestinations = dynamicDestinations;
+      this.numShards = numShards;
+    }
+
+    private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+      if (datasetServiceInternal == null) {
+        datasetServiceInternal =
+            bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+      }
+      return datasetServiceInternal;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<DestinationT, TableSchema> element,
+        OutputReceiver<KV<ShardedKey<DestinationT>, @Nullable ElementT>> o,
+        ProcessContext context,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+      DestinationT destination = element.getKey();
+      TableSchema tableSchemaDiff = element.getValue();
+
+      MessageConverter<ElementT> messageConverter =
+          messageConverters.get(
+              destination, dynamicDestinations, getDatasetService(pipelineOptions));
+      messageConverter.updateSchemaFromTable();
+
+      while (true) {
+        TableSchema baseSchema = messageConverter.getTableSchema();
+        TableSchema updatedSchema = UpgradeTableSchema.mergeSchemas(baseSchema, tableSchemaDiff);
+        if (baseSchema.equals(updatedSchema)) {
+          return;
+        }
+        // Check first to see if the schema still needs updating.
+        BackOff backoff =
+            new ExponentialBackOff.Builder()
+                .setMaxElapsedTimeMillis((int) TimeUnit.MINUTES.toMillis(5))
+                .build();
+        boolean schemaOutOfDate = false;
+        do {
+          try {
+            System.err.println("TRYING TO PATCH TO " + updatedSchema);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This debug print should be removed or replaced with a proper logger call.
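   
   Assuming an SLF4J `LOG` field is added to `PatchTableSchemaDoFn` (as sketched in the earlier comment on this file), this call site could become:
   
   ```suggestion
               LOG.debug("Attempting to patch table schema to {}", updatedSchema);
   ```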



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
