sjvanrossum commented on code in PR #31608: URL: https://github.com/apache/beam/pull/31608#discussion_r1779658971
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -201,8 +206,15 @@ public void processElement(
           break;
       }
+      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31828
+      // NOTE: Null and empty ordering keys are treated as equivalent.
       @Nullable String topic = dynamicTopicFn.apply(element);
-      K key = keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic);
+      @Nullable String orderingKey = message.getOrderingKey();
+      int shard =
+          Strings.isNullOrEmpty(orderingKey)
+              ? ThreadLocalRandom.current().nextInt(numShards)
+              : Hashing.murmur3_32_fixed().hashString(orderingKey, StandardCharsets.UTF_8).asInt();

Review Comment:
   @ahmedabu98 a deterministic shard number based on the ordering key ensures that the writers only operate on distinct key ranges. This avoids creating multiple buffers per ordering key, each with a potentially small number of batched elements per bundle.

   Volume skew across keys is as much a problem for Beam as it is for Pub/Sub, and it does not fit well with the ordering key feature, since throughput is capped at 1 MB/s per ordering key.

   One thing that does concern me is ending up with a skewed distribution of keys across buckets, but that could be fixed by tweaking the hashing or bucketing algorithm.

   The suggestion below switches the hashing algorithm to 64-bit FarmHash, since `consistentHash` will pad or shrink the provided hash code to `long`.

   ```suggestion
                 : Hashing.consistentHash(
                     Hashing.farmHashFingerprint64().hashString(orderingKey, StandardCharsets.UTF_8),
                     numShards);
   ```
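
For illustration only, here is a minimal standalone sketch of the shard selection described in the review comment above: a random shard when the ordering key is null or empty, otherwise a deterministic bucket in `[0, numShards)`. It is not the code in the PR. To stay free of dependencies it swaps in 64-bit FNV-1a for `Hashing.farmHashFingerprint64()` and the published jump consistent hash algorithm for Guava's `Hashing.consistentHash`, so the shard numbers it produces will differ from Guava's. The names `ShardSketch`, `shardFor`, `fnv1a64`, and `jumpConsistentHash` are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;

final class ShardSketch {

  // 64-bit FNV-1a over the UTF-8 bytes of the key.
  // Assumption: a stdlib-only stand-in for FarmHash, not the hash used in the PR.
  static long fnv1a64(String s) {
    long hash = 0xcbf29ce484222325L;
    for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
      hash ^= (b & 0xff);
      hash *= 0x100000001b3L;
    }
    return hash;
  }

  // Jump consistent hash: maps a 64-bit key to a bucket in [0, buckets).
  // Assumption: a stand-in for Guava's consistentHash; results are not identical.
  static int jumpConsistentHash(long key, int buckets) {
    if (buckets <= 0) {
      throw new IllegalArgumentException("buckets must be positive");
    }
    long b = -1;
    long j = 0;
    while (j < buckets) {
      b = j;
      key = key * 2862933555777941757L + 1;
      j = (long) ((b + 1) * ((double) (1L << 31) / (double) ((key >>> 33) + 1)));
    }
    return (int) b;
  }

  // Null and empty ordering keys are treated as equivalent and get a random shard.
  static int shardFor(String orderingKey, int numShards) {
    if (orderingKey == null || orderingKey.isEmpty()) {
      return ThreadLocalRandom.current().nextInt(numShards);
    }
    return jumpConsistentHash(fnv1a64(orderingKey), numShards);
  }

  public static void main(String[] args) {
    int numShards = 8;
    for (String key : new String[] {"customer-1", "customer-2", "customer-1", ""}) {
      System.out.println("'" + key + "' -> shard " + shardFor(key, numShards));
    }
  }
}
```

Repeated calls with the same non-empty ordering key always return the same shard, so all messages for that key pass through a single writer. When `numShards` grows by one, a key either keeps its shard or moves to the newly added one, which is the property that makes a consistent-hash style bucketing attractive here.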