reuvenlax commented on code in PR #38783:
URL: https://github.com/apache/beam/pull/38783#discussion_r3369594241


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -457,6 +466,52 @@ private void addErrorCollections(
     }
   }
 
+  /**
+   * A {@link DoFn} that applies a composite sharding key to incoming records to optimize BigQuery
+   * Storage API throughput.
+   *
+   * <p>This transform manages the balance between connection count (resource overhead) and
+   * processing parallelism by distributing data across {@code numShards} buckets:
+   *
+   * <ul>
+   *   <li><b>Data Affinity:</b> By using a composite key {@code KV<DestT, Integer>}, this transform
+   *       ensures that all records for a specific destination (table) are grouped into specific
+   *       shard buckets. This allows downstream transforms to maintain stable {@code
+   *       StreamConnection} sessions for each destination, minimizing connection thrashing.
+   *   <li><b>Parallel Throughput:</b> By appending a pseudo-random integer shard index, this
+   *       transform allows the runner to distribute the records for a single destination across up
+   *       to {@code numShards} parallel streams, parallelizing the write throughput of "hot"
+   *       (high-volume) destinations.
+   *   <li><b>Concurrency Scaling:</b> The {@code numShards} parameter acts as the parallelism
+   *       multiplier per destination. The total potential concurrency across the pipeline is {@code
+   *       numShards * total_destinations}, allowing users to scale write throughput by increasing
+   *       {@code numShards} for bottlenecked tables.
+   * </ul>
+   *
+   * <p>The output structure is {@code KV<KV<DestT, Integer>, KV<DestT, Payload>>}. Downstream,
+   * {@link Redistribute#byKey()} uses this composite key to partition the data, ensuring the runner
+   * effectively balances load while respecting the per-destination parallelism limits configured
+   * here.
+   */
+  private static class AddShardKeyFn<DestT, ElemT>
+      extends DoFn<
+          KV<DestT, StorageApiWritePayload>,
+          KV<KV<DestT, Integer>, KV<DestT, StorageApiWritePayload>>> {
+    private final int shardBound;
+
+    public AddShardKeyFn(int numShards) {
+      this.shardBound = Math.max(1, numShards);
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<DestT, StorageApiWritePayload> element,
+        OutputReceiver<KV<KV<DestT, Integer>, KV<DestT, StorageApiWritePayload>>> outputReceiver) {
+      int shard = ThreadLocalRandom.current().nextInt(shardBound);

Review Comment:
   This is somewhat expensive. Usually we initialize the shard counter in 
`@Setup` and then just increment it in `processElement` (see the many other 
examples, including in this file).
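
   A minimal sketch of the suggested pattern, written as a plain Java class so 
it runs without Beam on the classpath. The class name `RoundRobinSharder`, its 
field, and its methods are hypothetical and are not taken from 
`StorageApiLoads.java`; in a real `DoFn`, `setup()` would carry `@Setup` and 
`nextShard()` would be called from the `@ProcessElement` method. It assumes 
each instance is used by one thread at a time, so no synchronization is added.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical stand-in for the shard-assignment logic; not Beam code. */
public class RoundRobinSharder {
  private final int shardBound;
  // Per-instance cursor; assumes the instance is not shared across threads.
  private int shardCursor;

  public RoundRobinSharder(int numShards) {
    // Same guard as in the diff: at least one shard.
    this.shardBound = Math.max(1, numShards);
  }

  /** In a DoFn this would be the @Setup method: one random draw per instance. */
  public void setup() {
    shardCursor = ThreadLocalRandom.current().nextInt(shardBound);
  }

  /** In a DoFn this would be called per element: no random draw, just an increment. */
  public int nextShard() {
    int shard = shardCursor;
    // Wrap inside the bound so the cursor never overflows or goes negative.
    shardCursor = (shardCursor + 1) % shardBound;
    return shard;
  }

  public static void main(String[] args) {
    RoundRobinSharder sharder = new RoundRobinSharder(4);
    sharder.setup();
    for (int i = 0; i < 8; i++) {
      System.out.println("element " + i + " -> shard " + sharder.nextShard());
    }
  }
}
```

   The random starting point keeps different instances from all beginning at 
shard 0, while the per-element cost drops to an add and a modulo. One 
trade-off of this sketch: shards are assigned round-robin per instance rather 
than independently at random per element.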



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
