reuvenlax commented on code in PR #38783:
URL: https://github.com/apache/beam/pull/38783#discussion_r3369594241
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -457,6 +466,52 @@ private void addErrorCollections(
}
}
+ /**
+ * A {@link DoFn} that applies a composite sharding key to incoming records
to optimize BigQuery
+ * Storage API throughput.
+ *
+ * <p>This transform manages the balance between connection count (resource
overhead) and
+ * processing parallelism by distributing data across {@code numShards}
buckets:
+ *
+ * <ul>
+ * <li><b>Data Affinity:</b> By using a composite key {@code KV<DestT,
Integer>}, this transform
+ * ensures that all records for a specific destination (table) are
grouped into specific
+ * shard buckets. This allows downstream transforms to maintain stable
{@code
+ * StreamConnection} sessions for each destination, minimizing
connection thrashing.
+ * <li><b>Parallel Throughput:</b> By appending a pseudo-random integer
shard index, this
+ * transform allows the runner to distribute the records for a single
destination across up
+ * to {@code numShards} parallel streams, parallelizing the write
throughput of "hot"
+ * (high-volume) destinations.
+ * <li><b>Concurrency Scaling:</b> The {@code numShards} parameter acts as
the parallelism
+ * multiplier per destination. The total potential concurrency across
the pipeline is {@code
+ * numShards * total_destinations}, allowing users to scale write
throughput by increasing
+ * {@code numShards} for bottlenecked tables.
+ * </ul>
+ *
+ * <p>The output structure is {@code KV<KV<DestT, Integer>, KV<DestT,
Payload>>}. Downstream,
+ * {@link Redistribute#byKey()} uses this composite key to partition the
data, ensuring the runner
+ * effectively balances load while respecting the per-destination
parallelism limits configured
+ * here.
+ */
+ private static class AddShardKeyFn<DestT, ElemT>
+ extends DoFn<
+ KV<DestT, StorageApiWritePayload>,
+ KV<KV<DestT, Integer>, KV<DestT, StorageApiWritePayload>>> {
+ private final int shardBound;
+
+ public AddShardKeyFn(int numShards) {
+ this.shardBound = Math.max(1, numShards);
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<DestT, StorageApiWritePayload> element,
+ OutputReceiver<KV<KV<DestT, Integer>, KV<DestT,
StorageApiWritePayload>>> outputReceiver) {
+ int shard = ThreadLocalRandom.current().nextInt(shardBound);
Review Comment:
This is somewhat expensive. Usually what we do is initialize in @Setup and
then just increment in processElement (e.g. see many other examples, including
in this file)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]