[ 
https://issues.apache.org/jira/browse/BEAM-3744?focusedWorklogId=82967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82967
 ]

ASF GitHub Bot logged work on BEAM-3744:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Mar/18 22:26
            Start Date: 21/Mar/18 22:26
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on a change in pull request #4901: [BEAM-3744] Expand Pubsub read API for Python.
URL: https://github.com/apache/beam/pull/4901#discussion_r176257008
 
 

 ##########
 File path: sdks/python/apache_beam/runners/direct/transform_evaluator.py
 ##########
 @@ -428,24 +428,41 @@ def _read_from_pubsub(self):
         self._subscription, return_immediately=True,
         max_messages=10) as results:
       def _get_element(message):
-        if self.source.with_attributes:
-          return PubsubMessage._from_message(message)
+        parsed_message = PubsubMessage._from_message(message)
+        if timestamp_attribute:
+          try:
+            rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+          except KeyError:
+            raise KeyError('Timestamp attribute not found: %s' %
+                           self.source.timestamp_attribute)
+          try:
+            timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
+          except ValueError:
+            try:
+              timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
+            except ValueError:
+              raise ValueError('Invalid timestamp value: %s', rfc3339_or_milli)
 
 Review comment:
   Raise or preserve the original exception?
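
   One way to read this suggestion is sketched below. It is a standalone illustration, not the code from the pull request. The function names `parse_timestamp_micros` and `_rfc3339_to_micros` are hypothetical, and `_rfc3339_to_micros` is a simplified stand-in for `Timestamp.from_rfc3339` that accepts only UTC timestamps ending in `Z`. The sketch assumes Python 3, where `raise ... from` chains the original exception; the diff above may target an older interpreter where a different approach would be needed. It also formats the error message with `%` instead of passing the value as a second argument to `ValueError`, which in the diff leaves the `%s` placeholder unexpanded.

```python
from datetime import datetime, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _rfc3339_to_micros(value):
    # Hypothetical, simplified stand-in for Timestamp.from_rfc3339:
    # accepts only UTC timestamps such as 2018-03-21T22:26:00Z,
    # optionally with a fractional-second part.
    for fmt in ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ'):
        try:
            dt = datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
        delta = dt - _EPOCH
        seconds = delta.days * 86400 + delta.seconds
        return seconds * 1000000 + delta.microseconds
    raise ValueError('Not an RFC 3339 UTC timestamp: %r' % value)


def parse_timestamp_micros(attributes, timestamp_attribute):
    """Return microseconds since the epoch for the named attribute.

    The attribute value may be an RFC 3339 string or an integer string
    holding milliseconds since the epoch.
    """
    try:
        raw = attributes[timestamp_attribute]
    except KeyError as e:
        # Chain the original KeyError so its traceback is preserved.
        raise KeyError(
            'Timestamp attribute not found: %s' % timestamp_attribute) from e
    try:
        return _rfc3339_to_micros(raw)
    except ValueError:
        pass  # Fall through and try the milliseconds interpretation.
    try:
        return int(raw) * 1000
    except ValueError as e:
        # Format the message with % and chain the original exception.
        raise ValueError('Invalid timestamp value: %s' % raw) from e
```

   With chaining, the traceback of a failed parse shows both the new message and the underlying `int()` error, so the caller can see why the value was rejected.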

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82967)

> Support full PubsubMessages
> ---------------------------
>
>                 Key: BEAM-3744
>                 URL: https://issues.apache.org/jira/browse/BEAM-3744
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Critical
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Tracking changes to Pubsub support in Python SDK.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
