nikitagrover19 commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r3004736332
##########
sdks/python/apache_beam/io/external/gcp/pubsub.py:
##########
@@ -150,18 +152,24 @@ def __init__(
in a ReadFromPubSub PTransform to deduplicate messages.
timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
message with the given name and the message's publish time as the
value.
+ publish_with_ordering_key: If True, enables ordering key support when
+ publishing messages. The ordering key must be set on each
+ PubsubMessage via the ``ordering_key`` attribute. Requires
+ messages to be routed to the same region.
"""
self.params = WriteToPubsubSchema(
topic=topic,
id_label=id_label,
# with_attributes=with_attributes,
- timestamp_attribute=timestamp_attribute)
+ timestamp_attribute=timestamp_attribute,
+ publish_with_ordering_key=publish_with_ordering_key)
self.expansion_service = expansion_service
self.with_attributes = with_attributes
def expand(self, pvalue):
if self.with_attributes:
- pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str)
+ pcoll = pvalue | 'ToProto' >> Map(
Review Comment:
Yes, `message_to_proto_str` is the right choice because it performs proper
type checking. The original `to_proto_str` reference was a pre-existing bug
(the method doesn't exist on `WriteToPubSub`), which is why
`with_attributes=True` was broken before this PR. I will switch to
`message_to_proto_str`.
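
For illustration, a minimal sketch of the two failure modes discussed here. `FakePubsubMessage` and `FakeWriteToPubSub` are hypothetical stand-ins written for this example, not the real `apache_beam` classes, and the serialization is a placeholder; only the method names `to_proto_str` and `message_to_proto_str` come from the diff.

```python
from dataclasses import dataclass, field


@dataclass
class FakePubsubMessage:
  """Hypothetical stand-in for a Pub/Sub message object."""
  data: bytes
  attributes: dict = field(default_factory=dict)
  ordering_key: str = ""


class FakeWriteToPubSub:
  """Hypothetical stand-in exposing only a type-checked serializer."""
  @staticmethod
  def message_to_proto_str(element):
    # Reject anything that is not a message object before serializing.
    if not isinstance(element, FakePubsubMessage):
      raise TypeError(
          "Expected FakePubsubMessage, got %s" % type(element).__name__)
    # Placeholder: a real serializer would emit a protobuf-encoded string.
    return element.data


# getattr with a default keeps this script running. Writing
# FakeWriteToPubSub.to_proto_str directly would raise AttributeError as soon
# as the expression is evaluated, because no such attribute is defined.
missing = getattr(FakeWriteToPubSub, "to_proto_str", None)

# The type-checked method accepts a message object...
ok = FakeWriteToPubSub.message_to_proto_str(FakePubsubMessage(b"payload"))

# ...and rejects raw bytes with a TypeError.
try:
  FakeWriteToPubSub.message_to_proto_str(b"raw bytes")
  rejected = False
except TypeError:
  rejected = True
```

In the stand-in, the attribute lookup fails before any element is processed, which matches the description of `with_attributes=True` being broken outright rather than failing on particular messages.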
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.