tvalentyn commented on code in PR #37345:
URL: https://github.com/apache/beam/pull/37345#discussion_r3326597895
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java:
##########
@@ -85,6 +90,9 @@ public PTransform<PCollection<byte[]>, PDone>
buildExternal(Configuration config
if (config.timestampAttribute != null) {
writeBuilder.setTimestampAttribute(config.timestampAttribute);
}
+ if (config.publishWithOrderingKey) {
+ writeBuilder.setPublishWithOrderingKey(true);
+ }
Review Comment:
+1
##########
sdks/python/apache_beam/io/gcp/pubsub.py:
##########
@@ -647,21 +660,29 @@ def _flush(self):
if not self._buffer:
return
- import time
-
# The elements in buffer are serialized protobuf bytes from the previous
# transforms. We need to deserialize them to extract data and attributes.
futures = []
for elem in self._buffer:
# Deserialize the protobuf to get the original PubsubMessage
pubsub_msg = PubsubMessage._from_proto_str(elem)
- # Publish with the correct data and attributes
+ # Publish with the correct data, attributes, and ordering_key
if self.with_attributes and pubsub_msg.attributes:
future = self._pub_client.publish(
- self._topic, pubsub_msg.data, **pubsub_msg.attributes)
+ self._topic,
+ pubsub_msg.data,
+ ordering_key=pubsub_msg.ordering_key
+ if pubsub_msg.ordering_key else '',
+ **pubsub_msg.attributes)
else:
- future = self._pub_client.publish(self._topic, pubsub_msg.data)
+ if pubsub_msg.ordering_key:
+ future = self._pub_client.publish(
+ self._topic,
+ pubsub_msg.data,
+ ordering_key=pubsub_msg.ordering_key)
+ else:
+ future = self._pub_client.publish(self._topic, pubsub_msg.data)
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org