gemini-code-assist[bot] commented on code in PR #38854:
URL: https://github.com/apache/beam/pull/38854#discussion_r3377569117


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -1030,7 +1035,32 @@ public void process(
                   onSuccess,
                   appendClientHolder.get(),
                   tableReference);
+
           if (createRetryManagerResult.getSchemaMismatchSeen()) {
+            if (autoUpdateSchemaStrictTimeout != null) {
+              if 
(startMismatchTime.plus(initialMismatchRetryTime).isBefore(org.joda.time.Instant.now()))
 {
+                // Local retry time has expired!
+                org.joda.time.Instant targetExpirationTime = 
startMismatchTime.plus(autoUpdateSchemaStrictTimeout);
+                Iterable<SplittingIterable.Value> mismatchedRows =
+                  Iterables.transform(messages, 
SplittingIterable.Value::getSchemaMismatchedRowsOnly);
+                bufferMismatchedRows(mismatchedRows, targetExpirationTime)

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   There is a missing semicolon at the end of this statement. Additionally, the 
method `bufferMismatchedRows` is called here but is not defined anywhere in 
this class or in the PR's changes, which will cause a compilation failure.
   
   ```suggestion
                   bufferMismatchedRows(mismatchedRows, targetExpirationTime);
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java:
##########
@@ -52,44 +53,73 @@ abstract static class Value {
 
     abstract List<@Nullable TableRow> getFailsafeTableRows();
 
-    abstract boolean getSchemaMismatchSeen();
-  }
+    abstract BitSet getSchemaMismatchedRows();
+
+    private static final BitSet EMPTY_BIT_SET = new BitSet(0);
+
+    Value getSchemaMismatchedRowsOnly() {
+      ProtoRows.Builder inserts = ProtoRows.newBuilder();
+      List<Instant> timestamps = Lists.newArrayList();
+      List<@Nullable TableRow> failsafeTableRows = Lists.newArrayList();
+      for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < 
getProtoRows().getSerializedRowsCount(); i++) {
+        if (getSchemaMismatchedRows().get(i)) {
+          inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
+          timestamps.add(getTimestamps().get(i));
+          failsafeTableRows.add(getFailsafeTableRows().get(i));
+        }
+      }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
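   The loop condition contains `! getSchemaMismatchedRows().isEmpty() &&`, which 
is redundant and is re-evaluated on every iteration. When the BitSet is empty, 
`get(i)` returns `false` for every index, so the loop adds no rows either way. 
The check can be safely removed.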
   
   ```suggestion
         for (int i = 0; i < getProtoRows().getSerializedRowsCount(); i++) {
           if (getSchemaMismatchedRows().get(i)) {
             inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
             timestamps.add(getTimestamps().get(i));
             failsafeTableRows.add(getFailsafeTableRows().get(i));
           }
         }
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java:
##########
@@ -152,6 +151,7 @@ public StorageApiWritePayload toMessage(
     public void updateSchemaFromTable() throws IOException, 
InterruptedException {
       SCHEMA_CACHE.refreshSchema(
           delegate.get().tableReference, datasetService, writeStreamService, 
bigQueryOptions);
+      T
       // Recycle the internal MessageConverter so that we pick up the new 
schema from the cache.

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   There is a stray `T` character on line 154, which will cause a compilation 
failure. It should be removed.
   
   ```suggestion
             delegate.get().tableReference, datasetService, writeStreamService, 
bigQueryOptions);
         // Recycle the internal MessageConverter so that we pick up the new 
schema from the cache.
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -1030,7 +1035,32 @@ public void process(
                   onSuccess,
                   appendClientHolder.get(),
                   tableReference);
+
           if (createRetryManagerResult.getSchemaMismatchSeen()) {
+            if (autoUpdateSchemaStrictTimeout != null) {
+              if 
(startMismatchTime.plus(initialMismatchRetryTime).isBefore(org.joda.time.Instant.now()))
 {
+                // Local retry time has expired!
+                org.joda.time.Instant targetExpirationTime = 
startMismatchTime.plus(autoUpdateSchemaStrictTimeout);
+                Iterable<SplittingIterable.Value> mismatchedRows =
+                  Iterables.transform(messages, 
SplittingIterable.Value::getSchemaMismatchedRowsOnly);
+                bufferMismatchedRows(mismatchedRows, targetExpirationTime)
+
+                // Continue processing only the messages that matched the 
schema.
+                messages = Iterables.transform(messages, 
SplittingIterable.Value::getSchemaMatchedRowsOnly);
+                createRetryManagerResult =
+                  createRetryManager(
+                    element.getKey(),
+                    messages,
+                    runOperation,
+                    onError,
+                    onSuccess,
+                    appendClientHolder.get(),
+                    tableReference);
+                
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(
+                  !createRetryManagerResult.getSchemaMismatchSeen());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The fully-qualified class name for `Preconditions` is used here, but 
`checkState` is already statically imported at the top of the file. Simplifying 
this improves readability.
   
   ```java
                   
checkState(!createRetryManagerResult.getSchemaMismatchSeen());
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java:
##########
@@ -29,7 +30,7 @@
 abstract class StorageApiDynamicDestinations<T, DestinationT>
     extends DynamicDestinationsHelpers.DelegatingDynamicDestinations<T, 
DestinationT> {
   public interface MessageConverter<T> {
-    com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema();
+    TableSchema getTableSchema();
 
     DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) 
throws Exception;

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   The `MessageConverter` interface does not define `updateSchemaFromTable()`, 
but it is called on `messageConverter` in 
`StorageApiWritesShardedRecords.java`. Adding a default no-op implementation to 
the interface makes that call compile without forcing every other implementation 
to add the method. Note that `java.io.IOException` must be imported in the 
interface's file if it is not already.
   
   ```java
       TableSchema getTableSchema();
   
       DescriptorProtos.DescriptorProto getDescriptor(boolean 
includeCdcColumns) throws Exception;
   
       default void updateSchemaFromTable() throws IOException, 
InterruptedException {}
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java:
##########
@@ -52,44 +53,73 @@ abstract static class Value {
 
     abstract List<@Nullable TableRow> getFailsafeTableRows();
 
-    abstract boolean getSchemaMismatchSeen();
-  }
+    abstract BitSet getSchemaMismatchedRows();
+
+    private static final BitSet EMPTY_BIT_SET = new BitSet(0);
+
+    Value getSchemaMismatchedRowsOnly() {
+      ProtoRows.Builder inserts = ProtoRows.newBuilder();
+      List<Instant> timestamps = Lists.newArrayList();
+      List<@Nullable TableRow> failsafeTableRows = Lists.newArrayList();
+      for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < 
getProtoRows().getSerializedRowsCount(); i++) {
+        if (getSchemaMismatchedRows().get(i)) {
+          inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
+          timestamps.add(getTimestamps().get(i));
+          failsafeTableRows.add(getFailsafeTableRows().get(i));
+        }
+      }
+      BitSet allBits = new BitSet(inserts.getSerializedRowsCount());
+      allBits.set(0, inserts.getSerializedRowsCount());
+      return new AutoValue_SplittingIterable_Value(inserts.build(), 
timestamps, failsafeTableRows, allBits);
+    }
+
+    Value getSchemaMatchedRowsOnly() {
+      if (getSchemaMismatchedRows().isEmpty()) {
+        return this;
+      }
+
+      ProtoRows.Builder inserts = ProtoRows.newBuilder();
+      List<Instant> timestamps = Lists.newArrayList();
+      List<@Nullable TableRow> failsafeTableRows = Lists.newArrayList();
+      for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i < 
getProtoRows().getSerializedRowsCount(); i++) {
+        if (!getSchemaMismatchedRows().get(i)) {
+          inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
+          timestamps.add(getTimestamps().get(i));
+          failsafeTableRows.add(getFailsafeTableRows().get(i));
+        }
+      }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
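   The loop condition contains `! getSchemaMismatchedRows().isEmpty() &&`, which 
is redundant and is re-evaluated on every iteration. The method already returns 
early when the BitSet is empty, so this check is always `true` inside the loop. 
It can be safely removed.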
   
   ```suggestion
         for (int i = 0; i < getProtoRows().getSerializedRowsCount(); i++) {
           if (!getSchemaMismatchedRows().get(i)) {
             inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
             timestamps.add(getTimestamps().get(i));
             failsafeTableRows.add(getFailsafeTableRows().get(i));
           }
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to