tvalentyn commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r2996712381
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java:
##########
@@ -1727,7 +1727,10 @@ public void startBundle(StartBundleContext c) throws
IOException {
this.pubsubClient =
getPubsubClientFactory()
.newClient(
- getTimestampAttribute(), null,
c.getPipelineOptions().as(PubsubOptions.class));
+ getTimestampAttribute(),
+ null,
+ c.getPipelineOptions().as(PubsubOptions.class),
+ Write.this.getPubsubRootUrl());
Review Comment:
for my education, why was this necessary?
##########
sdks/python/apache_beam/io/external/gcp/pubsub.py:
##########
@@ -150,18 +152,24 @@ def __init__(
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the
value.
+ publish_with_ordering_key: If True, enables ordering key support when
+ publishing messages. The ordering key must be set on each
+ PubsubMessage via the ``ordering_key`` attribute. Requires
+ messages to be routed to the same region.
"""
self.params = WriteToPubsubSchema(
topic=topic,
id_label=id_label,
# with_attributes=with_attributes,
- timestamp_attribute=timestamp_attribute)
+ timestamp_attribute=timestamp_attribute,
+ publish_with_ordering_key=publish_with_ordering_key)
self.expansion_service = expansion_service
self.with_attributes = with_attributes
def expand(self, pvalue):
if self.with_attributes:
- pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
+ pcoll = pvalue | 'ToProto' >> Map(
Review Comment:
Re: `Map(pubsub.WriteToPubSub.to_proto_str)` - was this a typo for
`Map(pubsub.WriteToPubSub.message_to_proto_str)`? The latter seems
to do some type checking, which probably wouldn't hurt.
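The pattern the comment alludes to (a mapper that validates its element type before serializing, so bad input fails fast with a clear error) can be sketched roughly as below. The `PubsubMessage` dataclass and the JSON serialization are stand-ins for illustration, not Beam's actual classes or wire format.

```python
from dataclasses import dataclass, field
from typing import Dict
import json


@dataclass
class PubsubMessage:
    """Stand-in for a Pub/Sub message with a payload and attributes."""
    data: bytes
    attributes: Dict[str, str] = field(default_factory=dict)


def message_to_proto_str(element):
    """Serialize a PubsubMessage, rejecting anything else up front.

    The type check means a wrongly-typed element raises a descriptive
    TypeError at the map step instead of producing garbage downstream.
    """
    if not isinstance(element, PubsubMessage):
        raise TypeError(
            'Expected PubsubMessage, got %r' % type(element).__name__)
    # Stand-in serialization; the real transform emits a protobuf string.
    return json.dumps({
        'data': element.data.decode('utf-8'),
        'attributes': element.attributes,
    }).encode('utf-8')
```

With the check in place, `message_to_proto_str('not a message')` raises `TypeError` immediately rather than failing later in the pipeline.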
##########
sdks/python/apache_beam/io/external/gcp/pubsub.py:
##########
@@ -150,18 +152,24 @@ def __init__(
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the
value.
+ publish_with_ordering_key: If True, enables ordering key support when
+ publishing messages. The ordering key must be set on each
+ PubsubMessage via the ``ordering_key`` attribute. Requires
Review Comment:
> Requires messages to be routed to the same region.

What does this mean?
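For context, Pub/Sub's ordered delivery guarantee applies per ordering key, and only when all messages for that key are published to the same region: each key then forms a single FIFO stream. A toy model of that per-key guarantee (purely illustrative, no real Pub/Sub routing):

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def deliver(published: List[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Toy model of Pub/Sub ordering-key delivery within one region.

    Messages sharing an ordering key form one FIFO stream, so
    subscribers see them in publish order. If a key's messages were
    split across regions, they would land in independent streams with
    no relative ordering, which is why the docstring requires routing
    to the same region.
    """
    per_key: Dict[str, List[str]] = defaultdict(list)
    for ordering_key, payload in published:
        per_key[ordering_key].append(payload)  # FIFO per key
    return dict(per_key)
```

For example, publishing `('user-1', 'a')`, `('user-2', 'x')`, `('user-1', 'b')` in that order yields `['a', 'b']` for `user-1`: order is preserved within a key, while different keys stay independent.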
##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)
+ @pytest.mark.it_postcommit
Review Comment:
This will work only in the direct runner, but not in the Dataflow runner, correct?
##########
sdks/python/apache_beam/io/gcp/pubsub_integration_test.py:
##########
@@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self):
"""Test WriteToPubSub in batch mode with attributes."""
self._test_batch_write(with_attributes=True)
+ @pytest.mark.it_postcommit
Review Comment:
Also wondering if we can have a warning for Dataflow that advises
users to use the xlang version if they wish to use ordering keys.
IIRC we have something like that in the Java SDK?
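One way such a guard could look on the Python side, sketched purely as an illustration (the runner name, function name, and message text are assumptions here, not Beam's actual behavior or the Java SDK's mechanism):

```python
import logging

_LOG = logging.getLogger(__name__)


def maybe_warn_ordering_key_unsupported(
        runner_name: str, publish_with_ordering_key: bool) -> bool:
    """Hypothetical guard: warn when ordering keys are requested on a
    runner whose native Pub/Sub sink ignores them.

    Returns True if a warning was emitted, so callers (and tests) can
    observe the decision without capturing log output.
    """
    if publish_with_ordering_key and runner_name == 'DataflowRunner':
        _LOG.warning(
            'publish_with_ordering_key=True is not supported by the '
            'Dataflow-native Pub/Sub sink; consider the cross-language '
            '(xlang) WriteToPubSub transform instead.')
        return True
    return False
```

The boolean return is just to make the branch easy to assert on; a real implementation would likely hook into pipeline-construction-time validation instead.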
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]