This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 126f278505b Optimize proto PubsubMessage to/from conversion to beam
PubsubMessage (#32973)
126f278505b is described below
commit 126f278505b339c32e403fbcb8b9cb34bfaea81e
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Nov 11 22:00:49 2024 +0100
Optimize proto PubsubMessage to/from conversion to beam PubsubMessage
(#32973)
---
sdks/python/apache_beam/io/gcp/pubsub.py | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index b6f801c63f7..9e006dbeda9 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -126,7 +126,7 @@ class PubsubMessage(object):
"""
msg = pubsub.types.PubsubMessage.deserialize(proto_msg)
# Convert ScalarMapContainer to dict.
- attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
+ attributes = dict(msg.attributes)
return PubsubMessage(
msg.data,
attributes,
@@ -151,10 +151,8 @@ class PubsubMessage(object):
https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
containing the payload of this object.
"""
- msg = pubsub.types.PubsubMessage()
if len(self.data) > (10_000_000):
raise ValueError('A pubsub message data field must not exceed 10MB')
- msg.data = self.data
if self.attributes:
if len(self.attributes) > 100:
@@ -167,19 +165,25 @@ class PubsubMessage(object):
if len(value) > 1024:
raise ValueError(
'A pubsub message attribute value must not exceed 1024 bytes')
- msg.attributes[key] = value
+ message_id = None
+ publish_time = None
if not for_publish:
if self.message_id:
- msg.message_id = self.message_id
+ message_id = self.message_id
if self.publish_time:
- msg.publish_time = self.publish_time
+ publish_time = self.publish_time
if len(self.ordering_key) > 1024:
raise ValueError(
'A pubsub message ordering key must not exceed 1024 bytes.')
- msg.ordering_key = self.ordering_key
+ msg = pubsub.types.PubsubMessage(
+ data=self.data,
+ attributes=self.attributes,
+ message_id=message_id,
+ publish_time=publish_time,
+ ordering_key=self.ordering_key)
serialized = pubsub.types.PubsubMessage.serialize(msg)
if len(serialized) > (10_000_000):
raise ValueError(
@@ -193,7 +197,7 @@ class PubsubMessage(object):
https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html
"""
# Convert ScalarMapContainer to dict.
- attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
+ attributes = dict(msg.attributes)
pubsubmessage = PubsubMessage(msg.data, attributes)
if msg.message_id:
pubsubmessage.message_id = msg.message_id