gemini-code-assist[bot] commented on code in PR #38783:
URL: https://github.com/apache/beam/pull/38783#discussion_r3349957729
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -457,6 +469,37 @@ private void addErrorCollections(
}
}
+  private static class AddShardKeyFn<DestT, ElemT>
+      extends DoFn<
+          KV<DestT, StorageApiWritePayload>, KV<Integer, KV<DestT, StorageApiWritePayload>>> {
+
+    private final StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations;
+    private final int numShards;
+
+    public AddShardKeyFn(
+        StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations, int numShards) {
+      this.dynamicDestinations = dynamicDestinations;
+      this.numShards = numShards;
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @Element KV<DestT, StorageApiWritePayload> element,
+        OutputReceiver<KV<Integer, KV<DestT, StorageApiWritePayload>>> outputReceiver) {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+
+      String tableUrn = dynamicDestinations.getTable(element.getKey()).getShortTableUrn();
+
+      int hash = Hashing.murmur3_32_fixed().hashString(tableUrn, StandardCharsets.UTF_8).asInt();
+
+      int shardKey =
+          Math.floorMod(hash ^ ThreadLocalRandom.current().nextInt(numShards), numShards);
+
+      outputReceiver.output(KV.of(shardKey, element));
+    }
+  }
Review Comment:

There are two major issues with the current implementation of
`AddShardKeyFn`:
1. **Non-uniform Shard Distribution (Correctness/Load Balancing)**: Using
the XOR (`^`) operator with `ThreadLocalRandom.current().nextInt(numShards)`
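followed by `Math.floorMod` can produce a highly non-uniform distribution of
shard keys when `numShards` is not a power of two. This can lead to severe load
imbalance (hot shards and empty shards). For example, if `numShards = 3` and
`hash = 1`, the three offsets give `1 ^ 0 = 1`, `1 ^ 1 = 0` and `1 ^ 2 = 3`,
which map to shards `1`, `0` and `0`: shard `2` is never selected, while shard
`0` receives two thirds (about 67%) of the traffic. Using addition (`+`) instead
of XOR (`^`) makes the distribution uniform across all `numShards` shards for any
value of `numShards`, as long as the sum does not overflow. Reducing the hash
first (`Math.floorMod(hash, numShards) + offset`, where `offset` is the random
value) rules that out, whereas a raw `hash` near `Integer.MAX_VALUE` wraps
around and skews the result.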
2. **Interleaved Destinations (Efficiency/Connection Management)**: Keying
the collection solely by the `Integer` shard key means elements from many
different destinations will be interleaved within the same shard. When the
downstream writer processes these interleaved elements, it will have to
frequently switch between destinations, leading to high concurrent connection
usage and connection thrashing. By using a composite key `KV<DestT, Integer>`
for redistribution, the runner groups elements by both destination and shard
key, so every element for a given `(destination, shard)` pair shares a single
key and each group contains only one destination, minimizing concurrent
connections.
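
As a quick check of point 1, the standalone sketch below enumerates every offset that `nextInt(numShards)` can return and counts where each formula sends it. Because the offsets are equally likely, the counts are the exact shard distribution for a fixed `hash`. It is plain Java only; the class name, the `ShardFn` interface and the `safeAddShard` variant (which reduces `hash` modulo `numShards` before adding, to sidestep `int` overflow) are illustrative assumptions, not code from the PR.

```java
import java.util.Arrays;

/** Exhaustively counts shard hits for each random offset in [0, numShards). */
final class ShardDistributionDemo {

  /** Maps (hash, offset, numShards) to a shard index. */
  @FunctionalInterface
  interface ShardFn {
    int shard(int hash, int offset, int numShards);
  }

  /** Formula from the diff: XOR the hash with the random offset. */
  static int xorShard(int hash, int offset, int numShards) {
    return Math.floorMod(hash ^ offset, numShards);
  }

  /** Plain addition; hash + offset can overflow when hash is near Integer.MAX_VALUE. */
  static int addShard(int hash, int offset, int numShards) {
    return Math.floorMod(hash + offset, numShards);
  }

  /** Addition after reducing the hash; the sum stays in [0, 2 * numShards - 2]. */
  static int safeAddShard(int hash, int offset, int numShards) {
    return Math.floorMod(Math.floorMod(hash, numShards) + offset, numShards);
  }

  /**
   * nextInt(numShards) returns each offset with equal probability, so counting one hit per
   * offset gives the exact shard distribution for a fixed hash.
   */
  static int[] histogram(int hash, int numShards, ShardFn fn) {
    int[] counts = new int[numShards];
    for (int offset = 0; offset < numShards; offset++) {
      counts[fn.shard(hash, offset, numShards)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    int max = Integer.MAX_VALUE;
    // prints [2, 1, 0]
    System.out.println(Arrays.toString(histogram(1, 3, ShardDistributionDemo::xorShard)));
    // prints [1, 1, 1]
    System.out.println(Arrays.toString(histogram(1, 3, ShardDistributionDemo::addShard)));
    // prints [0, 2, 1]: overflow wraps MAX_VALUE + 1 to MIN_VALUE
    System.out.println(Arrays.toString(histogram(max, 3, ShardDistributionDemo::addShard)));
    // prints [1, 1, 1]
    System.out.println(Arrays.toString(histogram(max, 3, ShardDistributionDemo::safeAddShard)));
  }
}
```

For `hash = 1` the XOR histogram is `[2, 1, 0]` and the addition histogram is `[1, 1, 1]`, matching the example in point 1. The third line shows the overflow edge case: with `hash = Integer.MAX_VALUE`, `hash + offset` wraps to negative values and shard `0` is never chosen, while reducing the hash first restores a uniform result. With a power of two such as `numShards = 4`, the XOR histogram for `hash = 1` is uniform, consistent with the power-of-two caveat.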
```java
private static class AddShardKeyFn<DestT, ElemT>
    extends DoFn<
        KV<DestT, StorageApiWritePayload>,
        KV<KV<DestT, Integer>, KV<DestT, StorageApiWritePayload>>> {
  private final StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations;
  private final int numShards;

  public AddShardKeyFn(
      StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations, int numShards) {
    this.dynamicDestinations = dynamicDestinations;
    this.numShards = numShards;
  }

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @Element KV<DestT, StorageApiWritePayload> element,
      OutputReceiver<KV<KV<DestT, Integer>, KV<DestT, StorageApiWritePayload>>> outputReceiver) {
    dynamicDestinations.setSideInputAccessorFromProcessContext(c);
    String tableUrn = dynamicDestinations.getTable(element.getKey()).getShortTableUrn();
    int hash = Hashing.murmur3_32_fixed().hashString(tableUrn, StandardCharsets.UTF_8).asInt();
    // Reduce the hash first so adding the random offset cannot overflow:
    // floorMod(hash, numShards) + offset lies in [0, 2 * numShards - 2].
    int shardKey =
        Math.floorMod(
            Math.floorMod(hash, numShards) + ThreadLocalRandom.current().nextInt(numShards),
            numShards);
    // Key by (destination, shard) so each redistributed group holds a single destination.
    outputReceiver.output(KV.of(KV.of(element.getKey(), shardKey), element));
  }
}
```
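
Point 2 is about the key rather than the shard arithmetic. The plain-Java sketch that follows imitates a keyed shuffle with an in-memory map (it is not Beam code and does not model how a runner schedules work): keying by shard alone puts several destinations into each group, whereas keying by `(destination, shard)` yields groups that each hold one destination. The table names and the fixed shard numbers (standing in for `ThreadLocalRandom`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** In-memory stand-in for a keyed shuffle; not Beam code. */
final class CompositeKeyDemo {

  /** Keying by shard only: returns the set of destinations that land in each shard's group. */
  static Map<Integer, Set<String>> destinationsPerShard(String[] destinations, int[] shards) {
    Map<Integer, Set<String>> groups = new LinkedHashMap<>();
    for (int i = 0; i < destinations.length; i++) {
      groups.computeIfAbsent(shards[i], k -> new TreeSet<>()).add(destinations[i]);
    }
    return groups;
  }

  /** Keying by (destination, shard): returns the element indices collected under each key. */
  static Map<Map.Entry<String, Integer>, List<Integer>> groupByDestinationAndShard(
      String[] destinations, int[] shards) {
    Map<Map.Entry<String, Integer>, List<Integer>> groups = new LinkedHashMap<>();
    for (int i = 0; i < destinations.length; i++) {
      // Map.entry implements equals/hashCode, so it works as a lightweight pair key.
      groups.computeIfAbsent(Map.entry(destinations[i], shards[i]), k -> new ArrayList<>()).add(i);
    }
    return groups;
  }

  public static void main(String[] args) {
    // Hypothetical input: six elements alternating between two tables.
    String[] destinations = {"tableA", "tableB", "tableA", "tableB", "tableA", "tableB"};
    // Fixed shard numbers in [0, 2) replace ThreadLocalRandom so the output is repeatable.
    int[] shards = {0, 0, 1, 1, 0, 0};
    System.out.println("shard-only key:    " + destinationsPerShard(destinations, shards));
    System.out.println("(dest, shard) key: " + groupByDestinationAndShard(destinations, shards));
  }
}
```

Running it shows both shard-only groups containing `[tableA, tableB]`, and four composite-key groups such as `tableA=0=[0, 4]`, each tied to a single table, which is the property the composite key is meant to provide.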