gemini-code-assist[bot] commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3036111003


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PatchTableSchemaDoFn.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * This DoFn is responsible for updating a BigQuery's table schema. The input 
is a TableSchema
+ * containing only the schema delta (new fields, relaxed fields). It outputs 
elements for all
+ * updated tables, which act as notifcations to the buffering stage that the 
elements can be
+ * retried.
+ */
+public class PatchTableSchemaDoFn<DestinationT extends @NonNull Object, 
ElementT>
+    extends DoFn<KV<DestinationT, TableSchema>, KV<DestinationT, ElementT>> {
+  private final BigQueryServices bqServices;
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations;
+  private TwoLevelMessageConverterCache<DestinationT, ElementT> 
messageConverters;
+  private transient BigQueryServices.@Nullable DatasetService 
datasetServiceInternal = null;
+  private transient BigQueryServices.@Nullable WriteStreamService 
writeStreamServiceInternal = null;
+
+  PatchTableSchemaDoFn(
+      String operationName,
+      BigQueryServices bqServices,
+      StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations) {
+    this.messageConverters = new 
TwoLevelMessageConverterCache<>(operationName);
+    this.bqServices = bqServices;
+    this.dynamicDestinations = dynamicDestinations;
+  }
+
+  private BigQueryServices.DatasetService getDatasetService(PipelineOptions 
pipelineOptions)
+      throws IOException {
+    if (datasetServiceInternal == null) {
+      datasetServiceInternal =
+          
bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+    }
+    return datasetServiceInternal;
+  }
+
+  private BigQueryServices.WriteStreamService 
getWriteStreamService(PipelineOptions pipelineOptions)
+      throws IOException {
+    if (writeStreamServiceInternal == null) {
+      writeStreamServiceInternal =
+          
bqServices.getWriteStreamService(pipelineOptions.as(BigQueryOptions.class));
+    }
+    return writeStreamServiceInternal;
+  }
+
+  @Teardown
+  public void onTeardown() {
+    try {
+      if (datasetServiceInternal != null) {
+        datasetServiceInternal.close();
+        datasetServiceInternal = null;
+      }
+      if (writeStreamServiceInternal != null) {
+        writeStreamServiceInternal.close();
+        writeStreamServiceInternal = null;
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @ProcessElement
+  public void processElement(
+      @Element KV<DestinationT, TableSchema> element,
+      OutputReceiver<KV<DestinationT, @Nullable ElementT>> o,
+      ProcessContext context,
+      PipelineOptions pipelineOptions)
+      throws Exception {
+    dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+    DestinationT destination = element.getKey();
+    TableSchema tableSchemaDiff = element.getValue();
+
+    StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter =
+        messageConverters.get(
+            destination,
+            dynamicDestinations,
+            pipelineOptions,
+            getDatasetService(pipelineOptions),
+            getWriteStreamService(pipelineOptions));
+    messageConverter.updateSchemaFromTable();
+
+    while (true) {
+      TableSchema baseSchema = messageConverter.getTableSchema();
+      TableSchema updatedSchema = UpgradeTableSchema.mergeSchemas(baseSchema, 
tableSchemaDiff);
+      // Check first to see if the schema still needs updating.
+      if (baseSchema.equals(updatedSchema)) {
+        return;
+      }
+      BackOff backoff =
+          new ExponentialBackOff.Builder()
+              .setMaxElapsedTimeMillis((int) TimeUnit.MINUTES.toMillis(1))
+              .build();
+      boolean schemaOutOfDate = false;
+      Exception lastException = null;
+      do {
+        try {
+          getDatasetService(pipelineOptions)
+              .patchTableSchema(
+                  
dynamicDestinations.getTable(destination).getTableReference(),
+                  
TableRowToStorageApiProto.protoSchemaToTableSchema(updatedSchema));
+          // Indicate that we've patched this schema.
+          o.output(KV.of(destination, null));
+          return;
+        } catch (IOException e) {
+          ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+          if (errorExtractor.preconditionNotMet(e) || 
errorExtractor.badRequest(e)) {
+            schemaOutOfDate = true;
+            break;
+          } else {
+            lastException = e;
+          }
+        }
+      } while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, 
backoff));
+      if (schemaOutOfDate) {
+        // This could be due to an out-of-date schema.
+        messageConverter.updateSchemaFromTable();
+      } else {
+        // We ran out of retries.
+        throw new RuntimeException("Failed to patch table schema.", 
lastException);
+      }
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `while (true)` loop, combined with the logic that sets `schemaOutOfDate 
= true` on any `badRequest` or `preconditionNotMet` error, can lead to an 
infinite loop. If a schema update is inherently invalid (e.g., attempting an 
incompatible type change), BigQuery will return a `400 Bad Request`. The 
current implementation will interpret this as the local schema being out of 
date, refresh it, and retry the same invalid update indefinitely. A maximum 
retry count or a more specific check on the error reason should be implemented.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -81,125 +103,165 @@ public StorageApiConvertMessages(
   public PCollectionTuple expand(PCollection<KV<DestinationT, ElementT>> 
input) {
     String operationName = input.getName() + "/" + getName();
 
+    @SuppressWarnings({
+      "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+    })
+    ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn =
+        new ConvertMessagesDoFn<>(
+            dynamicDestinations,
+            bqServices,
+            operationName,
+            failedWritesTag,
+            successfulWritesTag,
+            patchTableSchemaTag,
+            elementsWaitingForSchemaTag,
+            rowMutationFn,
+            badRecordRouter,
+            input.getCoder());
+
     PCollectionTuple result =
         input.apply(
             "Convert to message",
-            ParDo.of(
-                    new ConvertMessagesDoFn<>(
-                        dynamicDestinations,
-                        bqServices,
-                        operationName,
-                        failedWritesTag,
-                        successfulWritesTag,
-                        rowMutationFn,
-                        badRecordRouter,
-                        input.getCoder()))
+            ParDo.of(convertMessagesDoFn)
                 .withOutputTags(
                     successfulWritesTag,
-                    TupleTagList.of(ImmutableList.of(failedWritesTag, 
BAD_RECORD_TAG)))
+                    TupleTagList.of(
+                        ImmutableList.of(
+                            failedWritesTag,
+                            BAD_RECORD_TAG,
+                            patchTableSchemaTag,
+                            elementsWaitingForSchemaTag)))
                 .withSideInputs(dynamicDestinations.getSideInputs()));
     result.get(successfulWritesTag).setCoder(successCoder);
     result.get(failedWritesTag).setCoder(errorCoder);
     
result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
-    return result;
+    result
+        .get(patchTableSchemaTag)
+        .setCoder(KvCoder.of(destinationCoder, 
ProtoCoder.of(TableSchema.class)));
+    
result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, 
elementCoder));
+
+    final int numShards =
+        input
+            .getPipeline()
+            .getOptions()
+            .as(BigQueryOptions.class)
+            .getSchemaUpgradeBufferingShards();
+
+    // Throttle the stream to the patch-table function so that only a single 
update per table per
+    // second gets processed. The combiner merges incremental schemas, so we 
won't miss any pdates.
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
+        result
+            .get(patchTableSchemaTag)
+            .apply(
+                "rewindow",
+                Window.<KV<DestinationT, TableSchema>>configure()
+                    .triggering(
+                        Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(Duration.standardSeconds(1))))
+                    .discardingFiredPanes())
+            .apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
+            .setCoder(KvCoder.of(destinationCoder, 
ProtoCoder.of(TableSchema.class)))
+            .apply(
+                "Patch table schema",
+                ParDo.of(
+                    new PatchTableSchemaDoFn<>(operationName, bqServices, 
dynamicDestinations)))
+            .setCoder(KvCoder.of(destinationCoder, 
NullableCoder.of(elementCoder)))
+            // We need to make sure that all shards of the buffering transform 
are notified.
+            .apply(
+                "fanout to all shards",
+                FlatMapElements.via(
+                    new SimpleFunction<
+                        KV<DestinationT, ElementT>,
+                        Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
+                      @Override
+                      public Iterable<KV<ShardedKey<DestinationT>, ElementT>> 
apply(
+                          KV<DestinationT, ElementT> elem) {
+                        return IntStream.range(0, numShards)
+                            .mapToObj(
+                                i ->
+                                    KV.of(
+                                        
StorageApiConvertMessages.AssignShardFn.getShardedKey(
+                                            elem.getKey(), i, numShards),
+                                        elem.getValue()))
+                            .collect(Collectors.toList());
+                      }
+                    }))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)))
+            .apply(
+                Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
+                    .triggering(DefaultTrigger.of()));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The schema update notification (`tablesPatched`) is emitted in the 
`GlobalWindow` via `ConvertMessagesDoFn.finishBundle`. However, the elements 
waiting for schema updates in `SchemaUpdateHoldingFn` may belong to various 
windows. Since `SchemaUpdateHoldingFn` is a stateful `DoFn`, its state and 
timers are window-scoped. A notification in the `GlobalWindow` will not trigger 
the processing of state held in other windows. While the `pollTimer` fallback 
ensures eventual processing, the 'fast-path' notification mechanism is 
currently ineffective for windowed data.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -81,125 +103,165 @@ public StorageApiConvertMessages(
   public PCollectionTuple expand(PCollection<KV<DestinationT, ElementT>> 
input) {
     String operationName = input.getName() + "/" + getName();
 
+    @SuppressWarnings({
+      "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+    })
+    ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn =
+        new ConvertMessagesDoFn<>(
+            dynamicDestinations,
+            bqServices,
+            operationName,
+            failedWritesTag,
+            successfulWritesTag,
+            patchTableSchemaTag,
+            elementsWaitingForSchemaTag,
+            rowMutationFn,
+            badRecordRouter,
+            input.getCoder());
+
     PCollectionTuple result =
         input.apply(
             "Convert to message",
-            ParDo.of(
-                    new ConvertMessagesDoFn<>(
-                        dynamicDestinations,
-                        bqServices,
-                        operationName,
-                        failedWritesTag,
-                        successfulWritesTag,
-                        rowMutationFn,
-                        badRecordRouter,
-                        input.getCoder()))
+            ParDo.of(convertMessagesDoFn)
                 .withOutputTags(
                     successfulWritesTag,
-                    TupleTagList.of(ImmutableList.of(failedWritesTag, 
BAD_RECORD_TAG)))
+                    TupleTagList.of(
+                        ImmutableList.of(
+                            failedWritesTag,
+                            BAD_RECORD_TAG,
+                            patchTableSchemaTag,
+                            elementsWaitingForSchemaTag)))
                 .withSideInputs(dynamicDestinations.getSideInputs()));
     result.get(successfulWritesTag).setCoder(successCoder);
     result.get(failedWritesTag).setCoder(errorCoder);
     
result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
-    return result;
+    result
+        .get(patchTableSchemaTag)
+        .setCoder(KvCoder.of(destinationCoder, 
ProtoCoder.of(TableSchema.class)));
+    
result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, 
elementCoder));
+
+    final int numShards =
+        input
+            .getPipeline()
+            .getOptions()
+            .as(BigQueryOptions.class)
+            .getSchemaUpgradeBufferingShards();
+
+    // Throttle the stream to the patch-table function so that only a single 
update per table per
+    // second gets processed. The combiner merges incremental schemas, so we 
won't miss any pdates.
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
+        result
+            .get(patchTableSchemaTag)
+            .apply(
+                "rewindow",
+                Window.<KV<DestinationT, TableSchema>>configure()
+                    .triggering(
+                        Repeatedly.forever(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(Duration.standardSeconds(1))))
+                    .discardingFiredPanes())
+            .apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
+            .setCoder(KvCoder.of(destinationCoder, 
ProtoCoder.of(TableSchema.class)))
+            .apply(
+                "Patch table schema",
+                ParDo.of(
+                    new PatchTableSchemaDoFn<>(operationName, bqServices, 
dynamicDestinations)))
+            .setCoder(KvCoder.of(destinationCoder, 
NullableCoder.of(elementCoder)))
+            // We need to make sure that all shards of the buffering transform 
are notified.
+            .apply(
+                "fanout to all shards",
+                FlatMapElements.via(
+                    new SimpleFunction<
+                        KV<DestinationT, ElementT>,
+                        Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
+                      @Override
+                      public Iterable<KV<ShardedKey<DestinationT>, ElementT>> 
apply(
+                          KV<DestinationT, ElementT> elem) {
+                        return IntStream.range(0, numShards)
+                            .mapToObj(
+                                i ->
+                                    KV.of(
+                                        
StorageApiConvertMessages.AssignShardFn.getShardedKey(
+                                            elem.getKey(), i, numShards),
+                                        elem.getValue()))
+                            .collect(Collectors.toList());
+                      }
+                    }))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)))
+            .apply(
+                Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
+                    .triggering(DefaultTrigger.of()));
+
+    // Any elements that are waiting for a schema update are sent to this 
stateful DoFn to be
+    // buffered.
+    // Note: we currently do not provide the DynamicDestinations object access 
to the side input in
+    // this path.
+    // This is because side inputs are not currently available from timer 
callbacks. Since side
+    // inputs are generally
+    // used for getSchema and in this case we read the schema from the table, 
this is unlikely to be
+    // a problem.
+    PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements 
=
+        result
+            .get(elementsWaitingForSchemaTag)
+            // TODO: Consider using GroupIntoBatchs.withShardingKey to get 
auto sharding here
+            // instead of fixed sharding.
+            .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
+            .setCoder(
+                KvCoder.of(ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)));
+
+    PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> 
waitingElementsList =
+        PCollectionList.of(shardedWaitingElements).and(tablesPatched);
+    PCollectionTuple retryResult =
+        waitingElementsList
+            .apply("Buffered flatten", Flatten.pCollections())
+            .apply(
+                "bufferElements",
+                ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, 
convertMessagesDoFn))

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `convertMessagesDoFn` instance is being used both as a standalone `DoFn` 
in a `ParDo` (line 125) and as an embedded helper inside 
`SchemaUpdateHoldingFn` (line 220). In Apache Beam, `DoFn` instances have 
specific lifecycles (setup, startBundle, finishBundle, teardown) managed by the 
runner. Sharing a `DoFn` instance across different transforms or embedding it 
within another `DoFn` is highly discouraged and can lead to unexpected behavior 
or serialization issues. The shared logic should be refactored into a separate 
helper class that is not a `DoFn`.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -884,64 +969,79 @@ public void process(
               }
             }
           };
