gemini-code-assist[bot] commented on code in PR #38332:
URL: https://github.com/apache/beam/pull/38332#discussion_r3163621377


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java:
##########
@@ -147,109 +147,117 @@ public PCollectionTuple 
expand(PCollection<KV<DestinationT, ElementT>> input) {
         .get(patchTableSchemaTag)
         .setCoder(KvCoder.of(destinationCoder, 
ProtoCoder.of(TableSchema.class)));
     
result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, 
elementCoder));
+    if (!hasSchemaUpdateOptions) {
+      // Don't expand the update graph if it's not needed.
+      return result;
+    } else {
+      final int numShards =
+          input
+              .getPipeline()
+              .getOptions()
+              .as(BigQueryOptions.class)
+              .getSchemaUpgradeBufferingShards();
 
-    final int numShards =
-        input
-            .getPipeline()
-            .getOptions()
-            .as(BigQueryOptions.class)
-            .getSchemaUpgradeBufferingShards();
+      // Throttle the stream to the patch-table function so that only a single 
update per table per
+      // two seconds gets processed (to match quotas). The combiner merges 
incremental schemas, so
+      // we
+      // won't miss any updates.
+      PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
+          result
+              .get(patchTableSchemaTag)
+              .apply(
+                  "rewindow",
+                  Window.<KV<DestinationT, TableSchema>>configure()
+                      .triggering(
+                          Repeatedly.forever(
+                              AfterProcessingTime.pastFirstElementInPane()
+                                  .plusDelayOf(Duration.standardSeconds(2))))
+                      .discardingFiredPanes())
+              .apply("merge schemas", Combine.fewKeys(new 
MergeSchemaCombineFn()))
+              .setCoder(KvCoder.of(destinationCoder, 
ProtoCoder.of(TableSchema.class)))
+              .apply(
+                  "Patch table schema",
+                  ParDo.of(
+                      new PatchTableSchemaDoFn<>(operationName, bqServices, 
dynamicDestinations)))
+              .setCoder(KvCoder.of(destinationCoder, 
NullableCoder.of(elementCoder)))
+              // We need to make sure that all shards of the buffering 
transform are notified.
+              .apply(
+                  "fanout to all shards",
+                  FlatMapElements.via(
+                      new SimpleFunction<
+                          KV<DestinationT, ElementT>,
+                          Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
+                        @Override
+                        public Iterable<KV<ShardedKey<DestinationT>, 
ElementT>> apply(
+                            KV<DestinationT, ElementT> elem) {
+                          return IntStream.range(0, numShards)
+                              .mapToObj(
+                                  i ->
+                                      KV.of(
+                                          
StorageApiConvertMessages.AssignShardFn.getShardedKey(
+                                              elem.getKey(), i, numShards),
+                                          elem.getValue()))
+                              .collect(Collectors.toList());
+                        }
+                      }))
+              .setCoder(
+                  KvCoder.of(ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)))
+              .apply(
+                  Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
+                      .triggering(DefaultTrigger.of()));
 
-    // Throttle the stream to the patch-table function so that only a single 
update per table per
-    // two seconds gets processed (to match quotas). The combiner merges 
incremental schemas, so we
-    // won't miss any updates.
-    PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
-        result
-            .get(patchTableSchemaTag)
-            .apply(
-                "rewindow",
-                Window.<KV<DestinationT, TableSchema>>configure()
-                    .triggering(
-                        Repeatedly.forever(
-                            AfterProcessingTime.pastFirstElementInPane()
-                                .plusDelayOf(Duration.standardSeconds(2))))
-                    .discardingFiredPanes())
-            .apply("merge schemas", Combine.fewKeys(new 
MergeSchemaCombineFn()))
-            .setCoder(KvCoder.of(destinationCoder, 
ProtoCoder.of(TableSchema.class)))
-            .apply(
-                "Patch table schema",
-                ParDo.of(
-                    new PatchTableSchemaDoFn<>(operationName, bqServices, 
dynamicDestinations)))
-            .setCoder(KvCoder.of(destinationCoder, 
NullableCoder.of(elementCoder)))
-            // We need to make sure that all shards of the buffering transform 
are notified.
-            .apply(
-                "fanout to all shards",
-                FlatMapElements.via(
-                    new SimpleFunction<
-                        KV<DestinationT, ElementT>,
-                        Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
-                      @Override
-                      public Iterable<KV<ShardedKey<DestinationT>, ElementT>> 
apply(
-                          KV<DestinationT, ElementT> elem) {
-                        return IntStream.range(0, numShards)
-                            .mapToObj(
-                                i ->
-                                    KV.of(
-                                        
StorageApiConvertMessages.AssignShardFn.getShardedKey(
-                                            elem.getKey(), i, numShards),
-                                        elem.getValue()))
-                            .collect(Collectors.toList());
-                      }
-                    }))
-            .setCoder(
-                KvCoder.of(ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)))
-            .apply(
-                Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
-                    .triggering(DefaultTrigger.of()));
+      // Any elements that are waiting for a schema update are sent to this 
stateful DoFn to be
+      // buffered.
+      // Note: we currently do not provide the DynamicDestinations object 
access to the side input
+      // in
+      // this path.
+      // This is because side inputs are not currently available from timer 
callbacks. Since side
+      // inputs are generally
+      // used for getSchema and in this case we read the schema from the 
table, this is unlikely to
+      // be
+      // a problem.
+      PCollection<KV<ShardedKey<DestinationT>, ElementT>> 
shardedWaitingElements =
+          result
+              .get(elementsWaitingForSchemaTag)
+              // TODO: Consider using GroupIntoBatchs.withShardingKey to get 
auto sharding here
+              // instead of fixed sharding.
+              .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
+              .setCoder(
+                  KvCoder.of(
+                      ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)));
 
