scwhittle commented on code in PR #31608:
URL: https://github.com/apache/beam/pull/31608#discussion_r1776850727


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java:
##########
@@ -2033,6 +2034,14 @@ private static void translate(
         PubsubUnboundedSink overriddenTransform,
         StepTranslationContext stepContext,
         PCollection input) {
+      if (overriddenTransform.getPublishBatchWithOrderingKey()) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "The DataflowRunner does not currently support publishing to 
Pubsub with ordering keys. "
+                    + "%s is required to support publishing with ordering 
keys. "
+                    + "Set the pipeline option --experiments=%s to use this 
PTransform.",

Review Comment:
   I foudn an existing public issue can you add
   "See https://issuetracker.google.com/issues/200955424 for current status."



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1382,6 +1384,8 @@ abstract Builder<T> setFormatFn(
 
       abstract Builder<T> setPubsubRootUrl(String pubsubRootUrl);
 
+      abstract Builder<T> setNeedsOrderingKey(boolean needsOrderingKey);

Review Comment:
   needs still seems confusing as verb to me for output. For input it makes 
since because the consumer requested ordering id and thus we need to send it.  
But for output "needs" sounds like it requires the incoming messages to have an 
ordering id set which isn't true.
   
   Can we change it to setSupportsOrderingKey or setPublishWithOrderingKey?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -201,8 +206,15 @@ public void processElement(
           break;
       }
 
+      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31828
+      // NOTE: Null and empty ordering keys are treated as equivalent.
       @Nullable String topic = dynamicTopicFn.apply(element);
-      K key = 
keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic);
+      @Nullable String orderingKey = message.getOrderingKey();
+      int shard =
+          Strings.isNullOrEmpty(orderingKey)

Review Comment:
   can we use the new logic only if sink is configured to care about ordering 
keys to avoid changing the batching for existing cases where key is set?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -201,8 +206,15 @@ public void processElement(
           break;
       }
 
+      // TODO(sjvanrossum): https://github.com/apache/beam/issues/31828
+      // NOTE: Null and empty ordering keys are treated as equivalent.
       @Nullable String topic = dynamicTopicFn.apply(element);
-      K key = 
keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic);
+      @Nullable String orderingKey = message.getOrderingKey();
+      int shard =
+          Strings.isNullOrEmpty(orderingKey)
+              ? ThreadLocalRandom.current().nextInt(numShards)
+              : Hashing.murmur3_32_fixed().hashString(orderingKey, 
StandardCharsets.UTF_8).asInt();

Review Comment:
   in general using a huge # of keys hurts performance. It seems like it would 
be better to still limit to numShards but be deterministic shard. The user can 
still for now increase the numShards very high if needed for performance.
   
   If we need more publishing paralellism that seems like it should be done 
below by just adding publishBatch to scheduled executor and then joining, not 
via key parallelism.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to