gemini-code-assist[bot] commented on code in PR #39156:
URL: https://github.com/apache/beam/pull/39156#discussion_r3493578375


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java:
##########
@@ -64,27 +65,32 @@ private PCollection<KV<Row, Iterable<Row>>> groupByPartition(PCollection<KV<Row,
     RowCoder destinationCoder = RowCoder.of(AssignDestinationsAndPartitions.OUTPUT_SCHEMA);
     RowCoder dataCoder = RowCoder.of(dynamicDestinations.getDataSchema());
 
-    GroupIntoBatches<Row, Row> groupIntoPartitions =
-        GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE);
-    if (IcebergUtils.isUnbounded(input) && triggeringFrequency != null) {
-      groupIntoPartitions = groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency);
-    }
+    if (IcebergUtils.isUnbounded(input)) {
+      GroupIntoBatches<Row, Row> groupIntoPartitions =
+          GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE);
+      if (triggeringFrequency != null) {
+        groupIntoPartitions = groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency);
+      }
 
-    if (autoSharding) {
+      if (autoSharding) {
+        return input
+            .apply(groupIntoPartitions.withShardedKey())
+            .setCoder(
+                KvCoder.of(
+                    org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+                    IterableCoder.of(dataCoder)))
+            .apply(
+                "DropShardId",
+                MapElements.into(kvs(rows(), iterables(rows())))
+                    .via(kv -> KV.of(kv.getKey().getKey(), kv.getValue())))
+            .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
+      }
       return input
-          .apply(groupIntoPartitions.withShardedKey())
-          .setCoder(
-              KvCoder.of(
-                  org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
-                  IterableCoder.of(dataCoder)))
-          .apply(
-              "DropShardId",
-              MapElements.into(kvs(rows(), iterables(rows())))
-                  .via(kv -> KV.of(kv.getKey().getKey(), kv.getValue())))
+          .apply(groupIntoPartitions)
           .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
     } else {
       return input
-          .apply(groupIntoPartitions)
+          .apply(GroupByKey.create())
           .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
     }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Silently ignoring `autoSharding` for bounded inputs can cause severe performance regressions, such as hot-key bottlenecks, in batch pipelines that explicitly enabled this option to parallelize writes to large partitions.
   
   We should preserve the sharded `GroupIntoBatches` path whenever `autoSharding` is explicitly enabled, even for bounded inputs, and fall back to the memory-efficient `GroupByKey` only when the input is bounded and `autoSharding` is disabled.
   
   ```java
       if (IcebergUtils.isUnbounded(input) || autoSharding) {
         GroupIntoBatches<Row, Row> groupIntoPartitions =
             GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE);
         if (IcebergUtils.isUnbounded(input) && triggeringFrequency != null) {
           groupIntoPartitions = groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency);
         }
   
         if (autoSharding) {
           return input
               .apply(groupIntoPartitions.withShardedKey())
               .setCoder(
                   KvCoder.of(
                       org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
                       IterableCoder.of(dataCoder)))
               .apply(
                   "DropShardId",
                   MapElements.into(kvs(rows(), iterables(rows())))
                       .via(kv -> KV.of(kv.getKey().getKey(), kv.getValue())))
               .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
         }
         return input
             .apply(groupIntoPartitions)
             .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
       } else {
         return input
             .apply(GroupByKey.create())
             .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
       }
   ```
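
To make the intended branching easier to check, here is a minimal, Beam-free sketch of the decision the suggestion above encodes. The class `GroupingChoice`, the `Strategy` enum, and both helper methods are hypothetical names introduced only for illustration; they are not part of `WriteToPartitions` or the Beam API, and the sketch assumes the two booleans stand in for `IcebergUtils.isUnbounded(input)` and `autoSharding`.

```java
// Hypothetical illustration only: models the branch selection from the
// suggestion without depending on Apache Beam.
final class GroupingChoice {

  // Stand-ins for the three transform paths in the suggested code.
  enum Strategy {
    GROUP_BY_KEY,               // bounded input, autoSharding disabled
    GROUP_INTO_BATCHES,         // unbounded input, autoSharding disabled
    GROUP_INTO_BATCHES_SHARDED  // autoSharding enabled, bounded or unbounded
  }

  // Mirrors `if (isUnbounded(input) || autoSharding) { ... } else { ... }`.
  static Strategy choose(boolean unbounded, boolean autoSharding) {
    if (unbounded || autoSharding) {
      return autoSharding
          ? Strategy.GROUP_INTO_BATCHES_SHARDED
          : Strategy.GROUP_INTO_BATCHES;
    }
    return Strategy.GROUP_BY_KEY;
  }

  // Mirrors `isUnbounded(input) && triggeringFrequency != null`: the max
  // buffering duration is only applied to unbounded inputs.
  static boolean appliesMaxBufferingDuration(
      boolean unbounded, boolean hasTriggeringFrequency) {
    return unbounded && hasTriggeringFrequency;
  }

  public static void main(String[] args) {
    // Print the full decision table for the two flags.
    for (boolean unbounded : new boolean[] {false, true}) {
      for (boolean autoSharding : new boolean[] {false, true}) {
        System.out.printf(
            "unbounded=%-5s autoSharding=%-5s -> %s%n",
            unbounded, autoSharding, choose(unbounded, autoSharding));
      }
    }
  }
}
```

Under these assumptions, the only combination that changes relative to the PR's diff is bounded input with `autoSharding` enabled, which would take the sharded `GroupIntoBatches` path instead of `GroupByKey`.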