-      Instant now = Instant.now();
-      List<AppendRowsContext> contexts = Lists.newArrayList();
-      RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
-          new RetryManager<>(
-              Duration.standardSeconds(1),
-              Duration.standardSeconds(20),
-              maxRetries,
-              
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
-      int numAppends = 0;
-      for (SplittingIterable.Value splitValue : messages) {
-        // Handle the case of a row that is too large.
-        if (splitValue.getProtoRows().getSerializedSize() >= maxRequestSize) {
-          if (splitValue.getProtoRows().getSerializedRowsCount() > 1) {
-            // TODO(reuvenlax): Is it worth trying to handle this case by 
splitting the protoRows?
-            // Given that we split
-            // the ProtoRows iterable at 2MB and the max request size is 10MB, 
this scenario seems
-            // nearly impossible.
-            LOG.error(
-                "A request containing more than one row is over the request 
size limit of {}. This is unexpected. All rows in the request will be sent to 
the failed-rows PCollection.",
-                maxRequestSize);
-          }
-          for (int i = 0; i < 
splitValue.getProtoRows().getSerializedRowsCount(); ++i) {
-            org.joda.time.Instant timestamp = 
splitValue.getTimestamps().get(i);
-            TableRow failedRow = splitValue.getFailsafeTableRows().get(i);
-            if (failedRow == null) {
-              ByteString rowBytes = 
splitValue.getProtoRows().getSerializedRows(i);
-              failedRow = appendClientInfo.get().toTableRow(rowBytes, 
Predicates.alwaysTrue());
-            }
-            o.get(failedRowsTag)
-                .outputWithTimestamp(
-                    new BigQueryStorageApiInsertError(
-                        failedRow,
-                        "Row payload too large. Maximum size " + 
maxRequestSize,
-                        tableReference),
-                    timestamp);
-          }
-          int numRowsFailed = 
splitValue.getProtoRows().getSerializedRowsCount();
-          rowsSentToFailedRowsCollection.inc(numRowsFailed);
-          BigQuerySinkMetrics.appendRowsRowStatusCounter(
-                  BigQuerySinkMetrics.RowStatus.FAILED,
-                  BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
-                  shortTableId)
-              .inc(numRowsFailed);
-        } else {
-          ++numAppends;
-          // RetryManager
-          AppendRowsContext context =
-              new AppendRowsContext(
-                  element.getKey(),
-                  splitValue.getProtoRows(),
-                  splitValue.getTimestamps(),
-                  splitValue.getFailsafeTableRows());
-          contexts.add(context);
-          retryManager.addOperation(runOperation, onError, onSuccess, context);
-          
recordsAppended.inc(splitValue.getProtoRows().getSerializedRowsCount());
-          
appendSizeDistribution.update(context.protoRows.getSerializedRowsCount());
+
+      Optional<RetryManager<AppendRowsResponse, AppendRowsContext>> 
maybeRetryManager =
+          Optional.empty();
+      do {
+        // Each ProtoRows object contains at most 1MB of rows.
+        // TODO: Push messageFromTableRow up to top level. That we we cans 
skip TableRow entirely if
+        // already proto or already schema.
+        Iterable<SplittingIterable.Value> messages =
+            new SplittingIterable(
+                element.getValue(),
+                splitSize,
+                // Unknown field merger
+                (bytes, tableRow) ->
+                    appendClientInfo.get().mergeNewFields(bytes, tableRow, 
ignoreUnknownValues),
+                // Convert back to TableRow
+                bytes -> appendClientInfo.get().toTableRow(bytes, 
Predicates.alwaysTrue()),
+                // Failed rows consumer
+                (failedRow, errorMessage) -> {
+                  o.get(failedRowsTag)
+                      .outputWithTimestamp(
+                          new BigQueryStorageApiInsertError(
+                              failedRow.getValue(), errorMessage, 
tableReference),
+                          failedRow.getTimestamp());
+                  rowsSentToFailedRowsCollection.inc();
+                  BigQuerySinkMetrics.appendRowsRowStatusCounter(
+                          BigQuerySinkMetrics.RowStatus.FAILED,
+                          BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
+                          shortTableId)
+                      .inc(1);
+                },
+                // Get the currently-known TableSchema hash
+                () -> appendClientInfo.get().getTableSchemaHash(),
+                () ->
+                    TableRowToStorageApiProto.wrapDescriptorProto(
+                        messageConverter.getDescriptor(false)),
+                autoUpdateSchema,
+                elementTs);
+
+        maybeRetryManager =
+            createRetryManager(
+                element.getKey(),
+                messages,
+                runOperation,
+                onError,
+                onSuccess,
+                appendClientInfo.get(),
+                tableReference,
+                shortTableId,
+                o);
+        // createRetryManager returns empty if the schema doesn't match.
+        if (!maybeRetryManager.isPresent()) {
+          LOG.info("Schema out of date: refreshing table schema for {}", 
tableId);
+          // Force the message converter to get the schema again from the 
table.
+          messageConverter.updateSchemaFromTable();
+          // Close all RPC clients that were opened with the old descriptor. 
Clear the cache,
+          // forcing
+          // us to create a new append client with the updated descriptor.
+          
APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey()));
+          appendClientInfo.set(
+              APPEND_CLIENTS.get(
+                  messageConverters.getAppendClientKey(element.getKey()), 
getAppendClientInfo));
         }
-      }
+      } while (!maybeRetryManager.isPresent());
+      Instant now = Instant.now();
+
+      RetryManager<AppendRowsResponse, AppendRowsContext> retryManager = 
maybeRetryManager.get();
+      new RetryManager<>(
+          Duration.standardSeconds(1),
+          Duration.standardSeconds(20),
+          maxRetries,
+          
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This block instantiates a new `RetryManager` but does not assign it to any 
variable or use it. It appears to be a leftover from a refactoring or a 
copy-paste error, as the `retryManager` used in the subsequent logic is 
correctly initialized on line 1037.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -503,12 +502,13 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, 
@Nullable TableSchema u
             updated.get()
                 ? TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
                     currentSchema.get(), true, includeCdcColumns)
