sjvanrossum commented on code in PR #31608:
URL: https://github.com/apache/beam/pull/31608#discussion_r1779658971
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -201,8 +206,15 @@ public void processElement(
break;
}
+ // TODO(sjvanrossum): https://github.com/apache/beam/issues/31828
+ // NOTE: Null and empty ordering keys are treated as equivalent.
@Nullable String topic = dynamicTopicFn.apply(element);
- K key =
keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic);
+ @Nullable String orderingKey = message.getOrderingKey();
+ int shard =
+ Strings.isNullOrEmpty(orderingKey)
+ ? ThreadLocalRandom.current().nextInt(numShards)
+ : Hashing.murmur3_32_fixed().hashString(orderingKey,
StandardCharsets.UTF_8).asInt();
Review Comment:
@ahmedabu98 a deterministic shard number based on the ordering key ensures
that the writers only operate on distinct key ranges, thus avoiding the
creation of multiple buffers per ordering key, each potentially holding only a
small number of batched elements per bundle. Volume skew across keys is as much
a problem for Beam as it is for Pub/Sub, and it does not fit well with the
feature since throughput is capped at 1 MB/s per ordering key.
One thing that does concern me is ending up with a skewed distribution of
keys across buckets, but that could be fixed by tweaking the bucketing
algorithm.
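
A minimal sketch of the idea, not the code in this PR: the class and method names (`OrderingKeyShards`, `shardFor`, `keysPerShard`) are hypothetical, `String.hashCode()` stands in for the Murmur3 hash used in the diff so the example needs no Guava dependency, and reducing the hash into `[0, numShards)` with `Math.floorMod` is an assumption of the sketch rather than something the diff does. The second method is one rough way to check how evenly a sample of keys spreads across buckets.

```java
import java.util.concurrent.ThreadLocalRandom;

final class OrderingKeyShards {
  private OrderingKeyShards() {}

  /**
   * Returns a shard index in [0, numShards).
   * Null and empty ordering keys are treated as equivalent and get a random shard;
   * any other key always maps to the same shard.
   */
  static int shardFor(String orderingKey, int numShards) {
    if (orderingKey == null || orderingKey.isEmpty()) {
      return ThreadLocalRandom.current().nextInt(numShards);
    }
    // Assumption: String.hashCode() is a stand-in for Murmur3 here.
    // floorMod keeps the result non-negative even when the hash is negative.
    return Math.floorMod(orderingKey.hashCode(), numShards);
  }

  /** Counts how many of the given ordering keys land in each shard. */
  static int[] keysPerShard(Iterable<String> orderingKeys, int numShards) {
    int[] counts = new int[numShards];
    for (String key : orderingKeys) {
      counts[shardFor(key, numShards)]++;
    }
    return counts;
  }
}
```

Running `keysPerShard` over a representative sample of ordering keys gives a per-bucket histogram; a few buckets holding most of the keys would indicate the kind of skew mentioned above.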