-    // Any elements that are waiting for a schema update are sent to this 
stateful DoFn to be
-    // buffered.
-    // Note: we currently do not provide the DynamicDestinations object access 
to the side input in
-    // this path.
-    // This is because side inputs are not currently available from timer 
callbacks. Since side
-    // inputs are generally
-    // used for getSchema and in this case we read the schema from the table, 
this is unlikely to be
-    // a problem.
-    PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements 
=
-        result
-            .get(elementsWaitingForSchemaTag)
-            // TODO: Consider using GroupIntoBatchs.withShardingKey to get 
auto sharding here
-            // instead of fixed sharding.
-            .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
-            .setCoder(
-                KvCoder.of(ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)));
+      PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> 
waitingElementsList =
+          PCollectionList.of(shardedWaitingElements).and(tablesPatched);
+      PCollectionTuple retryResult =
+          waitingElementsList
+              .apply("Buffered flatten", Flatten.pCollections())
+              .apply(
+                  "bufferElements",
+                  ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, 
convertMessagesDoFn))
+                      .withOutputTags(
+                          successfulWritesTag,
+                          TupleTagList.of(ImmutableList.of(failedWritesTag, 
BAD_RECORD_TAG))));
+      retryResult.get(successfulWritesTag).setCoder(successCoder);
+      retryResult.get(failedWritesTag).setCoder(errorCoder);
+      
retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
 
