gemini-code-assist[bot] commented on code in PR #39156:
URL: https://github.com/apache/beam/pull/39156#discussion_r3493578375
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToPartitions.java:
##########
@@ -64,27 +65,32 @@ private PCollection<KV<Row, Iterable<Row>>>
groupByPartition(PCollection<KV<Row,
RowCoder destinationCoder =
RowCoder.of(AssignDestinationsAndPartitions.OUTPUT_SCHEMA);
RowCoder dataCoder = RowCoder.of(dynamicDestinations.getDataSchema());
- GroupIntoBatches<Row, Row> groupIntoPartitions =
- GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE);
- if (IcebergUtils.isUnbounded(input) && triggeringFrequency != null) {
- groupIntoPartitions =
groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency);
- }
+ if (IcebergUtils.isUnbounded(input)) {
+ GroupIntoBatches<Row, Row> groupIntoPartitions =
+ GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE);
+ if (triggeringFrequency != null) {
+ groupIntoPartitions =
groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency);
+ }
- if (autoSharding) {
+ if (autoSharding) {
+ return input
+ .apply(groupIntoPartitions.withShardedKey())
+ .setCoder(
+ KvCoder.of(
+
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
+ IterableCoder.of(dataCoder)))
+ .apply(
+ "DropShardId",
+ MapElements.into(kvs(rows(), iterables(rows())))
+ .via(kv -> KV.of(kv.getKey().getKey(), kv.getValue())))
+ .setCoder(KvCoder.of(destinationCoder,
IterableCoder.of(dataCoder)));
+ }
return input
- .apply(groupIntoPartitions.withShardedKey())
- .setCoder(
- KvCoder.of(
-
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
- IterableCoder.of(dataCoder)))
- .apply(
- "DropShardId",
- MapElements.into(kvs(rows(), iterables(rows())))
- .via(kv -> KV.of(kv.getKey().getKey(), kv.getValue())))
+ .apply(groupIntoPartitions)
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
} else {
return input
- .apply(groupIntoPartitions)
+ .apply(GroupByKey.create())
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(dataCoder)));
}
Review Comment:

Silently ignoring `autoSharding` for bounded inputs can lead to severe
performance regressions (such as hot key bottlenecks) for batch pipelines that
explicitly enabled this option to parallelize writes to large partitions.
We should preserve the `GroupIntoBatches` path with sharding when
`autoSharding` is explicitly enabled, even for bounded inputs, while defaulting
to the memory-efficient `GroupByKey` when `autoSharding` is disabled.
```java
if (IcebergUtils.isUnbounded(input) || autoSharding) {
GroupIntoBatches<Row, Row> groupIntoPartitions =
GroupIntoBatches.ofByteSize(DEFAULT_BYTES_PER_FILE);
if (IcebergUtils.isUnbounded(input) && triggeringFrequency != null) {
groupIntoPartitions =
groupIntoPartitions.withMaxBufferingDuration(triggeringFrequency);
}
if (autoSharding) {
return input
.apply(groupIntoPartitions.withShardedKey())
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
IterableCoder.of(dataCoder)))
.apply(
"DropShardId",
MapElements.into(kvs(rows(), iterables(rows())))
.via(kv -> KV.of(kv.getKey().getKey(), kv.getValue())))
.setCoder(KvCoder.of(destinationCoder,
IterableCoder.of(dataCoder)));
}
return input
.apply(groupIntoPartitions)
.setCoder(KvCoder.of(destinationCoder,
IterableCoder.of(dataCoder)));
} else {
return input
.apply(GroupByKey.create())
.setCoder(KvCoder.of(destinationCoder,
IterableCoder.of(dataCoder)));
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]