gemini-code-assist[bot] commented on code in PR #38783:
URL: https://github.com/apache/beam/pull/38783#discussion_r3349957729


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -457,6 +469,37 @@ private void addErrorCollections(
     }
   }
 
+  private static class AddShardKeyFn<DestT, ElemT>
+      extends DoFn<
+          KV<DestT, StorageApiWritePayload>, KV<Integer, KV<DestT, StorageApiWritePayload>>> {
+
+    private final StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations;
+    private final int numShards;
+
+    public AddShardKeyFn(
+        StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations, int numShards) {
+      this.dynamicDestinations = dynamicDestinations;
+      this.numShards = numShards;
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @Element KV<DestT, StorageApiWritePayload> element,
+        OutputReceiver<KV<Integer, KV<DestT, StorageApiWritePayload>>> outputReceiver) {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+
+      String tableUrn = dynamicDestinations.getTable(element.getKey()).getShortTableUrn();
+
+      int hash = Hashing.murmur3_32_fixed().hashString(tableUrn, StandardCharsets.UTF_8).asInt();
+
+      int shardKey =
+          Math.floorMod(hash ^ ThreadLocalRandom.current().nextInt(numShards), numShards);
+
+      outputReceiver.output(KV.of(shardKey, element));
+    }
+  }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   There are two major issues with the current implementation of 
`AddShardKeyFn`:
   
   1. **Non-uniform Shard Distribution (Correctness/Load Balancing)**: XOR-ing the hash with `ThreadLocalRandom.current().nextInt(numShards)` and then applying `Math.floorMod` does not distribute shard keys uniformly when `numShards` is not a power of two. This can lead to severe load imbalance (hot shards and empty shards). For example, with `numShards = 3` and `hash = 1`, the random values 0, 1, and 2 give `1 ^ 0 = 1`, `1 ^ 1 = 0`, and `1 ^ 2 = 3`, which reduce modulo 3 to shards 1, 0, and 0: shard `2` is never selected, while shard `0` receives about 67% of the traffic. Using addition (`+`) instead of XOR (`^`) gives a uniform distribution across all `numShards` shards for any value of `numShards`, as long as the sum does not overflow `int`; reducing the hash with `Math.floorMod(hash, numShards)` before adding rules that out.
   
   2. **Interleaved Destinations (Efficiency/Connection Management)**: Keying the collection solely by the `Integer` shard key means elements from many different destinations are interleaved within the same shard. When the downstream writer processes these interleaved elements, it has to switch between destinations frequently, which can lead to high concurrent connection usage and connection thrashing. With a composite key `KV<DestT, Integer>` for redistribution, the runner can group elements by both destination and shard key, so that the elements for a specific `(destination, shard)` pair are processed together, which should reduce the number of concurrent connections.
   
   ```java
     private static class AddShardKeyFn<DestT, ElemT>
         extends DoFn<
             KV<DestT, StorageApiWritePayload>,
             KV<KV<DestT, Integer>, KV<DestT, StorageApiWritePayload>>> {
   
       private final StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations;
       private final int numShards;
   
       public AddShardKeyFn(
           StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations, int numShards) {
         this.dynamicDestinations = dynamicDestinations;
         this.numShards = numShards;
       }
   
       @ProcessElement
       public void processElement(
           ProcessContext c,
           @Element KV<DestT, StorageApiWritePayload> element,
           OutputReceiver<KV<KV<DestT, Integer>, KV<DestT, StorageApiWritePayload>>> outputReceiver) {
         dynamicDestinations.setSideInputAccessorFromProcessContext(c);
   
         String tableUrn = dynamicDestinations.getTable(element.getKey()).getShortTableUrn();
   
         int hash = Hashing.murmur3_32_fixed().hashString(tableUrn, StandardCharsets.UTF_8).asInt();
   
         // Reduce the hash into [0, numShards) first so that adding the random
         // offset cannot overflow int; the sum is then uniform over all shards.
         int shardKey =
             (Math.floorMod(hash, numShards) + ThreadLocalRandom.current().nextInt(numShards))
                 % numShards;
   
         // Key by (destination, shard) so each group holds a single destination.
         outputReceiver.output(KV.of(KV.of(element.getKey(), shardKey), element));
       }
     }
   ```
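
   The distribution claim in point 1 can be checked without Beam. The following is a minimal standalone sketch, not code from the PR: the class `ShardDistribution` and its method names are hypothetical. Instead of sampling, it enumerates every offset `r` in `[0, numShards)` that `nextInt(numShards)` could return and counts how many of them land on each shard. It also covers an `int` overflow edge case for plain `hash + r`, which is why the hash is reduced before the offset is added.

   ```java
   import java.util.Arrays;

   class ShardDistribution {
     // Shard counts for floorMod(hash ^ r, numShards), over every offset r in [0, numShards).
     static int[] xorCounts(int hash, int numShards) {
       int[] counts = new int[numShards];
       for (int r = 0; r < numShards; r++) {
         counts[Math.floorMod(hash ^ r, numShards)]++;
       }
       return counts;
     }

     // Shard counts for floorMod(hash + r, numShards); hash + r may wrap around.
     static int[] addCounts(int hash, int numShards) {
       int[] counts = new int[numShards];
       for (int r = 0; r < numShards; r++) {
         counts[Math.floorMod(hash + r, numShards)]++;
       }
       return counts;
     }

     // Shard counts when the hash is reduced first, so the sum cannot overflow.
     static int[] reducedAddCounts(int hash, int numShards) {
       int[] counts = new int[numShards];
       int base = Math.floorMod(hash, numShards);
       for (int r = 0; r < numShards; r++) {
         counts[(base + r) % numShards]++;
       }
       return counts;
     }

     public static void main(String[] args) {
       // XOR with hash = 1, numShards = 3: shard 2 is never hit.
       System.out.println(Arrays.toString(xorCounts(1, 3))); // [2, 1, 0]
       // Addition with the same inputs: one offset per shard.
       System.out.println(Arrays.toString(addCounts(1, 3))); // [1, 1, 1]
       // Plain addition when hash + r wraps past Integer.MAX_VALUE.
       System.out.println(Arrays.toString(addCounts(Integer.MAX_VALUE, 3))); // [0, 2, 1]
       // Reducing the hash first stays uniform even for that hash.
       System.out.println(Arrays.toString(reducedAddCounts(Integer.MAX_VALUE, 3))); // [1, 1, 1]
     }
   }
   ```

   A count of `1` in every slot means each shard is equally likely when the offset is drawn uniformly; any other pattern indicates skew.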



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