-                : initialDescriptor;
+                : messageConverter.getDescriptor(includeCdcColumns);
         return new SchemaAndDescriptor(currentSchema.get(), descriptor);
       }
 
       AppendClientInfo getAppendClientInfo(
           boolean lookupCache, final @Nullable TableSchema updatedSchema) {
+        lookupCache = false;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `lookupCache` parameter is hardcoded to `false` at the start of the 
method, which makes the parameter passed by callers irrelevant. If this was 
intended for debugging, it should be removed before merging.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SchemaUpdateHoldingFn.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.common.collect.Iterables;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * This is a stateful DoFn that buffers elements that triggered table schema 
update. Once the table
+ * schema has been updated, this reprocesses the messages and allows them to 
continue on through the
+ * sink.
+ */
+public class SchemaUpdateHoldingFn<DestinationT extends @NonNull Object, 
ElementT>
+    extends DoFn<
+        KV<ShardedKey<DestinationT>, @Nullable ElementT>,
+        KV<DestinationT, StorageApiWritePayload>> {
+  private static final Duration POLL_DURATION = Duration.standardSeconds(1);
+
+  @StateId("bufferedElements")
+  private final StateSpec<BagState<TimestampedValue<ElementT>>> bufferedSpec;
+
+  @StateId("minBufferedTimestamp")
+  private final StateSpec<CombiningState<Long, long[], Long>> 
minBufferedTsSpec;
+
+  @StateId("timerTimestamp")
+  private final StateSpec<ValueState<Long>> timerTsSpec;
+
+  @TimerId("pollTimer")
+  private final TimerSpec pollTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+  // Noop timer used only for watermark holds.
+  @TimerId("holdTimer")
+  private final TimerSpec holdTimerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  private final ConvertMessagesDoFn<DestinationT, ElementT> 
convertMessagesDoFn;
+
+  public SchemaUpdateHoldingFn(
+      Coder<ElementT> elementCoder,
+      ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn) {
+    this.convertMessagesDoFn = convertMessagesDoFn;
+    this.bufferedSpec = 
StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(elementCoder));
+    this.timerTsSpec = StateSpecs.value();
+
+    Combine.BinaryCombineLongFn minCombineFn =
+        new Combine.BinaryCombineLongFn() {
+          @Override
+          public long identity() {
+            return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+          }
+
+          @Override
+          public long apply(long left, long right) {
+            return Math.min(left, right);
+          }
+        };
+    this.minBufferedTsSpec = StateSpecs.combining(minCombineFn);
+  }
+
+  @Teardown
+  public void onTeardown() {
+    convertMessagesDoFn.onTeardown();
+  }
+
+  @ProcessElement
+  public void processElement(
+      @Element KV<ShardedKey<DestinationT>, @Nullable ElementT> element,
+      @Timestamp Instant timestamp,
+      @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+      @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+      @StateId("timerTimestamp") ValueState<Long> timerTs,
+      @TimerId("pollTimer") Timer pollTimer,
+      @TimerId("holdTimer") Timer holdTimer,
+      ProcessContext context,
+      BoundedWindow window,
+      MultiOutputReceiver o)
+      throws Exception {
+    
convertMessagesDoFn.getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
+
+    minBufferedTimestamp.readLater();
+    timerTs.readLater();
+    ElementT value = element.getValue();
+    boolean needsNewTimer = false;
+    if (value != null) {
+      // Buffer the element.
+      bag.add(TimestampedValue.of(value, timestamp));
+      minBufferedTimestamp.add(timestamp.getMillis());
+      needsNewTimer = (timerTs.read() == null);
+    } else {
+      // This means that the table schema was recently updated. Try to flush 
the pending elements.
+      if (tryFlushBuffer(
+          element.getKey().getKey(), context.getPipelineOptions(), bag, 
minBufferedTimestamp, o)) {
+        // Nothing left in buffer. clear timer.
+        pollTimer.clear();
+        timerTs.clear();
+      } else {
+        // We just scanned the buffer, so bump the timer.
+        needsNewTimer = true;
+      }
+    }
+    updateHold(holdTimer, window, 
Instant.ofEpochMilli(minBufferedTimestamp.read()));
+
+    if (needsNewTimer) {
+      Instant newTimerTs = 
pollTimer.getCurrentRelativeTime().plus(POLL_DURATION);
+      pollTimer
+          
.withOutputTimestamp(Instant.ofEpochMilli(minBufferedTimestamp.read()))
+          .set(newTimerTs);
+      timerTs.write(newTimerTs.getMillis());
+    }
+  }
+
+  @Override
+  public Duration getAllowedTimestampSkew() {
+    // This is safe because a watermark hold will always be set using 
timer.withOutputTimestamp.
+    return Duration.millis(Long.MAX_VALUE);
+  }
+
+  // noop. We need this method because we have a holdTimer, but the timer only 
exists to hold the
+  // watermark.
+  @OnTimer("holdTimer")
+  public void onHoldTimer() {}
+
+  @OnTimer("pollTimer")
+  public void onPollTimer(
+      @Key ShardedKey<DestinationT> key,
+      PipelineOptions pipelineOptions,
+      @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+      @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+      @StateId("timerTimestamp") ValueState<Long> timerTs,
+      @TimerId("pollTimer") Timer pollTimer,
+      @TimerId("holdTimer") Timer holdTimer,
+      BoundedWindow window,
+      MultiOutputReceiver o)
+      throws Exception {
+    if (tryFlushBuffer(key.getKey(), pipelineOptions, bag, 
minBufferedTimestamp, o)) {
+      timerTs.clear();
+    } else {
+      // We still have buffered elements. Make sure that the polling timer 
keeps looping.
+      Instant newTimerTs = 
pollTimer.getCurrentRelativeTime().plus(POLL_DURATION);
+      pollTimer
+          
.withOutputTimestamp(Instant.ofEpochMilli(minBufferedTimestamp.read()))
+          .set(newTimerTs);
+      timerTs.write(newTimerTs.getMillis());
+    }
+    updateHold(holdTimer, window, 
Instant.ofEpochMilli(minBufferedTimestamp.read()));
+  }
+
+  @OnWindowExpiration
+  public void onWindowExpiration(
+      @Key ShardedKey<DestinationT> key,
+      PipelineOptions pipelineOptions,
+      @StateId("bufferedElements") BagState<TimestampedValue<ElementT>> bag,
+      @StateId("minBufferedTimestamp") CombiningState<Long, long[], Long> 
minBufferedTimestamp,
+      MultiOutputReceiver o)
+      throws Exception {
+    // This can happen on test completion or drain. We can't set any more 
timers in window
+    // expiration, so we just have to loop until the schema is updated.
+    BackOff backoff =
+        new ExponentialBackOff.Builder()
+            .setMaxElapsedTimeMillis((int) TimeUnit.SECONDS.toMillis(500))
+            .build();
+    do {
+      if (tryFlushBuffer(key.getKey(), pipelineOptions, bag, 
minBufferedTimestamp, o)) {
+        return;
+      }
+    } while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, 
backoff));
+    throw new RuntimeException("Failed to flush elements on window 
expiration!");

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Throwing a `RuntimeException` during `onWindowExpiration` is quite 
aggressive for a streaming pipeline. If a schema update is delayed or fails 
permanently, this will cause the entire pipeline to crash and potentially enter 
a crash loop. Consider routing these elements to a dead-letter queue or failing 
them through the standard `WriteResult` mechanism instead of crashing the 
worker.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -743,7 +780,16 @@ long flush(
                     if (failedRow == null) {
                       ByteString protoBytes =
                           
failedContext.protoRows.getSerializedRows(failedIndex);
-                      AppendClientInfo aci = 
Preconditions.checkStateNotNull(appendClientInfo);
+                      AppendClientInfo aci = 
Preconditions.checkStateNotNull(this.appendClientInfo);
+                      // TODO: WHY ARE WE HITTING THIS FAILURE!!!!!!! WE 
successfully reopend the
+                      // connection with
+                      // a new TableSchema, yet Vortex is failing the 
individual rows.
+                      // APPEARS THAT STREAMWRITER isn't getting updated?
+                      LOG.error(
+                          "UNEXPECTED. DUMPING ERROR {}, CONVERTER SCHEMA {}, 
ACI SCHEMA {}",
+                          error.getRowIndexToErrorMessage().get(failedIndex),
+                          messageConverter.getTableSchema(),
+                          aci.getTableSchema());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This block contains a debug-style log message with a 'TODO' and multiple 
exclamation marks. This should be cleaned up or converted into a proper 
production-level error log with a clear explanation of the expected failure 
mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to