gemini-code-assist[bot] commented on code in PR #38854:
URL: https://github.com/apache/beam/pull/38854#discussion_r3377569117
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -1030,7 +1035,32 @@ public void process(
onSuccess,
appendClientHolder.get(),
tableReference);
+
if (createRetryManagerResult.getSchemaMismatchSeen()) {
+ if (autoUpdateSchemaStrictTimeout != null) {
+ if
(startMismatchTime.plus(initialMismatchRetryTime).isBefore(org.joda.time.Instant.now()))
{
+ // Local retry time has expired!
+ org.joda.time.Instant targetExpirationTime =
startMismatchTime.plus(autoUpdateSchemaStrictTimeout);
+ Iterable<SplittingIterable.Value> mismatchedRows =
+ Iterables.transform(messages,
SplittingIterable.Value::getSchemaMismatchedRowsOnly);
+ bufferMismatchedRows(mismatchedRows, targetExpirationTime)
Review Comment:

There is a missing semicolon at the end of this statement. Additionally, the
method `bufferMismatchedRows` is called here but is not defined anywhere in
this class or the PR changes, which will cause a compilation failure.
```suggestion
bufferMismatchedRows(mismatchedRows, targetExpirationTime);
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java:
##########
@@ -52,44 +53,73 @@ abstract static class Value {
abstract List<@Nullable TableRow> getFailsafeTableRows();
- abstract boolean getSchemaMismatchSeen();
- }
+ abstract BitSet getSchemaMismatchedRows();
+
+ private static final BitSet EMPTY_BIT_SET = new BitSet(0);
+
+ Value getSchemaMismatchedRowsOnly() {
+ ProtoRows.Builder inserts = ProtoRows.newBuilder();
+ List<Instant> timestamps = Lists.newArrayList();
+ List<@Nullable TableRow> failsafeTableRows = Lists.newArrayList();
+ for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i <
getProtoRows().getSerializedRowsCount(); i++) {
+ if (getSchemaMismatchedRows().get(i)) {
+ inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
+ timestamps.add(getTimestamps().get(i));
+ failsafeTableRows.add(getFailsafeTableRows().get(i));
+ }
+ }
Review Comment:

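The loop condition contains `! getSchemaMismatchedRows().isEmpty() &&`, which
is loop-invariant yet re-evaluated on every iteration. It is also redundant:
when the bit set is empty, `get(i)` returns false for every index, so the loop
body adds nothing. It can be safely removed.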
```suggestion
for (int i = 0; i < getProtoRows().getSerializedRowsCount(); i++) {
  if (getSchemaMismatchedRows().get(i)) {
    inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
    timestamps.add(getTimestamps().get(i));
    failsafeTableRows.add(getFailsafeTableRows().get(i));
  }
}
```
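As an optional follow-up to the suggestion above, the loop could visit only the set bits instead of testing every index. The following is a minimal, self-contained sketch and not code from this PR: `MismatchedRowSelector` and `selectSet` are hypothetical names, and a plain `List<T>` stands in for the proto rows, timestamps, and failsafe rows. It relies only on `java.util.BitSet.nextSetBit`, which returns -1 when no further bit is set.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Beam code base.
final class MismatchedRowSelector {
  /** Returns the elements of {@code rows} whose index is set in {@code mismatched}. */
  static <T> List<T> selectSet(List<T> rows, BitSet mismatched) {
    List<T> selected = new ArrayList<>();
    // nextSetBit returns -1 once there are no more set bits, so an empty
    // BitSet ends the loop immediately without a separate isEmpty() check.
    for (int i = mismatched.nextSetBit(0);
        i >= 0 && i < rows.size();
        i = mismatched.nextSetBit(i + 1)) {
      selected.add(rows.get(i));
    }
    return selected;
  }
}
```

Whether this is worth doing here depends on how sparse the mismatched bits usually are; for small batches the simple indexed loop in the suggestion is likely just as good.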
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java:
##########
@@ -152,6 +151,7 @@ public StorageApiWritePayload toMessage(
public void updateSchemaFromTable() throws IOException,
InterruptedException {
SCHEMA_CACHE.refreshSchema(
delegate.get().tableReference, datasetService, writeStreamService,
bigQueryOptions);
+ T
// Recycle the internal MessageConverter so that we pick up the new schema from the cache.
Review Comment:

There is a stray `T` character on line 154 which will cause a compilation
failure. It should be removed.
```suggestion
delegate.get().tableReference, datasetService, writeStreamService, bigQueryOptions);
// Recycle the internal MessageConverter so that we pick up the new schema from the cache.
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -1030,7 +1035,32 @@ public void process(
onSuccess,
appendClientHolder.get(),
tableReference);
+
if (createRetryManagerResult.getSchemaMismatchSeen()) {
+ if (autoUpdateSchemaStrictTimeout != null) {
+ if
(startMismatchTime.plus(initialMismatchRetryTime).isBefore(org.joda.time.Instant.now()))
{
+ // Local retry time has expired!
+ org.joda.time.Instant targetExpirationTime =
startMismatchTime.plus(autoUpdateSchemaStrictTimeout);
+ Iterable<SplittingIterable.Value> mismatchedRows =
+ Iterables.transform(messages,
SplittingIterable.Value::getSchemaMismatchedRowsOnly);
+ bufferMismatchedRows(mismatchedRows, targetExpirationTime)
+
+ // Continue processing only the messages that matched the schema.
+ messages = Iterables.transform(messages,
SplittingIterable.Value::getSchemaMatchedRowsOnly);
+ createRetryManagerResult =
+ createRetryManager(
+ element.getKey(),
+ messages,
+ runOperation,
+ onError,
+ onSuccess,
+ appendClientHolder.get(),
+ tableReference);
+
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(
+ !createRetryManagerResult.getSchemaMismatchSeen());
Review Comment:

The fully-qualified class name for `Preconditions` is used here, but
`checkState` is already statically imported at the top of the file. Simplifying
this improves readability.
```java
checkState(!createRetryManagerResult.getSchemaMismatchSeen());
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java:
##########
@@ -29,7 +30,7 @@
abstract class StorageApiDynamicDestinations<T, DestinationT>
extends DynamicDestinationsHelpers.DelegatingDynamicDestinations<T,
DestinationT> {
public interface MessageConverter<T> {
- com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema();
+ TableSchema getTableSchema();
DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns)
throws Exception;
Review Comment:

The `MessageConverter` interface does not define `updateSchemaFromTable()`,
but it is called on `messageConverter` in
`StorageApiWritesShardedRecords.java`. Adding a default no-op implementation to
the interface will prevent compilation failures for other implementations.
```java
TableSchema getTableSchema();
DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns) throws Exception;
default void updateSchemaFromTable() throws IOException, InterruptedException {}
```
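To illustrate why a default no-op keeps existing implementations compiling, here is a small standalone sketch. All names in it (`SketchConverter`, `StaticConverter`, `RefreshingConverter`, `ConverterDemo`) are hypothetical stand-ins rather than Beam classes, and a `String` replaces the real `TableSchema` type.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the MessageConverter interface.
interface SketchConverter {
  String getTableSchema();

  // Default no-op: implementations that never refresh need no changes.
  default void updateSchemaFromTable() throws IOException, InterruptedException {}
}

// An existing implementation that does not know about schema refresh still compiles.
final class StaticConverter implements SketchConverter {
  @Override
  public String getTableSchema() {
    return "static";
  }
}

// An implementation that opts in by overriding the default.
final class RefreshingConverter implements SketchConverter {
  private int refreshCount = 0;

  @Override
  public String getTableSchema() {
    return "v" + refreshCount;
  }

  @Override
  public void updateSchemaFromTable() {
    refreshCount++;
  }
}

final class ConverterDemo {
  /** Calls the refresh hook through the interface, then reads the schema. */
  static String refreshAndRead(SketchConverter converter) {
    try {
      converter.updateSchemaFromTable();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return converter.getTableSchema();
  }
}
```

Callers can invoke `updateSchemaFromTable()` on any converter through the interface; only implementations that override it do any work.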
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java:
##########
@@ -52,44 +53,73 @@ abstract static class Value {
abstract List<@Nullable TableRow> getFailsafeTableRows();
- abstract boolean getSchemaMismatchSeen();
- }
+ abstract BitSet getSchemaMismatchedRows();
+
+ private static final BitSet EMPTY_BIT_SET = new BitSet(0);
+
+ Value getSchemaMismatchedRowsOnly() {
+ ProtoRows.Builder inserts = ProtoRows.newBuilder();
+ List<Instant> timestamps = Lists.newArrayList();
+ List<@Nullable TableRow> failsafeTableRows = Lists.newArrayList();
+ for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i <
getProtoRows().getSerializedRowsCount(); i++) {
+ if (getSchemaMismatchedRows().get(i)) {
+ inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
+ timestamps.add(getTimestamps().get(i));
+ failsafeTableRows.add(getFailsafeTableRows().get(i));
+ }
+ }
+ BitSet allBits = new BitSet(inserts.getSerializedRowsCount());
+ allBits.set(0, inserts.getSerializedRowsCount());
+ return new AutoValue_SplittingIterable_Value(inserts.build(),
timestamps, failsafeTableRows, allBits);
+ }
+
+ Value getSchemaMatchedRowsOnly() {
+ if (getSchemaMismatchedRows().isEmpty()) {
+ return this;
+ }
+
+ ProtoRows.Builder inserts = ProtoRows.newBuilder();
+ List<Instant> timestamps = Lists.newArrayList();
+ List<@Nullable TableRow> failsafeTableRows = Lists.newArrayList();
+ for (int i = 0; ! getSchemaMismatchedRows().isEmpty() && i <
getProtoRows().getSerializedRowsCount(); i++) {
+ if (!getSchemaMismatchedRows().get(i)) {
+ inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
+ timestamps.add(getTimestamps().get(i));
+ failsafeTableRows.add(getFailsafeTableRows().get(i));
+ }
+ }
Review Comment:

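The loop condition contains `! getSchemaMismatchedRows().isEmpty() &&`, which
is redundant and evaluated on every iteration: the early return at the top of
`getSchemaMatchedRowsOnly()` already handles the empty case, so the check is
always true by the time the loop runs. It can be safely removed.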
```suggestion
for (int i = 0; i < getProtoRows().getSerializedRowsCount(); i++) {
  if (!getSchemaMismatchedRows().get(i)) {
    inserts.addSerializedRows(getProtoRows().getSerializedRows(i));
    timestamps.add(getTimestamps().get(i));
    failsafeTableRows.add(getFailsafeTableRows().get(i));
  }
}
```
```
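Since `getSchemaMismatchedRowsOnly()` and `getSchemaMatchedRowsOnly()` walk the same rows with opposite tests, a single pass could in principle produce both halves. The sketch below shows that idea under simplifying assumptions: `RowPartitioner` and `partition` are hypothetical names, and a generic `List<T>` replaces the three parallel collections used in the PR.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Beam code base.
final class RowPartitioner {
  /**
   * Splits {@code rows} in one pass. Index 0 of the result holds rows whose bit is
   * clear (schema matched); index 1 holds rows whose bit is set (schema mismatched).
   */
  static <T> List<List<T>> partition(List<T> rows, BitSet mismatched) {
    List<T> matched = new ArrayList<>();
    List<T> mismatchedRows = new ArrayList<>();
    for (int i = 0; i < rows.size(); i++) {
      if (mismatched.get(i)) {
        mismatchedRows.add(rows.get(i));
      } else {
        matched.add(rows.get(i));
      }
    }
    List<List<T>> result = new ArrayList<>();
    result.add(matched);
    result.add(mismatchedRows);
    return result;
  }
}
```

This is only a design alternative to consider; keeping two separate accessors, as the PR does, may fit the lazy `Iterables.transform` usage better.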