stankiewicz commented on code in PR #38783:
URL: https://github.com/apache/beam/pull/38783#discussion_r3362829237


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -457,6 +469,37 @@ private void addErrorCollections(
     }
   }
 
+  private static class AddShardKeyFn<DestT, ElemT>
+      extends DoFn<
+          KV<DestT, StorageApiWritePayload>, KV<Integer, KV<DestT, StorageApiWritePayload>>> {
+
+    private final StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations;
+    private final int numShards;
+
+    public AddShardKeyFn(
+        StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations, int numShards) {
+      this.dynamicDestinations = dynamicDestinations;
+      this.numShards = numShards;
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @Element KV<DestT, StorageApiWritePayload> element,
+        OutputReceiver<KV<Integer, KV<DestT, StorageApiWritePayload>>> outputReceiver) {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+
+      String tableUrn = dynamicDestinations.getTable(element.getKey()).getShortTableUrn();
+
+      int hash = Hashing.murmur3_32_fixed().hashString(tableUrn, StandardCharsets.UTF_8).asInt();
+
+      int shardKey =
+          Math.floorMod(hash + ThreadLocalRandom.current().nextInt(numShards), numShards);

Review Comment:
   Thanks Sam. I refactored this, updated the comment, and tested it. The goal is to keep affinity for elements that share the same table, plus optional sharding when we want to increase parallelism per table at the cost of a higher connection count.
   The number of concurrent connections is then at most number_of_tables * numShards.
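A minimal Java sketch of the keying scheme described above, assuming a composite key; it is not the PR's refactored code, which isn't shown here. `ShardKeySketch`, `shardKey`, `tableHashOf`, and `countDistinctKeys` are hypothetical names, `String.hashCode()` stands in for Guava's `Hashing.murmur3_32_fixed()`, and the table URNs are made up. Each element is keyed by its table's hash plus a random sub-shard in `[0, numShards)`, so a table's elements only ever land on that table's `numShards` keys and the number of distinct keys stays at or below `number_of_tables * numShards`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical sketch: key each element by (table hash, random sub-shard).
 * Elements of one table only reach that table's numShards keys, bounding the
 * number of distinct keys by numTables * numShards.
 */
public class ShardKeySketch {

  /** Packs a stable per-table hash (high 32 bits) and a sub-shard (low 32 bits). */
  static long shardKey(String tableUrn, int numShards) {
    // String.hashCode() is a stand-in for the murmur3 hash used in the PR.
    int tableHash = tableUrn.hashCode();
    int subShard = ThreadLocalRandom.current().nextInt(numShards);
    return ((long) tableHash << 32) | (subShard & 0xFFFFFFFFL);
  }

  /** Recovers the table hash from a key, showing per-table affinity is preserved. */
  static int tableHashOf(long key) {
    return (int) (key >>> 32);
  }

  /** Counts distinct keys produced when elementsPerTable elements go to each table. */
  static int countDistinctKeys(List<String> tables, int numShards, int elementsPerTable) {
    Set<Long> keys = new HashSet<>();
    for (String table : tables) {
      for (int i = 0; i < elementsPerTable; i++) {
        long key = shardKey(table, numShards);
        if (tableHashOf(key) != table.hashCode()) {
          throw new IllegalStateException("key lost table affinity");
        }
        keys.add(key);
      }
    }
    return keys.size();
  }

  public static void main(String[] args) {
    List<String> tables =
        List.of(
            "projects/p/datasets/d/tables/orders",
            "projects/p/datasets/d/tables/users",
            "projects/p/datasets/d/tables/events");
    int numShards = 4;
    // At most tables.size() * numShards = 12 keys; typically all 12 appear.
    System.out.println("distinct keys = " + countDistinctKeys(tables, numShards, 10_000));
  }
}
```

Keeping the table hash and the sub-shard as separate key components is what preserves affinity. Folding them into a single value instead, as in `Math.floorMod(hash + r, numShards)` with `r` drawn uniformly from `[0, numShards)`, yields (ignoring int overflow) a value that is uniform over all `numShards` shards whatever `hash` is, so the table no longer constrains which shard an element lands on.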


