[ https://issues.apache.org/jira/browse/BEAM-3744?focusedWorklogId=82549&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82549 ]
ASF GitHub Bot logged work on BEAM-3744:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Mar/18 23:29
Start Date: 20/Mar/18 23:29
Worklog Time Spent: 10m
Work Description: udim commented on a change in pull request #4901:
[BEAM-3744] Expand Pubsub read API for Python.
URL: https://github.com/apache/beam/pull/4901#discussion_r175953760
##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -118,97 +132,78 @@ class ReadMessagesFromPubSub(PTransform):
Outputs elements of type :class:`~PubsubMessage`.
"""
- def __init__(self, topic=None, subscription=None, id_label=None):
+ def __init__(self, topic=None, subscription=None, id_label=None,
+ timestamp_attribute=None):
"""Initializes ``ReadMessagesFromPubSub``.
Args:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
- <topic>". If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not
- specified, a temporary subscription will be created from the specified
- topic. If provided, topic must be None.
- id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which
- can be any string that uniquely identifies the record) will be used for
- deduplication of messages. If not provided, we cannot guarantee
- that no duplicate data will be delivered on the Pub/Sub stream. In this
- case, deduplication of the stream will be strictly best effort.
"""
super(ReadMessagesFromPubSub, self).__init__()
self.topic = topic
self.subscription = subscription
self.id_label = id_label
+ self.timestamp_attribute = timestamp_attribute
def get_windowing(self, unused_inputs):
return core.Windowing(window.GlobalWindows())
def expand(self, pcoll):
p = (pcoll.pipeline
| _ReadFromPubSub(self.topic, self.subscription, self.id_label,
- with_attributes=True))
+ with_attributes=True,
+ timestamp_attribute=self.timestamp_attribute))
return p
-class ReadStringsFromPubSub(PTransform):
- """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.
+class ReadPayloadsFromPubSub(PTransform):
+ """A ``PTransform`` for reading raw payloads from Cloud Pub/Sub.
- Outputs elements of type ``unicode``, decoded from UTF-8.
+ Outputs elements of type ``str``.
"""
- def __init__(self, topic=None, subscription=None, id_label=None):
- """Initializes ``ReadStringsFromPubSub``.
-
- Args:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
- <topic>". If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not
- specified, a temporary subscription will be created from the specified
- topic. If provided, topic must be None.
- id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which
- can be any string that uniquely identifies the record) will be used for
- deduplication of messages. If not provided, we cannot guarantee
- that no duplicate data will be delivered on the Pub/Sub stream. In this
- case, deduplication of the stream will be strictly best effort.
- """
- super(ReadStringsFromPubSub, self).__init__()
+ def __init__(self, topic=None, subscription=None, id_label=None,
+ timestamp_attribute=None):
+ super(ReadPayloadsFromPubSub, self).__init__()
self.topic = topic
self.subscription = subscription
self.id_label = id_label
+ self.timestamp_attribute = timestamp_attribute
def get_windowing(self, unused_inputs):
return core.Windowing(window.GlobalWindows())
def expand(self, pcoll):
p = (pcoll.pipeline
| _ReadFromPubSub(self.topic, self.subscription, self.id_label,
- with_attributes=False)
+ with_attributes=False,
+ timestamp_attribute=self.timestamp_attribute))
+ return p
+
+
+class ReadStringsFromPubSub(ReadPayloadsFromPubSub):
Review comment:
Please see my comment above.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 82549)
Time Spent: 4.5h (was: 4h 20m)
> Support full PubsubMessages
> ---------------------------
>
> Key: BEAM-3744
> URL: https://issues.apache.org/jira/browse/BEAM-3744
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Udi Meiri
> Assignee: Udi Meiri
> Priority: Critical
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> Tracking changes to Pubsub support in Python SDK.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)