-    PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> 
waitingElementsList =
-        PCollectionList.of(shardedWaitingElements).and(tablesPatched);
-    PCollectionTuple retryResult =
-        waitingElementsList
-            .apply("Buffered flatten", Flatten.pCollections())
-            .apply(
-                "bufferElements",
-                ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, 
convertMessagesDoFn))
-                    .withOutputTags(
-                        successfulWritesTag,
-                        TupleTagList.of(ImmutableList.of(failedWritesTag, 
BAD_RECORD_TAG))));
-    retryResult.get(successfulWritesTag).setCoder(successCoder);
-    retryResult.get(failedWritesTag).setCoder(errorCoder);
-    
retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
-
-    // Flatten successes and failures from both the regular transform and the 
retry transform.
-    PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
-        PCollectionList.of(result.get(successfulWritesTag))
-            .and(retryResult.get(successfulWritesTag))
-            .apply("flattenSuccesses", Flatten.pCollections());
-    PCollection<BigQueryStorageApiInsertError> allFailures =
-        PCollectionList.of(result.get(failedWritesTag))
-            .and(retryResult.get(failedWritesTag))
-            .apply("flattenFailures", Flatten.pCollections());
-    PCollection<BadRecord> allBadRecords =
-        PCollectionList.of(result.get(BAD_RECORD_TAG))
-            .and(retryResult.get(BAD_RECORD_TAG))
-            .apply("flattenBadRecords", Flatten.pCollections());
-    return PCollectionTuple.of(successfulWritesTag, allSuccesses)
-        .and(failedWritesTag, allFailures)
-        .and(BAD_RECORD_TAG, allBadRecords);
+      // Flatten successes and failures from both the regular transform and 
the retry transform.
+      PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
+          PCollectionList.of(result.get(successfulWritesTag))
+              .and(retryResult.get(successfulWritesTag))
+              .apply("flattenSuccesses", Flatten.pCollections());
+      PCollection<BigQueryStorageApiInsertError> allFailures =
+          PCollectionList.of(result.get(failedWritesTag))
+              .and(retryResult.get(failedWritesTag))
+              .apply("flattenFailures", Flatten.pCollections());
+      PCollection<BadRecord> allBadRecords =
+          PCollectionList.of(result.get(BAD_RECORD_TAG))
+              .and(retryResult.get(BAD_RECORD_TAG))
+              .apply("flattenBadRecords", Flatten.pCollections());
+      return PCollectionTuple.of(successfulWritesTag, allSuccesses)
+          .and(failedWritesTag, allFailures)
+          .and(BAD_RECORD_TAG, allBadRecords);
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To improve readability and reduce nesting, you can use a guard clause. 
Instead of wrapping the rest of the method in an `else` block, you can return 
early and then have the main logic at the top level of the method.
   
   ```java
       final int numShards =
           input
               .getPipeline()
               .getOptions()
               .as(BigQueryOptions.class)
               .getSchemaUpgradeBufferingShards();
   
       // Throttle the stream to the patch-table function so that only a single 
update per table per
       // two seconds gets processed (to match quotas). The combiner merges 
incremental schemas, so
       // we
       // won't miss any updates.
       PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
           result
               .get(patchTableSchemaTag)
               .apply(
                   "rewindow",
                   Window.<KV<DestinationT, TableSchema>>configure()
                       .triggering(
                           Repeatedly.forever(
                               AfterProcessingTime.pastFirstElementInPane()
                                   .plusDelayOf(Duration.standardSeconds(2))))
                       .discardingFiredPanes())
               .apply("merge schemas", Combine.fewKeys(new 
MergeSchemaCombineFn()))
               .setCoder(KvCoder.of(destinationCoder, 
ProtoCoder.of(TableSchema.class)))
               .apply(
                   "Patch table schema",
                   ParDo.of(
                       new PatchTableSchemaDoFn<>(operationName, bqServices, 
dynamicDestinations)))
               .setCoder(KvCoder.of(destinationCoder, 
NullableCoder.of(elementCoder)))
               // We need to make sure that all shards of the buffering 
transform are notified.
               .apply(
                   "fanout to all shards",
                   FlatMapElements.via(
                       new SimpleFunction<
                           KV<DestinationT, ElementT>,
                           Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
                         @Override
                         public Iterable<KV<ShardedKey<DestinationT>, 
ElementT>> apply(
                             KV<DestinationT, ElementT> elem) {
                           return IntStream.range(0, numShards)
                               .mapToObj(
                                   i ->
                                       KV.of(
                                           
StorageApiConvertMessages.AssignShardFn.getShardedKey(
                                               elem.getKey(), i, numShards),
                                           elem.getValue()))
                               .collect(Collectors.toList());
                         }
                       }))
               .setCoder(
                   KvCoder.of(ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)))
               .apply(
                   Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
                       .triggering(DefaultTrigger.of()));
   
       // Any elements that are waiting for a schema update are sent to this 
stateful DoFn to be
       // buffered.
       // Note: we currently do not provide the DynamicDestinations object 
access to the side input
       // in
       // this path.
       // This is because side inputs are not currently available from timer 
callbacks. Since side
       // inputs are generally
       // used for getSchema and in this case we read the schema from the 
table, this is unlikely to
       // be
       // a problem.
       PCollection<KV<ShardedKey<DestinationT>, ElementT>> 
shardedWaitingElements =
           result
               .get(elementsWaitingForSchemaTag)
               // TODO: Consider using GroupIntoBatchs.withShardingKey to get 
auto sharding here
               // instead of fixed sharding.
               .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
               .setCoder(
                   KvCoder.of(
                       ShardedKey.Coder.of(destinationCoder), 
NullableCoder.of(elementCoder)));
   
       PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> 
waitingElementsList =
           PCollectionList.of(shardedWaitingElements).and(tablesPatched);
       PCollectionTuple retryResult =
           waitingElementsList
               .apply("Buffered flatten", Flatten.pCollections())
               .apply(
                   "bufferElements",
                   ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, 
convertMessagesDoFn))
                       .withOutputTags(
                           successfulWritesTag,
                           TupleTagList.of(ImmutableList.of(failedWritesTag, 
BAD_RECORD_TAG))));
       retryResult.get(successfulWritesTag).setCoder(successCoder);
       retryResult.get(failedWritesTag).setCoder(errorCoder);
       
retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
   
       // Flatten successes and failures from both the regular transform and 
the retry transform.
       PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
           PCollectionList.of(result.get(successfulWritesTag))
               .and(retryResult.get(successfulWritesTag))
               .apply("flattenSuccesses", Flatten.pCollections());
       PCollection<BigQueryStorageApiInsertError> allFailures =
           PCollectionList.of(result.get(failedWritesTag))
               .and(retryResult.get(failedWritesTag))
               .apply("flattenFailures", Flatten.pCollections());
       PCollection<BadRecord> allBadRecords =
           PCollectionList.of(result.get(BAD_RECORD_TAG))
               .and(retryResult.get(BAD_RECORD_TAG))
               .apply("flattenBadRecords", Flatten.pCollections());
       return PCollectionTuple.of(successfulWritesTag, allSuccesses)
           .and(failedWritesTag, allFailures)
           .and(BAD_RECORD_TAG, allBadRecords);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to