Ahmet Altay created BEAM-7819:
---------------------------------

             Summary: PubsubMessage message parsing is lacking non-attribute 
fields
                 Key: BEAM-7819
                 URL: https://issues.apache.org/jira/browse/BEAM-7819
             Project: Beam
          Issue Type: Bug
          Components: io-python-gcp
            Reporter: Ahmet Altay
            Assignee: Udi Meiri


User reported issue: 
https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E

"""
Looking at the source code, with my untrained python eyes, I think if the 
intention is to include the message id and the publish time in the attributes 
attribute of the PubSubMessage type, then the protobuf mapping is missing 
something:-


@staticmethod
def _from_proto_str(proto_msg):
"""Construct from serialized form of ``PubsubMessage``.

Args:
proto_msg: String containing a serialized protobuf of type
https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage

Returns:
A new PubsubMessage object.
"""
msg = pubsub.types.pubsub_pb2.PubsubMessage()
msg.ParseFromString(proto_msg)
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
return PubsubMessage(msg.data, attributes)

The protobuf definition is here:-

https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage

and so it looks as if the message_id and publish_time are not being parsed as 
they are seperate from the attributes. Perhaps the PubsubMessage class needs 
expanding to include these as attributes, or they would need adding to the 
dictionary for attributes. This would only need doing for the _from_proto_str 
as obviously they would not need to be populated when transmitting a message to 
PubSub.

My python is not great, I'm assuming the latter option would need to look 
something like this?

attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
attributes.update({'message_id': msg.message_id, 'publish_time': 
msg.publish_time})
return PubsubMessage(msg.data, attributes)
"""



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to