[ https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Anonymous updated BEAM-7819:
----------------------------
Status: Triage Needed (was: Resolved)
> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
> Key: BEAM-7819
> URL: https://issues.apache.org/jira/browse/BEAM-7819
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Reporter: Ahmet Altay
> Assignee: N. M.
> Priority: P3
> Fix For: 2.31.0
>
> Time Spent: 19h 10m
> Remaining Estimate: 0h
>
> User reported issue:
> https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E
> """
> Looking at the source code, with my untrained Python eyes, I think if the
> intention is to include the message id and the publish time in the attributes
> attribute of the PubsubMessage type, then the protobuf mapping is missing
> something:
> @staticmethod
> def _from_proto_str(proto_msg):
>     """Construct from serialized form of ``PubsubMessage``.
>     Args:
>         proto_msg: String containing a serialized protobuf of type
>             https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
>     Returns:
>         A new PubsubMessage object.
>     """
>     msg = pubsub.types.pubsub_pb2.PubsubMessage()
>     msg.ParseFromString(proto_msg)
>     # Convert ScalarMapContainer to dict.
>     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>     return PubsubMessage(msg.data, attributes)
> The protobuf definition is here:
> https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
> and so it looks as if the message_id and publish_time are not being parsed, as
> they are separate from the attributes. Perhaps the PubsubMessage class needs
> expanding to include these as attributes, or they would need adding to the
> dictionary for attributes. This would only need doing for the _from_proto_str
> as obviously they would not need to be populated when transmitting a message
> to PubSub.
> My Python is not great; I'm assuming the latter option would need to look
> something like this?
>     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>     attributes.update({'message_id': msg.message_id,
>                        'publish_time': msg.publish_time})
>     return PubsubMessage(msg.data, attributes)
> """
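For illustration, the first option in the report (expanding the class rather than adding to the attributes dictionary) might look roughly like the sketch below. This is not the Beam implementation: `PubsubMessageSketch` and `from_parsed_proto` are hypothetical names, and a `SimpleNamespace` stands in for the parsed protobuf so the example runs without the Pub/Sub client library. It assumes only that the parsed message exposes `data`, `attributes`, `message_id`, and a `publish_time` with `seconds` and `nanos`, as in the google.pubsub.v1.PubsubMessage definition linked above.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Dict, Optional


@dataclass
class PubsubMessageSketch:
    """Hypothetical stand-in for PubsubMessage with two extra optional fields."""
    data: bytes
    attributes: Dict[str, str] = field(default_factory=dict)
    message_id: Optional[str] = None          # set by the service, not the publisher
    publish_time: Optional[datetime] = None   # set by the service, not the publisher


def from_parsed_proto(msg):
    """Build a PubsubMessageSketch from an already-parsed protobuf-like object."""
    # Copy the map container into a plain dict, as in the original snippet.
    attributes = dict(msg.attributes)
    # Convert the Timestamp-style (seconds, nanos) pair to an aware datetime.
    publish_time = (
        datetime.fromtimestamp(msg.publish_time.seconds, tz=timezone.utc)
        + timedelta(microseconds=msg.publish_time.nanos // 1000))
    return PubsubMessageSketch(
        msg.data, attributes, msg.message_id, publish_time)


# Usage with a fake parsed message (assumed shape, not a real protobuf object).
fake = SimpleNamespace(
    data=b"payload",
    attributes={"origin": "test"},
    message_id="123",
    publish_time=SimpleNamespace(seconds=1600000000, nanos=500000000))
parsed = from_parsed_proto(fake)
print(parsed.message_id, parsed.publish_time.isoformat())
```

Keeping the two fields separate avoids mixing a timestamp into a string-to-string attribute map and avoids clashing with a user attribute that happens to be named `message_id` or `publish_time`. Both fields default to `None`, so messages built for publishing do not need to set them.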
--
This message was sent by Atlassian Jira
(v8.20.10#820010)