[
https://issues.apache.org/jira/browse/BEAM-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923646#comment-16923646
]
Matthew Darwin commented on BEAM-7819:
--------------------------------------
I'm struggling a little with running the integration test locally; does it need
both the tarball and the worker_jar to be built first?
./scripts/run_integration_test.sh \
  --project [my-project] \
  --gcs_location gs://[my-project]/tmp \
  --test_opts --tests=apache_beam.io.gcp.pubsub_integration_test \
  --runner TestDataflowRunner \
  --sdk_location ./dist/apache-beam-2.16.0.dev0.tar.gz
AssertionError:
Expected: (Test pipeline expected terminated in state: RUNNING and Expected 4 messages.)
     but: Test pipeline expected terminated in state: RUNNING Test pipeline job terminated in state: FAILED
In addition, for the pubsub integration test, given that pubsub will generate
the message_id and publish_time, I'm not sure how exactly to handle that in the
expected messages?
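
One possible way to handle this, sketched below, is to compare only the fields the test controls (data and attributes) and merely check that the server-generated fields are populated. This is a minimal sketch and not the existing test code: ReceivedMessage and assert_messages_match are hypothetical names, and ReceivedMessage is a stand-in rather than Beam's PubsubMessage class.

```python
from collections import namedtuple

# Hypothetical stand-in for a message read back from Pub/Sub.
# This is NOT Beam's PubsubMessage class.
ReceivedMessage = namedtuple(
    'ReceivedMessage', ['data', 'attributes', 'message_id', 'publish_time'])


def comparable(msg):
  """Reduce a message to the fields the test itself controls."""
  return (msg.data, tuple(sorted(msg.attributes.items())))


def assert_messages_match(expected, actual):
  """Compare data and attributes; only sanity-check server-set fields."""
  # Order of delivery is not guaranteed, so compare sorted views.
  assert sorted(map(comparable, expected)) == sorted(map(comparable, actual))
  # message_id and publish_time are assigned by the service, so the test
  # cannot predict their values; it can only check they are present.
  for msg in actual:
    assert msg.message_id
    assert msg.publish_time is not None
```

With this approach the expected messages can leave message_id and publish_time unset, since those fields never take part in the equality check.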
> PubsubMessage message parsing is lacking non-attribute fields
> -------------------------------------------------------------
>
> Key: BEAM-7819
> URL: https://issues.apache.org/jira/browse/BEAM-7819
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Reporter: Ahmet Altay
> Assignee: Udi Meiri
> Priority: Major
> Time Spent: 9h 40m
> Remaining Estimate: 0h
>
> User reported issue:
> https://lists.apache.org/thread.html/139b0c15abc6471a2e2202d76d915c645a529a23ecc32cd9cfecd315@%3Cuser.beam.apache.org%3E
> """
> Looking at the source code, with my untrained Python eyes, I think if the
> intention is to include the message id and the publish time in the attributes
> attribute of the PubsubMessage type, then the protobuf mapping is missing
> something:
>   @staticmethod
>   def _from_proto_str(proto_msg):
>     """Construct from serialized form of ``PubsubMessage``.
>
>     Args:
>       proto_msg: String containing a serialized protobuf of type
>       https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
>
>     Returns:
>       A new PubsubMessage object.
>     """
>     msg = pubsub.types.pubsub_pb2.PubsubMessage()
>     msg.ParseFromString(proto_msg)
>     # Convert ScalarMapContainer to dict.
>     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>     return PubsubMessage(msg.data, attributes)
> The protobuf definition is here:
> https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
> and so it looks as if the message_id and publish_time are not being parsed, as
> they are separate from the attributes. Perhaps the PubsubMessage class needs
> expanding to include these as fields of their own, or they would need adding
> to the attributes dictionary. This would only need doing in _from_proto_str,
> as they would not need to be populated when transmitting a message to
> Pub/Sub.
> My Python is not great; I'm assuming the latter option would need to look
> something like this?
>     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
>     attributes.update({'message_id': msg.message_id,
>                        'publish_time': msg.publish_time})
>     return PubsubMessage(msg.data, attributes)
> """
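
For comparison, the first option in the description (expanding the class rather than the attributes dictionary) might look roughly like the sketch below. It is an illustration under stated assumptions, not Beam's implementation: MessageWithMetadata and from_parsed_proto are hypothetical names, and a SimpleNamespace stands in for the parsed protobuf so the example runs without the Pub/Sub client library. In the protobuf definition, publish_time is a Timestamp with seconds and nanos rather than a string, so the sketch converts it to float seconds.

```python
from types import SimpleNamespace


class MessageWithMetadata(object):
  """Hypothetical variant of PubsubMessage with two extra optional fields."""

  def __init__(self, data, attributes, message_id=None, publish_time=None):
    self.data = data
    self.attributes = attributes
    # Only set on messages received from the service; left as None on
    # messages that are about to be published.
    self.message_id = message_id
    self.publish_time = publish_time


def from_parsed_proto(msg):
  """Build a MessageWithMetadata from an already-parsed, proto-like object."""
  attributes = dict(msg.attributes)
  # Assumption: expose the Timestamp (seconds + nanos) as float seconds.
  publish_time = msg.publish_time.seconds + msg.publish_time.nanos / 1e9
  return MessageWithMetadata(
      msg.data, attributes, msg.message_id, publish_time)


# Stand-in for a parsed protobuf message (not the real generated class).
parsed = SimpleNamespace(
    data=b'hello',
    attributes={'k': 'v'},
    message_id='123',
    publish_time=SimpleNamespace(seconds=1567700000, nanos=500000000))
message = from_parsed_proto(parsed)
```

Keeping the two values as separate fields avoids mixing service-generated metadata into the user-supplied attributes, at the cost of changing the class's constructor.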