gemini-code-assist[bot] commented on code in PR #38854:
URL: https://github.com/apache/beam/pull/38854#discussion_r3382159098


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -1096,10 +1239,109 @@ public void process(
           java.time.Duration timeElapsed = java.time.Duration.between(now, 
Instant.now());
           appendLatencyDistribution.update(timeElapsed.toMillis());
         }
+        processMismatchedRows.accept(mismatchedRows);
       }
       idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
     }
 
+    @OnTimer("retryMismatchedRowsTimer")
+    public void onMismatchedRowsTimer(
+        PipelineOptions pipelineOptions,
+        @Key ShardedKey<DestinationT> shardedDestination,
+        @Timestamp org.joda.time.Instant elementTs,
+        @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
+        @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
+        @StateId("updatedSchema") ValueState<TableSchema> updatedSchema,
+        @TimerId("idleTimer") Timer idleTimer,
+        MultiOutputReceiver o,
+        @TimerId("retryMismatchedRowsTimer") Timer retryRowsTimer,
+        @StateId("mismatchedRows") BagState<MismatchedRow> mismatchedRowsBag,
+        @StateId("currentMismatchedRowTimerValue") ValueState<Long> 
currentTimerValue,
+        @StateId("minPendingTimestamp") ValueState<Long> minPendingTimestamp)
+        throws Exception {
+      System.err.println("RETRY TIMER " + Instant.now());
+      mismatchedRowsBag.readLater();
+      currentTimerValue.readLater();
+      minPendingTimestamp.readLater();
+
+      // TODOTDO XXX add context for side inputs.

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The side input accessor for `dynamicDestinations` is not set during the 
timer execution. If `dynamicDestinations` relies on side inputs, calling 
`dynamicDestinations.getTable(...)` on line 1272 will fail at runtime. You 
should add `OnTimerContext` to the `onMismatchedRowsTimer` method parameters 
and use it to set the side input accessor on `dynamicDestinations` before 
accessing the table destination.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BufferMismatchedRows.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class BufferMismatchedRows<DestinationT extends @NonNull Object, ElementT>
+    extends PTransform<PCollection<KV<DestinationT, MismatchedRow>>, 
PCollectionTuple> {
+  private final Coder<BigQueryStorageApiInsertError> failedRowsCoder;
+  private final Coder<TableRow> successfulRowsCoder;
+  private final Coder<DestinationT> destinationCoder;
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations;
+  private final StorageApiWriteUnshardedRecords.WriteRecordsDoFn<DestinationT, 
ElementT> writeDoFn;
+  private final TupleTag<BigQueryStorageApiInsertError> failedRowsTag;
+  private final @Nullable TupleTag<TableRow> successfulRowsTag;
+  // This output is effectively ignored, since we only support this code path 
for
+  // StorageApiWriteRecordsInconsistent.
+  private final TupleTag<KV<String, String>> finalizeTag = new 
TupleTag<>("finalizeTag");
+  private static final int NUM_DEFAULT_SHARDS = 20;
+
+  public BufferMismatchedRows(
+      Coder<BigQueryStorageApiInsertError> failedRowsCoder,
+      Coder<TableRow> successfulRowsCoder,
+      Coder<DestinationT> destinationCoder,
+      StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations,
+      StorageApiWriteUnshardedRecords.WriteRecordsDoFn<DestinationT, ElementT> 
writeDoFn,
+      TupleTag<BigQueryStorageApiInsertError> failedRowsTag,
+      @Nullable TupleTag<TableRow> successfulRowsTag) {
+    this.failedRowsCoder = failedRowsCoder;
+    this.successfulRowsCoder = successfulRowsCoder;
+    this.destinationCoder = destinationCoder;
+    this.dynamicDestinations = dynamicDestinations;
+    this.writeDoFn = writeDoFn;
+    this.failedRowsTag = failedRowsTag;
+    this.successfulRowsTag = successfulRowsTag;
+  }
+
+  @Override
+  public PCollectionTuple expand(PCollection<KV<DestinationT, MismatchedRow>> 
input) {
+    // Append records to the Storage API streams.
+    TupleTagList tupleTagList = TupleTagList.of(failedRowsTag);
+    if (successfulRowsTag != null) {
+      tupleTagList = tupleTagList.and(successfulRowsTag);
+    }
+
+    PCollectionTuple result =
+        input
+            .apply(
+                "addShard",
+                ParDo.of(
+                    new DoFn<
+                        KV<DestinationT, MismatchedRow>,
+                        KV<ShardedKey<DestinationT>, MismatchedRow>>() {
+                      int shardNumber;
+
+                      @Setup
+                      public void setup() {
+                        shardNumber = 
ThreadLocalRandom.current().nextInt(NUM_DEFAULT_SHARDS);
+                      }
+
+                      @ProcessElement
+                      public void process(
+                          @Element KV<DestinationT, MismatchedRow> element,
+                          OutputReceiver<KV<ShardedKey<DestinationT>, 
MismatchedRow>> o) {
+                        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+                        buffer.putInt(++shardNumber % NUM_DEFAULT_SHARDS);
+                        o.output(
+                            KV.of(
+                                ShardedKey.of(element.getKey(), 
buffer.array()),
+                                element.getValue()));
+                      }
+                    }))
+            .setCoder(KvCoder.of(ShardedKey.Coder.of(destinationCoder), 
MismatchedRow.Coder.of()))
+            .apply(
+                "bufferMismatchedRows",
+                ParDo.of(new BufferingDoFn(writeDoFn))
+                    .withOutputTags(finalizeTag, tupleTagList)
+                    .withSideInputs(dynamicDestinations.getSideInputs()));
+
+    result.get(failedRowsTag).setCoder(failedRowsCoder);
+    if (successfulRowsTag != null) {
+      result.get(successfulRowsTag).setCoder(successfulRowsCoder);
+    }
+    return result;
+  }
+
+  class BufferingDoFn
+      extends DoFn<KV<ShardedKey<DestinationT>, MismatchedRow>, KV<String, 
String>> {
+    private final 
StorageApiWriteUnshardedRecords.WriteRecordsDoFn<DestinationT, ElementT>
+        writeDoFn;
+
+    @StateId("mismatchedRows")
+    private final StateSpec<BagState<MismatchedRow>> mismatchedRowsSpec =
+        StateSpecs.bag(MismatchedRow.Coder.of());
+
+    @TimerId("retryMismatchedRowsTimer")
+    private final TimerSpec mismatchedRowsTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    @StateId("currentMismatchedRowTimerValue")
+    private final StateSpec<ValueState<Long>> 
currentMismatchedRowTimerValueSpec =
+        StateSpecs.value();
+
+    @StateId("minPendingTimestamp")
+    private final StateSpec<ValueState<Long>> minPendingTimestampSpec = 
StateSpecs.value();
+
+    private final Counter rowsSentToFailedRowsCollection =
+        Metrics.counter(BufferMismatchedRows.BufferingDoFn.class, 
"rowsSentToFailedRowsCollection");
+
+    private final Duration RETRY_MISMATCHED_ROWS_PERIOD = 
Duration.standardMinutes(1);
+
+    public BufferingDoFn(
+        StorageApiWriteUnshardedRecords.WriteRecordsDoFn<DestinationT, 
ElementT> writeDoFn) {
+      this.writeDoFn = writeDoFn;
+    }
+
+    @StartBundle
+    public void startBundle() throws IOException {
+      writeDoFn.startBundle();
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext processContext,
+        @Element KV<ShardedKey<DestinationT>, MismatchedRow> element,
+        @StateId("mismatchedRows") BagState<MismatchedRow> mismatchedRowsBag,
+        @TimerId("retryMismatchedRowsTimer") Timer retryTimer,
+        @StateId("currentMismatchedRowTimerValue") ValueState<Long> 
currentTimerValue,
+        @StateId("minPendingTimestamp") ValueState<Long> minPendingTimestamp,
+        MultiOutputReceiver o)
+        throws Exception {
+      
dynamicDestinations.setSideInputAccessorFromProcessContext(processContext);
+      TableDestination tableDestination = 
dynamicDestinations.getTable(element.getKey().getKey());
+
+      SchemaChangeDetectorHelper.bufferMismatchedRows(
+          Collections.singleton(element.getValue()),
+          mismatchedRowsBag,
+          retryTimer,
+          currentTimerValue,
+          minPendingTimestamp,
+          tableDestination,
+          o.get(failedRowsTag),
+          null,
+          rowsSentToFailedRowsCollection,
+          RETRY_MISMATCHED_ROWS_PERIOD);
+    }
+
+    @OnTimer("retryMismatchedRowsTimer")
+    public void onTimer(
+        @Key ShardedKey<DestinationT> shardedDestination,
+        @Timestamp Instant timestamp,
+        @StateId("mismatchedRows") BagState<MismatchedRow> mismatchedRowsBag,
+        @StateId("currentMismatchedRowTimerValue") ValueState<Long> 
currentTimerValue,
+        @StateId("minPendingTimestamp") ValueState<Long> minPendingTimestamp,
+        @TimerId("retryMismatchedRowsTimer") Timer retryTimer,
+        PipelineOptions pipelineOptions,
+        MultiOutputReceiver o)
+        throws Exception {
+      // TODO XXX SET DYNAMIC DESTINATIONS

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The side input accessor for `dynamicDestinations` is not set during the 
timer execution. If `dynamicDestinations` relies on side inputs, calling 
`dynamicDestinations.getTable(...)` on line 238 will fail at runtime. You 
should add `OnTimerContext` (or `WindowedContext`) to the `onTimer` method
parameters and use it to set the side input accessor on `dynamicDestinations` 
before accessing any tables.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -767,17 +790,18 @@ public void process(
         final PipelineOptions pipelineOptions,
         @Element KV<ShardedKey<DestinationT>, 
Iterable<StorageApiWritePayload>> element,
         @Timestamp org.joda.time.Instant elementTs,
-        final @AlwaysFetched @StateId("streamName") ValueState<String> 
streamName,
-        final @AlwaysFetched @StateId("streamOffset") ValueState<Long> 
streamOffset,
-        final @StateId("updatedSchema") ValueState<TableSchema> updatedSchema,
+        @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
+        @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
+        @StateId("updatedSchema") ValueState<TableSchema> updatedSchema,
         @TimerId("idleTimer") Timer idleTimer,
+        @StateId("mismatchedRows") BagState<MismatchedRow> mismatchedRowsBag,
+        @TimerId("retryMismatchedRowsTimer") Timer mismatchedRowsRetryTimer,
+        @StateId("currentMismatchedRowTimerValue") ValueState<Long> 
mismatchedRowsRetryTimerValue,
+        @StateId("minPendingTimestamp") ValueState<Long> minPendingTimestamp,
         final MultiOutputReceiver o)
         throws Exception {
-      BigQueryOptions bigQueryOptions = 
pipelineOptions.as(BigQueryOptions.class);
-
-      if (autoUpdateSchema) {
-        updatedSchema.readLater();
-      }
+      Instant now = Instant.now();
+      System.err.println("PROCESS " + now);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Remove the leftover debugging `System.err.println` statements. The `now`
variable appears to be used only by this debug output, so remove it as well.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -592,16 +570,20 @@ long flush(
                 Preconditions.checkStateNotNull(
                     getAppendClientInfo(
                         false, null /* read updated schema from 
messageConverter */));
+            // Convert back to an Iterable<StorageApiWritePayload> to try 
again. We can't reuse the
+            // existing
+            // payloadsToIterate as if there are any failures in it, that 
would cause us to output
+            // them again to
+            // the dead-letter collection, resulting in duplicate outputs to 
dead letter.
+            payloadsToIterate = () -> 
processingRows.toPayloadStream().iterator();
           }
         } while (schemaMismatchSeen && BackOffUtils.next(Sleeper.DEFAULT, 
backoff));
 
         pendingMessages.clear();

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   If all rows in the batch are mismatched, they are buffered for later retry, 
leaving `rowsToProcess` empty. If we proceed without checking, an empty 
operation is added to the `retryManager`, resulting in an unnecessary empty RPC 
call to BigQuery. We should return early if there are no rows to process.
   
   ```java
           pendingMessages.clear();
           if (rowsToProcess.getProtoRows().getSerializedRowsCount() == 0) {
             return 0;
           }
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java:
##########
@@ -305,4 +315,86 @@ private static boolean hasUnknownFields(Message message) {
     // If we reach here, neither this message nor its descendants have unknown 
fields
     return false;
   }
+
+  public static boolean missingUnknownField(
+      TableRow unknownFields, ThrowingSupplier<Descriptors.Descriptor> 
schemaDescriptor)
+      throws Exception {
+    @Nullable Object fValue = unknownFields.get("f");
+    if (fValue instanceof List) {
+      List<?> cells = (List<?>) fValue;
+      return missingUnknownField(cells, schemaDescriptor.get());
+    } else {
+      return missingUnknownField(unknownFields, schemaDescriptor.get());
+    }
+  }
+
+  public static boolean missingUnknownField(
+      List<?> unknownFields, Descriptors.Descriptor schemaDescriptor) {
+    for (int i = 0; i < unknownFields.size(); i++) {
+      Object cell = unknownFields.get(i);
+      Object value;
+      if (cell instanceof TableCell) {
+        value = ((TableCell) cell).getV();
+      } else if (cell instanceof Map) {
+        value = ((Map<?, ?>) cell).get("v");
+      } else {
+        value = cell;
+      }
+      if (i >= schemaDescriptor.getFields().size()) {
+        return true;
+      }
+      Descriptors.FieldDescriptor fieldDescriptor = 
schemaDescriptor.getFields().get(i);
+      if (missingUnknownFieldObject(value, fieldDescriptor)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static boolean missingUnknownField(
+      AbstractMap<String, Object> unknownFields, Descriptors.Descriptor 
schemaDescriptor) {
+    for (Map.Entry<String, Object> entry : unknownFields.entrySet()) {
+      String key = entry.getKey().toLowerCase();
+      String protoFieldName =
+          BigQuerySchemaUtil.isProtoCompatible(key)
+              ? key
+              : BigQuerySchemaUtil.generatePlaceholderFieldName(key);
+
+      Descriptors.FieldDescriptor fieldDescriptor =
+          schemaDescriptor.findFieldByName(protoFieldName);
+      if (fieldDescriptor == null) {
+        return true;
+      }
+      Object value = entry.getValue();
+      if (value == null) {
+        continue;
+      }
+      if (missingUnknownFieldObject(value, fieldDescriptor)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean missingUnknownFieldObject(
+      @Nullable Object value, Descriptors.FieldDescriptor fieldDescriptor) {
+    if (value == null) {
+      return false;
+    }
+
+    if (fieldDescriptor.getType() != Descriptors.FieldDescriptor.Type.MESSAGE) 
{
+      return false;
+    }
+    if (value instanceof List) {
+      for (Object element : (List<?>) value) {
+        if (missingUnknownFieldObject(element, fieldDescriptor)) {
+          return true;
+        }
+      }
+      return false;
+    } else if (value instanceof TableRow) {
+      return missingUnknownField((TableRow) value, 
fieldDescriptor.getMessageType());
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
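   Check for `AbstractMap` instead of `TableRow` so that nested records
represented as standard Java maps (e.g., `LinkedHashMap`) are also checked for
missing unknown fields. `TableRow` itself extends `AbstractMap<String, Object>`
(through `GenericJson`), so `TableRow` values are still covered.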
   
   ```suggestion
       } else if (value instanceof AbstractMap) {
         return missingUnknownField((AbstractMap<String, Object>) value, 
fieldDescriptor.getMessageType());
       }
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -793,11 +817,86 @@ public void process(
                     dest);
                 return tableDestination1;
               });
+
+      StorageApiDynamicDestinations.MessageConverter<?> messageConverter =
+          messageConverters.get(
+              element.getKey().getKey(),
+              dynamicDestinations,
+              pipelineOptions,
+              getDatasetService(pipelineOptions),
+              getWriteStreamService(pipelineOptions));
+
+      ThrowingConsumer<Exception, Iterable<MismatchedRow>> 
processMismatchedRows =
+          mismatchedRows -> {
+            if (!Iterables.isEmpty(mismatchedRows)) {
+              AppendClientInfo info =
+                  AppendClientInfo.of(
+                      
Preconditions.checkStateNotNull(messageConverter.getTableSchema()),
+                      messageConverter.getDescriptor(false),
+                      AutoCloseable::close);
+
+              SchemaChangeDetectorHelper.bufferMismatchedRows(
+                  mismatchedRows,
+                  mismatchedRowsBag,
+                  mismatchedRowsRetryTimer,
+                  mismatchedRowsRetryTimerValue,
+                  minPendingTimestamp,
+                  tableDestination,
+                  o.get(failedRowsTag),
+                  info,
+                  rowsSentToFailedRowsCollection,
+                  RETRY_MISMATCHED_ROWS_PERIOD);
+            }
+          };
+      processPayloads(
+          pipelineOptions,
+          element.getKey(),
+          tableDestination,
+          messageConverter,
+          element.getValue(),
+          elementTs,
+          streamName,
+          streamOffset,
+          updatedSchema,
+          idleTimer,
+          o,
+          processMismatchedRows);
+      System.err.println("PROCESS DONE AFTER " + 
java.time.Duration.between(now, Instant.now()));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Remove leftover debugging `System.err.println` statement.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -1096,10 +1239,109 @@ public void process(
           java.time.Duration timeElapsed = java.time.Duration.between(now, 
Instant.now());
           appendLatencyDistribution.update(timeElapsed.toMillis());
         }
+        processMismatchedRows.accept(mismatchedRows);
       }
       idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
     }
 
+    @OnTimer("retryMismatchedRowsTimer")
+    public void onMismatchedRowsTimer(
+        PipelineOptions pipelineOptions,
+        @Key ShardedKey<DestinationT> shardedDestination,
+        @Timestamp org.joda.time.Instant elementTs,
+        @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
+        @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
+        @StateId("updatedSchema") ValueState<TableSchema> updatedSchema,
+        @TimerId("idleTimer") Timer idleTimer,
+        MultiOutputReceiver o,
+        @TimerId("retryMismatchedRowsTimer") Timer retryRowsTimer,
+        @StateId("mismatchedRows") BagState<MismatchedRow> mismatchedRowsBag,
+        @StateId("currentMismatchedRowTimerValue") ValueState<Long> 
currentTimerValue,
+        @StateId("minPendingTimestamp") ValueState<Long> minPendingTimestamp)
+        throws Exception {
+      System.err.println("RETRY TIMER " + Instant.now());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Remove leftover debugging `System.err.println` statement.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BufferMismatchedRows.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class BufferMismatchedRows<DestinationT extends @NonNull Object, ElementT>
+    extends PTransform<PCollection<KV<DestinationT, MismatchedRow>>, 
PCollectionTuple> {
+  private final Coder<BigQueryStorageApiInsertError> failedRowsCoder;
+  private final Coder<TableRow> successfulRowsCoder;
+  private final Coder<DestinationT> destinationCoder;
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations;
+  private final StorageApiWriteUnshardedRecords.WriteRecordsDoFn<DestinationT, 
ElementT> writeDoFn;
+  private final TupleTag<BigQueryStorageApiInsertError> failedRowsTag;
+  private final @Nullable TupleTag<TableRow> successfulRowsTag;
+  // This output is effectively ignored, since we only support this code path 
for
+  // StorageApiWriteRecordsInconsistent.
+  private final TupleTag<KV<String, String>> finalizeTag = new 
TupleTag<>("finalizeTag");
+  private static final int NUM_DEFAULT_SHARDS = 20;
+
+  public BufferMismatchedRows(
+      Coder<BigQueryStorageApiInsertError> failedRowsCoder,
+      Coder<TableRow> successfulRowsCoder,
+      Coder<DestinationT> destinationCoder,
+      StorageApiDynamicDestinations<ElementT, DestinationT> 
dynamicDestinations,
+      StorageApiWriteUnshardedRecords.WriteRecordsDoFn<DestinationT, ElementT> 
writeDoFn,
+      TupleTag<BigQueryStorageApiInsertError> failedRowsTag,
+      @Nullable TupleTag<TableRow> successfulRowsTag) {
+    this.failedRowsCoder = failedRowsCoder;
+    this.successfulRowsCoder = successfulRowsCoder;
+    this.destinationCoder = destinationCoder;
+    this.dynamicDestinations = dynamicDestinations;
+    this.writeDoFn = writeDoFn;
+    this.failedRowsTag = failedRowsTag;
+    this.successfulRowsTag = successfulRowsTag;
+  }
+
+  @Override
+  public PCollectionTuple expand(PCollection<KV<DestinationT, MismatchedRow>> 
input) {
+    // Append records to the Storage API streams.
+    TupleTagList tupleTagList = TupleTagList.of(failedRowsTag);
+    if (successfulRowsTag != null) {
+      tupleTagList = tupleTagList.and(successfulRowsTag);
+    }
+
+    PCollectionTuple result =
+        input
+            .apply(
+                "addShard",
+                ParDo.of(
+                    new DoFn<
+                        KV<DestinationT, MismatchedRow>,
+                        KV<ShardedKey<DestinationT>, MismatchedRow>>() {
+                      int shardNumber;
+
+                      @Setup
+                      public void setup() {
+                        shardNumber = 
ThreadLocalRandom.current().nextInt(NUM_DEFAULT_SHARDS);
+                      }
+
+                      @ProcessElement
+                      public void process(
+                          @Element KV<DestinationT, MismatchedRow> element,
+                          OutputReceiver<KV<ShardedKey<DestinationT>, 
MismatchedRow>> o) {
+                        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+                        buffer.putInt(++shardNumber % NUM_DEFAULT_SHARDS);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Incrementing the instance variable `shardNumber` sequentially can produce
negative shard numbers once it overflows, because Java's `%` operator returns a
negative result when its left operand is negative. It is simpler and more
robust to generate the shard number directly with `ThreadLocalRandom`.
   
   ```suggestion
                           ByteBuffer buffer = 
ByteBuffer.allocate(Integer.BYTES);
                           
buffer.putInt(ThreadLocalRandom.current().nextInt(NUM_DEFAULT_SHARDS));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to