This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 126f278505b Optimize proto PubsubMessage to/from conversion to beam PubsubMessage (#32973)
126f278505b is described below

commit 126f278505b339c32e403fbcb8b9cb34bfaea81e
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Nov 11 22:00:49 2024 +0100

    Optimize proto PubsubMessage to/from conversion to beam PubsubMessage (#32973)
---
 sdks/python/apache_beam/io/gcp/pubsub.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index b6f801c63f7..9e006dbeda9 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -126,7 +126,7 @@ class PubsubMessage(object):
     """
     msg = pubsub.types.PubsubMessage.deserialize(proto_msg)
     # Convert ScalarMapContainer to dict.
-    attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
+    attributes = dict(msg.attributes)
     return PubsubMessage(
         msg.data,
         attributes,
@@ -151,10 +151,8 @@ class PubsubMessage(object):
       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
       containing the payload of this object.
     """
-    msg = pubsub.types.PubsubMessage()
     if len(self.data) > (10_000_000):
       raise ValueError('A pubsub message data field must not exceed 10MB')
-    msg.data = self.data
 
     if self.attributes:
       if len(self.attributes) > 100:
@@ -167,19 +165,25 @@ class PubsubMessage(object):
         if len(value) > 1024:
           raise ValueError(
               'A pubsub message attribute value must not exceed 1024 bytes')
-        msg.attributes[key] = value
 
+    message_id = None
+    publish_time = None
     if not for_publish:
       if self.message_id:
-        msg.message_id = self.message_id
+        message_id = self.message_id
         if self.publish_time:
-          msg.publish_time = self.publish_time
+          publish_time = self.publish_time
 
     if len(self.ordering_key) > 1024:
       raise ValueError(
           'A pubsub message ordering key must not exceed 1024 bytes.')
-    msg.ordering_key = self.ordering_key
 
+    msg = pubsub.types.PubsubMessage(
+        data=self.data,
+        attributes=self.attributes,
+        message_id=message_id,
+        publish_time=publish_time,
+        ordering_key=self.ordering_key)
     serialized = pubsub.types.PubsubMessage.serialize(msg)
     if len(serialized) > (10_000_000):
       raise ValueError(
@@ -193,7 +197,7 @@ class PubsubMessage(object):
     https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html
     """
     # Convert ScalarMapContainer to dict.
-    attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
+    attributes = dict(msg.attributes)
     pubsubmessage = PubsubMessage(msg.data, attributes)
     if msg.message_id:
       pubsubmessage.message_id = msg.message_id

Reply via email to