scwhittle commented on code in PR #31608:
URL: https://github.com/apache/beam/pull/31608#discussion_r1776850727
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java:
##########
@@ -2033,6 +2034,14 @@ private static void translate(
PubsubUnboundedSink overriddenTransform,
StepTranslationContext stepContext,
PCollection input) {
+ if (overriddenTransform.getPublishBatchWithOrderingKey()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "The DataflowRunner does not currently support publishing to Pubsub with ordering keys. "
+ + "%s is required to support publishing with ordering keys. "
+ + "Set the pipeline option --experiments=%s to use this PTransform.",
Review Comment:
I found an existing public issue. Can you add the following to the message?
"See https://issuetracker.google.com/issues/200955424 for current status."
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1382,6 +1384,8 @@ abstract Builder<T> setFormatFn(
abstract Builder<T> setPubsubRootUrl(String pubsubRootUrl);
+ abstract Builder<T> setNeedsOrderingKey(boolean needsOrderingKey);
Review Comment:
"needs" still seems confusing to me as a verb for output. For input it makes
sense, because the consumer requested the ordering id and thus we need to send
it. But for output, "needs" sounds like it requires the incoming messages to
have an ordering id set, which isn't true.
Can we change it to setSupportsOrderingKey or setPublishWithOrderingKey?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -201,8 +206,15 @@ public void processElement(
break;
}
+ // TODO(sjvanrossum): https://github.com/apache/beam/issues/31828
+ // NOTE: Null and empty ordering keys are treated as equivalent.
@Nullable String topic = dynamicTopicFn.apply(element);
- K key = keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic);
+ @Nullable String orderingKey = message.getOrderingKey();
+ int shard =
+ Strings.isNullOrEmpty(orderingKey)
Review Comment:
Can we use the new logic only if the sink is configured to care about ordering
keys, to avoid changing the batching for existing cases where the key is set?
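A minimal sketch of what that gating could look like, not the PR's actual code. The class name, the `chooseShard` helper, and the `publishWithOrderingKey` flag are hypothetical, and `String.hashCode()` with `Math.floorMod` stands in for whatever hash and shard mapping the sink ends up using:

```java
import java.util.concurrent.ThreadLocalRandom;

final class ShardSelectionSketch {
  private ShardSelectionSketch() {}

  /**
   * Hypothetical helper: returns a shard in [0, numShards).
   * publishWithOrderingKey is an assumed name for the sink-level setting.
   */
  public static int chooseShard(
      boolean publishWithOrderingKey, String orderingKey, int numShards) {
    if (!publishWithOrderingKey || orderingKey == null || orderingKey.isEmpty()) {
      // Existing behavior: random shard, even if the message carries an ordering key.
      return ThreadLocalRandom.current().nextInt(numShards);
    }
    // New behavior only when the sink opted in: same key always maps to the same shard.
    // String.hashCode() is an assumption standing in for the hash used in the diff.
    return Math.floorMod(orderingKey.hashCode(), numShards);
  }
}
```

With the flag off, a message that happens to have an ordering key set is still batched exactly as before, which is the compatibility concern raised here.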
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java:
##########
@@ -201,8 +206,15 @@ public void processElement(
break;
}
+ // TODO(sjvanrossum): https://github.com/apache/beam/issues/31828
+ // NOTE: Null and empty ordering keys are treated as equivalent.
@Nullable String topic = dynamicTopicFn.apply(element);
- K key = keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic);
+ @Nullable String orderingKey = message.getOrderingKey();
+ int shard =
+ Strings.isNullOrEmpty(orderingKey)
+ ? ThreadLocalRandom.current().nextInt(numShards)
+ : Hashing.murmur3_32_fixed().hashString(orderingKey, StandardCharsets.UTF_8).asInt();
Review Comment:
In general, using a huge number of keys hurts performance. It seems like it
would be better to still limit to numShards but choose the shard
deterministically. For now, the user can still set numShards very high if
needed for performance.
If we need more publishing parallelism, that seems like it should be done
below by just adding publishBatch to a scheduled executor and then joining, not
via key parallelism.
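To illustrate the "submit to an executor, then join" idea, here is a rough sketch under stated assumptions. `ParallelPublishSketch`, `publishAll`, and this `publishBatch` are hypothetical stand-ins (the real method publishes to Pubsub; this one only counts messages), and the sketch does not address whether batches sharing an ordering key may be in flight at the same time:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class ParallelPublishSketch {
  private ParallelPublishSketch() {}

  /** Hypothetical stand-in for the sink's publish call; returns the message count. */
  public static int publishBatch(List<String> batch) {
    return batch.size();
  }

  /** Submits every batch to the executor, then joins on all of them before returning. */
  public static int publishAll(List<List<String>> batches, ExecutorService executor)
      throws InterruptedException, ExecutionException {
    List<Future<Integer>> pending = new ArrayList<>();
    for (List<String> batch : batches) {
      Callable<Integer> task = () -> publishBatch(batch);
      pending.add(executor.submit(task));
    }
    int published = 0;
    for (Future<Integer> future : pending) {
      // Blocks until this batch finishes; a failure surfaces as ExecutionException.
      published += future.get();
    }
    return published;
  }
}
```

The number of keys stays bounded by numShards, and the degree of publish parallelism is controlled by the executor's thread count instead.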