[
https://issues.apache.org/jira/browse/BEAM-3744?focusedWorklogId=85064&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85064
]
ASF GitHub Bot logged work on BEAM-3744:
----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Mar/18 22:43
Start Date: 27/Mar/18 22:43
Worklog Time Spent: 10m
Work Description: chamikaramj closed pull request #4901: [BEAM-3744]
Expand Pubsub read API for Python.
URL: https://github.com/apache/beam/pull/4901
This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of provenance:
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 3838408fd2b..987b66040a7 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -67,10 +67,10 @@ def run(argv=None):
with beam.Pipeline(argv=pipeline_args) as p:
- # Read the text from PubSub messages
+ # Read the text from PubSub messages.
lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
- # Capitalize the characters in each line.
+ # Get the number of appearances of a word.
def count_ones(word_ones):
(word, ones) = word_ones
return (word, sum(ones))
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index d5afee95580..f5ca17e64a1 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -20,6 +20,14 @@
pipelines, during remote execution.
This API is currently under development and is subject to change.
+
+Description of common arguments used in this module:
+ topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
+ If provided, subscription must be None.
+ subscription: Existing Cloud Pub/Sub subscription to use in the
+ form "projects/<project>/subscriptions/<subscription>". If not specified,
+ a temporary subscription will be created from the specified topic. If
+ provided, topic must be None.
"""
from __future__ import absolute_import
@@ -34,8 +42,6 @@
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
from apache_beam.transforms import Map
from apache_beam.transforms import PTransform
-from apache_beam.transforms import core
-from apache_beam.transforms import window
from apache_beam.transforms.display import DisplayDataItem
try:
@@ -44,7 +50,7 @@
pubsub_pb2 = None
-__all__ = ['PubsubMessage', 'ReadMessagesFromPubSub', 'ReadStringsFromPubSub',
+__all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
'WriteStringsToPubSub']
@@ -52,38 +58,27 @@ class PubsubMessage(object):
"""Represents a message from Cloud Pub/Sub.
This interface is experimental. No backwards compatibility guarantees.
+
+ Attributes:
+ payload: (str) Message payload, as a byte string.
+ attributes: (dict) Map of string to string.
"""
- def __init__(self, payload, message_id, attributes, publish_time):
+ def __init__(self, payload, attributes):
"""Constructs a message.
- This interface is experimental. No backwards compatibility guarantees.
-
- See also:
- https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
-
- Attributes:
- payload: (str) Message payload, as a byte string.
- message_id: (str) Server assigned message ID. Unique within a topic.
- attributes: (dict) Map of string to string.
- publish_time: (str) Server assigned timestamp of when the message was
- published.
+ Beam users should not directly construct ``PubsubMessages``.
"""
self.payload = payload
- self.message_id = message_id
self.attributes = attributes
- self.publish_time = publish_time
def __eq__(self, other):
return isinstance(other, PubsubMessage) and (
self.payload == other.payload and
- self.message_id == other.message_id and
- self.attributes == other.attributes and
- self.publish_time == other.publish_time)
+ self.attributes == other.attributes)
def __repr__(self):
- return 'PubsubMessage(%s, %s, %s, %s)' % (
- self.payload, self.message_id, self.attributes, self.publish_time)
+ return 'PubsubMessage(%s, %s)' % (self.payload, self.attributes)
@staticmethod
def _from_proto(proto_msg):
@@ -95,8 +90,7 @@ def _from_proto(proto_msg):
msg.ParseFromString(proto_msg)
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
- return PubsubMessage(msg.data, msg.message_id,
- attributes, msg.publish_time)
+ return PubsubMessage(msg.data, attributes)
@staticmethod
def _from_message(msg):
@@ -106,130 +100,82 @@ def _from_message(msg):
"""
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
- return PubsubMessage(msg.data, msg.message_id,
- attributes, msg.service_timestamp)
-
+ return PubsubMessage(msg.data, attributes)
-class ReadMessagesFromPubSub(PTransform):
- """A ``PTransform`` for reading from Cloud Pub/Sub.
-
- This interface is experimental. No backwards compatibility guarantees.
- Outputs elements of type :class:`~PubsubMessage`.
- """
+class ReadFromPubSub(PTransform):
+ """A ``PTransform`` for reading from Cloud Pub/Sub."""
+ # Implementation note: This ``PTransform`` is overridden by Directrunner.
- def __init__(self, topic=None, subscription=None, id_label=None):
- """Initializes ``ReadMessagesFromPubSub``.
+ def __init__(self, topic=None, subscription=None, id_label=None,
+ with_attributes=False, timestamp_attribute=None):
+ """Initializes ``ReadFromPubSub``.
Args:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
- <topic>". If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not
- specified, a temporary subscription will be created from the specified
- topic. If provided, topic must be None.
id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which
+ record identifier. When specified, the value of this attribute (which
can be any string that uniquely identifies the record) will be used for
- deduplication of messages. If not provided, we cannot guarantee
+ deduplication of messages. If not provided, we cannot guarantee
that no duplicate data will be delivered on the Pub/Sub stream. In this
case, deduplication of the stream will be strictly best effort.
+ with_attributes:
+ True - output elements will be :class:`~PubsubMessage` objects.
+ False - output elements will be of type ``str`` (message payload only).
+ timestamp_attribute: Message value to use as element timestamp. If None,
+ uses message publishing time as the timestamp.
+ Note that this argument doesn't require with_attributes=True.
+
+ Timestamp values should be in one of two formats:
+
+ - A numerical value representing the number of milliseconds since the
+ Unix epoch.
+ - A string in RFC 3339 format, UTC timezone. Example:
+ ``2015-10-29T23:41:41.123Z``. The sub-second component of the
+ timestamp is optional, and digits beyond the first three (i.e., time
+ units smaller than milliseconds) may be ignored.
"""
- super(ReadMessagesFromPubSub, self).__init__()
- self.topic = topic
- self.subscription = subscription
- self.id_label = id_label
-
- def get_windowing(self, unused_inputs):
- return core.Windowing(window.GlobalWindows())
+ super(ReadFromPubSub, self).__init__()
+ self.with_attributes = with_attributes
+ self._source = _PubSubSource(
+ topic=topic,
+ subscription=subscription,
+ id_label=id_label,
+ with_attributes=with_attributes,
+ timestamp_attribute=timestamp_attribute)
- def expand(self, pcoll):
- p = (pcoll.pipeline
- | _ReadFromPubSub(self.topic, self.subscription, self.id_label,
- with_attributes=True))
- return p
+ def expand(self, pvalue):
+ pcoll = pvalue.pipeline | Read(self._source)
+ if self.with_attributes:
+ pcoll = pcoll | Map(PubsubMessage._from_proto)
+ pcoll.element_type = PubsubMessage
+ else:
+ pcoll.element_type = bytes
+ return pcoll
class ReadStringsFromPubSub(PTransform):
"""A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.
Outputs elements of type ``unicode``, decoded from UTF-8.
+
+ This class is deprecated.
"""
def __init__(self, topic=None, subscription=None, id_label=None):
- """Initializes ``ReadStringsFromPubSub``.
-
- Args:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
- <topic>". If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not
- specified, a temporary subscription will be created from the specified
- topic. If provided, topic must be None.
- id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which
- can be any string that uniquely identifies the record) will be used for
- deduplication of messages. If not provided, we cannot guarantee
- that no duplicate data will be delivered on the Pub/Sub stream. In this
- case, deduplication of the stream will be strictly best effort.
- """
super(ReadStringsFromPubSub, self).__init__()
self.topic = topic
self.subscription = subscription
self.id_label = id_label
- def get_windowing(self, unused_inputs):
- return core.Windowing(window.GlobalWindows())
-
- def expand(self, pcoll):
- p = (pcoll.pipeline
- | _ReadFromPubSub(self.topic, self.subscription, self.id_label,
- with_attributes=False)
+ def expand(self, pvalue):
+ p = (pvalue.pipeline
+ | ReadFromPubSub(self.topic, self.subscription, self.id_label,
+ with_attributes=False)
| 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
p.element_type = text_type
return p
-class _ReadFromPubSub(PTransform):
- """A ``PTransform`` for reading from Cloud Pub/Sub."""
-
- def __init__(self, topic, subscription, id_label, with_attributes):
- """Initializes ``_ReadFromPubSub``.
-
- Args:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/
- <topic>". If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not
- specified, a temporary subscription will be created from the specified
- topic. If provided, topic must be None.
- id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which
- can be any string that uniquely identifies the record) will be used for
- deduplication of messages. If None, we cannot guarantee
- that no duplicate data will be delivered on the Pub/Sub stream. In this
- case, deduplication of the stream will be strictly best effort.
- with_attributes: False - output elements will be raw payload bytes.
- True - output will be :class:`~PubsubMessage` objects.
- """
- super(_ReadFromPubSub, self).__init__()
- self.with_attributes = with_attributes
- self._source = _PubSubSource(
- topic,
- subscription=subscription,
- id_label=id_label,
- with_attributes=with_attributes)
-
- def expand(self, pvalue):
- pcoll = pvalue.pipeline | Read(self._source)
- if self.with_attributes:
- pcoll = pcoll | Map(PubsubMessage._from_proto)
- pcoll.element_type = PubsubMessage
- else:
- pcoll.element_type = bytes
- return pcoll
-
-
class WriteStringsToPubSub(PTransform):
"""A ``PTransform`` for writing utf-8 string payloads to Cloud Pub/Sub."""
@@ -280,25 +226,15 @@ def parse_subscription(full_subscription):
class _PubSubSource(dataflow_io.NativeSource):
"""Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
+ This ``NativeSource`` is overridden by a native Pubsub implementation.
+
Attributes:
- topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
- If provided, subscription must be None.
- subscription: Existing Cloud Pub/Sub subscription to use in the
- form "projects/<project>/subscriptions/<subscription>". If not specified,
- a temporary subscription will be created from the specified topic. If
- provided, topic must be None.
- id_label: The attribute on incoming Pub/Sub messages to use as a unique
- record identifier. When specified, the value of this attribute (which can
- be any string that uniquely identifies the record) will be used for
- deduplication of messages. If not provided, Dataflow cannot guarantee
- that no duplicate data will be delivered on the Pub/Sub stream. In this
- case, deduplication of the stream will be strictly best effort.
with_attributes: If False, will fetch just message payload. Otherwise,
fetches ``PubsubMessage`` protobufs.
"""
def __init__(self, topic=None, subscription=None, id_label=None,
- with_attributes=False):
+ with_attributes=False, timestamp_attribute=None):
# We are using this coder explicitly for portability reasons of PubsubIO
# across implementations in languages.
self.coder = coders.BytesCoder()
@@ -308,6 +244,7 @@ def __init__(self, topic=None, subscription=None, id_label=None,
self.subscription_name = None
self.id_label = id_label
self.with_attributes = with_attributes
+ self.timestamp_attribute = timestamp_attribute
# Perform some validation on the topic and subscription.
if not (topic or subscription):
@@ -337,8 +274,7 @@ def display_data(self):
label='Pubsub Subscription').drop_if_none()}
def reader(self):
- raise NotImplementedError(
- 'PubSubPayloadSource is not supported in local execution.')
+ raise NotImplementedError
def is_bounded(self):
return False
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index b5afff8ac59..f987947d454 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -27,22 +27,24 @@
import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage
-from apache_beam.io.gcp.pubsub import ReadMessagesFromPubSub
+from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub
from apache_beam.io.gcp.pubsub import WriteStringsToPubSub
from apache_beam.io.gcp.pubsub import _PubSubPayloadSink
from apache_beam.io.gcp.pubsub import _PubSubSource
-from apache_beam.io.gcp.pubsub import _ReadFromPubSub
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.direct import transform_evaluator
from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
from apache_beam.runners.direct.direct_runner import _get_transform_overrides
from apache_beam.runners.direct.transform_evaluator import _PubSubReadEvaluator
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import TestWindowedValue
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.transforms import window
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
+from apache_beam.utils import timestamp
# Protect against environments where the PubSub library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
@@ -60,8 +62,9 @@ def test_expand_with_topic(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
- | _ReadFromPubSub('projects/fakeprj/topics/a_topic',
- None, 'a_label', with_attributes=False)
+ | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label', with_attributes=False,
+ timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
@@ -69,7 +72,7 @@ def test_expand_with_topic(self):
overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)
- # Note that the direct output of ReadMessagesFromPubSub will be replaced
+ # Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
@@ -82,9 +85,9 @@ def test_expand_with_subscription(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
- | _ReadFromPubSub(
+ | ReadFromPubSub(
None, 'projects/fakeprj/subscriptions/a_subscription',
- 'a_label', with_attributes=False)
+ 'a_label', with_attributes=False, timestamp_attribute=None)
| beam.Map(lambda x: x))
self.assertEqual(str, pcoll.element_type)
@@ -92,7 +95,7 @@ def test_expand_with_subscription(self):
overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)
- # Note that the direct output of ReadMessagesFromPubSub will be replaced
+ # Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
@@ -104,20 +107,22 @@ def test_expand_with_subscription(self):
def test_expand_with_no_topic_or_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Either a topic or subscription must be provided."):
- _ReadFromPubSub(None, None, 'a_label', with_attributes=False)
+ ReadFromPubSub(None, None, 'a_label', with_attributes=False,
+ timestamp_attribute=None)
def test_expand_with_both_topic_and_subscription(self):
with self.assertRaisesRegexp(
ValueError, "Only one of topic or subscription should be provided."):
- _ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
- with_attributes=False)
+ ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
+ with_attributes=False, timestamp_attribute=None)
- def test_expand_with_attributes(self):
+ def test_expand_with_other_options(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
- | _ReadFromPubSub('projects/fakeprj/topics/a_topic',
- None, 'a_label', with_attributes=True)
+ | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label', with_attributes=True,
+ timestamp_attribute='time')
| beam.Map(lambda x: x))
self.assertEqual(PubsubMessage, pcoll.element_type)
@@ -125,14 +130,14 @@ def test_expand_with_attributes(self):
overrides = _get_transform_overrides(p.options)
p.replace_all(overrides)
- # Note that the direct output of ReadMessagesFromPubSub will be replaced
+ # Note that the direct output of ReadFromPubSub will be replaced
# by a PTransformOverride, so we use a no-op Map.
read_transform = pcoll.producer.inputs[0].producer.transform
# Ensure that the properties passed through correctly
source = read_transform._source
- self.assertEqual('a_topic', source.topic_name)
- self.assertEqual('a_label', source.id_label)
+ self.assertTrue(source.with_attributes)
+ self.assertEqual('time', source.timestamp_attribute)
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@@ -276,6 +281,13 @@ def topic(self, name):
return FakePubsubTopic(name, self)
+def create_client_message(payload, message_id, attributes, publish_time):
+ """Returns a message as it would be returned from Cloud Pub/Sub client."""
+ msg = pubsub.message.Message(payload, message_id, attributes)
+ msg._service_timestamp = publish_time
+ return msg
+
+
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestReadFromPubSub(unittest.TestCase):
@@ -283,9 +295,13 @@ class TestReadFromPubSub(unittest.TestCase):
def test_read_messages_success(self, mock_pubsub):
payload = 'payload'
message_id = 'message_id'
- attributes = {'attribute': 'value'}
- data = [pubsub.message.Message(payload, message_id, attributes)]
- expected_data = [PubsubMessage(payload, message_id, attributes, None)]
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ attributes = {'key': 'value'}
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+ expected_data = [TestWindowedValue(PubsubMessage(payload, attributes),
+ timestamp.Timestamp(1520861821.234567),
+ [window.GlobalWindow()])]
mock_pubsub.Client = functools.partial(FakePubsubClient, data)
mock_pubsub.subscription.AutoAck = FakeAutoAck
@@ -293,16 +309,18 @@ def test_read_messages_success(self, mock_pubsub):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
pcoll = (p
- | ReadMessagesFromPubSub('projects/fakeprj/topics/a_topic',
- None, 'a_label'))
- assert_that(pcoll, equal_to(expected_data))
+ | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label', with_attributes=True))
+ assert_that(pcoll, equal_to(expected_data), reify_windows=True)
p.run()
@mock.patch('google.cloud.pubsub')
def test_read_strings_success(self, mock_pubsub):
payload = u'🤷 ¯\\_(ツ)_/¯'
payload_encoded = payload.encode('utf-8')
- data = [pubsub.message.Message(payload_encoded, None, None)]
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload_encoded, None, None, publish_time)]
expected_data = [payload]
mock_pubsub.Client = functools.partial(FakePubsubClient, data)
@@ -316,6 +334,121 @@ def test_read_strings_success(self, mock_pubsub):
assert_that(pcoll, equal_to(expected_data))
p.run()
+ @mock.patch('google.cloud.pubsub')
+ def test_read_payload_success(self, mock_pubsub):
+ payload_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload_encoded, None, None, publish_time)]
+ expected_data = [payload_encoded]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ pcoll = (p
+ | ReadFromPubSub('projects/fakeprj/topics/a_topic',
+ None, 'a_label'))
+ assert_that(pcoll, equal_to(expected_data))
+ p.run()
+
+ @mock.patch('google.cloud.pubsub')
+ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
+ payload = 'payload'
+ message_id = 'message_id'
+ attributes = {'time': '1337'}
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+ expected_data = [
+ TestWindowedValue(
+ PubsubMessage(payload, attributes),
+ timestamp.Timestamp(micros=int(attributes['time']) * 1000),
+ [window.GlobalWindow()]),
+ ]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ pcoll = (p
+ | ReadFromPubSub(
+ 'projects/fakeprj/topics/a_topic', None, 'a_label',
+ with_attributes=True, timestamp_attribute='time'))
+ assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+ p.run()
+
+ @mock.patch('google.cloud.pubsub')
+ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
+ payload = 'payload'
+ message_id = 'message_id'
+ attributes = {'time': '2018-03-12T13:37:01.234567Z'}
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+ expected_data = [
+ TestWindowedValue(
+ PubsubMessage(payload, attributes),
+ timestamp.Timestamp.from_rfc3339(attributes['time']),
+ [window.GlobalWindow()]),
+ ]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ pcoll = (p
+ | ReadFromPubSub(
+ 'projects/fakeprj/topics/a_topic', None, 'a_label',
+ with_attributes=True, timestamp_attribute='time'))
+ assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+ p.run()
+
+ @mock.patch('google.cloud.pubsub')
+ def test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
+ payload = 'payload'
+ message_id = 'message_id'
+ attributes = {'time': '1337'}
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ _ = (p
+ | ReadFromPubSub(
+ 'projects/fakeprj/topics/a_topic', None, 'a_label',
+ with_attributes=True, timestamp_attribute='nonexistent'))
+ with self.assertRaisesRegexp(KeyError, r'Timestamp.*nonexistent'):
+ p.run()
+
+ @mock.patch('google.cloud.pubsub')
+ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
+ payload = 'payload'
+ message_id = 'message_id'
+ attributes = {'time': '1337 unparseable'}
+ publish_time = '2018-03-12T13:37:01.234567Z'
+ data = [create_client_message(
+ payload, message_id, attributes, publish_time)]
+
+ mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+ mock_pubsub.subscription.AutoAck = FakeAutoAck
+
+ p = TestPipeline()
+ p.options.view_as(StandardOptions).streaming = True
+ _ = (p
+ | ReadFromPubSub(
+ 'projects/fakeprj/topics/a_topic', None, 'a_label',
+ with_attributes=True, timestamp_attribute='time'))
+ with self.assertRaisesRegexp(ValueError, r'parse'):
+ p.run()
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index e53a12e222b..d9d76f89128 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -777,7 +777,7 @@ def run_Read(self, transform_node):
standard_options = (
transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
if not standard_options.streaming:
- raise ValueError('PubSubPayloadSource is currently available for use '
+ raise ValueError('Cloud Pub/Sub is currently available for use '
'only in streaming pipelines.')
# Only one of topic or subscription should be set.
if transform.source.full_subscription:
@@ -793,6 +793,9 @@ def run_Read(self, transform_node):
# Setting this property signals Dataflow runner to return full
# PubsubMessages instead of just the payload.
step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
+ if transform.source.timestamp_attribute is not None:
+ step.add_property(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
+ transform.source.timestamp_attribute)
else:
raise ValueError(
'Source %r has unexpected format %s.' % (
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 8e482369531..b5f8051704e 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -74,10 +74,11 @@ class PropertyNames(object):
OUTPUT_INFO = 'output_info'
OUTPUT_NAME = 'output_name'
PARALLEL_INPUT = 'parallel_input'
- PUBSUB_TOPIC = 'pubsub_topic'
- PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
PUBSUB_ID_LABEL = 'pubsub_id_label'
PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn'
+ PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
+ PUBSUB_TIMESTAMP_ATTRIBUTE = 'pubsub_timestamp_label'
+ PUBSUB_TOPIC = 'pubsub_topic'
SERIALIZED_FN = 'serialized_fn'
SHARD_NAME_TEMPLATE = 'shard_template'
SOURCE_STEP_INPUT = 'custom_source_step_input'
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 062509fd0cb..510a4e69fe9 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -268,7 +268,7 @@ def _get_pubsub_transform_overrides(pipeline_options):
class ReadFromPubSubOverride(PTransformOverride):
def matches(self, applied_ptransform):
return isinstance(applied_ptransform.transform,
- beam_pubsub._ReadFromPubSub)
+ beam_pubsub.ReadFromPubSub)
def get_replacement_transform(self, transform):
if not pipeline_options.view_as(StandardOptions).streaming:
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index ad29007dada..eb1ccd5c36c 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -417,7 +417,7 @@ def start_bundle(self):
def process_element(self, element):
pass
- def _read_from_pubsub(self):
+ def _read_from_pubsub(self, timestamp_attribute):
from apache_beam.io.gcp.pubsub import PubsubMessage
from google.cloud import pubsub
# Because of the AutoAck, we are not able to reread messages if this
@@ -428,24 +428,40 @@ def _read_from_pubsub(self):
self._subscription, return_immediately=True,
max_messages=10) as results:
def _get_element(message):
- if self.source.with_attributes:
- return PubsubMessage._from_message(message)
+ parsed_message = PubsubMessage._from_message(message)
+ if timestamp_attribute:
+ try:
+ rfc3339_or_milli = parsed_message.attributes[timestamp_attribute]
+ except KeyError as e:
+ raise KeyError('Timestamp attribute not found: %s' % e)
+ try:
+ timestamp = Timestamp.from_rfc3339(rfc3339_or_milli)
+ except ValueError:
+ try:
+ timestamp = Timestamp(micros=int(rfc3339_or_milli) * 1000)
+ except ValueError as e:
+ raise ValueError('Bad timestamp value: %s' % e)
else:
- return message.data
+ timestamp = Timestamp.from_rfc3339(message.service_timestamp)
+
+ return timestamp, parsed_message
return [_get_element(message)
for unused_ack_id, message in results.items()]
def finish_bundle(self):
- data = self._read_from_pubsub()
+ data = self._read_from_pubsub(self.source.timestamp_attribute)
if data:
output_pcollection = list(self._outputs)[0]
bundle = self._evaluation_context.create_bundle(output_pcollection)
- # TODO(ccy): we currently do not use the PubSub message timestamp or
- # respect the PubSub source's id_label field.
- now = Timestamp.of(time.time())
- for message_data in data:
- bundle.output(GlobalWindows.windowed_value(message_data, timestamp=now))
+ # TODO(ccy): Respect the PubSub source's id_label field.
+ for timestamp, message in data:
+ if self.source.with_attributes:
+ element = message
+ else:
+ element = message.payload
+ bundle.output(
+ GlobalWindows.windowed_value(element, timestamp=timestamp))
bundles = [bundle]
else:
bundles = []
@@ -456,6 +472,7 @@ def finish_bundle(self):
unprocessed_bundle = self._evaluation_context.create_bundle(
input_pvalue)
+ # TODO(udim): Correct value for watermark hold.
return TransformResult(self, bundles, [unprocessed_bundle], None,
{None: Timestamp.of(time.time())})
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index c437d5a3e7c..7c41c3002f6 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -24,6 +24,9 @@
from __future__ import division
import datetime
+import re
+
+import pytz
class Timestamp(object):
@@ -38,6 +41,12 @@ class Timestamp(object):
"""
def __init__(self, seconds=0, micros=0):
+ if not isinstance(seconds, (int, float, long)):
+ raise TypeError('Cannot interpret %s %s as seconds.' % (
+ seconds, type(seconds)))
+ if not isinstance(micros, (int, float, long)):
+ raise TypeError('Cannot interpret %s %s as micros.' % (
+ micros, type(micros)))
self.micros = int(seconds * 1000000) + int(micros)
@staticmethod
@@ -53,12 +62,52 @@ def of(seconds):
Corresponding Timestamp object.
"""
- if isinstance(seconds, Duration):
- raise TypeError('Can\'t interpret %s as Timestamp.' % seconds)
+ if not isinstance(seconds, (int, float, Timestamp)):
+ raise TypeError('Cannot interpret %s %s as Timestamp.' % (
+ seconds, type(seconds)))
if isinstance(seconds, Timestamp):
return seconds
return Timestamp(seconds)
+ RFC_3339_RE = re.compile(
+ r'^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(?:\.(\d+))?Z$')
+
+ @staticmethod
+ def _epoch_datetime_utc():
+ return datetime.datetime.fromtimestamp(0, pytz.utc)
+
+ @classmethod
+ def from_utc_datetime(cls, dt):
+ """Create a ``Timestamp`` instance from a ``datetime.datetime`` object.
+
+ Args:
+ dt: A ``datetime.datetime`` object in UTC (offset-aware).
+ """
+ if dt.tzinfo != pytz.utc:
+ raise ValueError('dt not in UTC: %s', dt)
+ duration = dt - cls._epoch_datetime_utc()
+ return Timestamp(duration.total_seconds())
+
+ @classmethod
+ def from_rfc3339(cls, rfc3339):
+ """Create a ``Timestamp`` instance from an RFC 3339 compliant string.
+
+ Args:
+ rfc3339: String in RFC 3339 form.
+ """
+ dt_args = []
+ match = cls.RFC_3339_RE.match(rfc3339)
+ if match is None:
+ raise ValueError('Could not parse RFC 3339 string: %s', rfc3339)
+ for s in match.groups():
+ if s is not None:
+ dt_args.append(int(s))
+ else:
+ dt_args.append(0)
+ dt_args += (pytz.utc, )
+ dt = datetime.datetime(*dt_args)
+ return cls.from_utc_datetime(dt)
+
def predecessor(self):
"""Returns the largest timestamp smaller than self."""
return Timestamp(micros=self.micros - 1)
@@ -76,12 +125,12 @@ def __repr__(self):
return 'Timestamp(%s%d)' % (sign, int_part)
def to_utc_datetime(self):
- epoch = datetime.datetime.utcfromtimestamp(0)
# We can't easily construct a datetime object from microseconds, so we
# create one at the epoch and add an appropriate timedelta interval.
- return epoch + datetime.timedelta(microseconds=self.micros)
+ return self._epoch_datetime_utc().replace(tzinfo=None) + datetime.timedelta(
+ microseconds=self.micros)
- def isoformat(self):
+ def to_rfc3339(self):
# Append 'Z' for UTC timezone.
return self.to_utc_datetime().isoformat() + 'Z'
@@ -150,7 +199,7 @@ def of(seconds):
"""
if isinstance(seconds, Timestamp):
- raise TypeError('Can\'t interpret %s as Duration.' % seconds)
+ raise TypeError('Cannot interpret %s as Duration.' % seconds)
if isinstance(seconds, Duration):
return seconds
return Duration(seconds)
diff --git a/sdks/python/apache_beam/utils/timestamp_test.py b/sdks/python/apache_beam/utils/timestamp_test.py
index 33229361b36..8296dc6a9fb 100644
--- a/sdks/python/apache_beam/utils/timestamp_test.py
+++ b/sdks/python/apache_beam/utils/timestamp_test.py
@@ -19,8 +19,11 @@
from __future__ import absolute_import
+import datetime
import unittest
+import pytz
+
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp
@@ -43,13 +46,39 @@ def test_precision(self):
self.assertEqual(Timestamp(10000000) % Duration(0.000005), 0)
def test_utc_timestamp(self):
- self.assertEqual(Timestamp(10000000).isoformat(),
+ self.assertEqual(Timestamp(10000000).to_rfc3339(),
'1970-04-26T17:46:40Z')
- self.assertEqual(Timestamp(10000000.000001).isoformat(),
+ self.assertEqual(Timestamp(10000000.000001).to_rfc3339(),
'1970-04-26T17:46:40.000001Z')
- self.assertEqual(Timestamp(1458343379.123456).isoformat(),
+ self.assertEqual(Timestamp(1458343379.123456).to_rfc3339(),
'2016-03-18T23:22:59.123456Z')
+ def test_from_rfc3339(self):
+ test_cases = [
+ (10000000, '1970-04-26T17:46:40Z'),
+ (10000000.000001, '1970-04-26T17:46:40.000001Z'),
+ (1458343379.123456, '2016-03-18T23:22:59.123456Z'),
+ ]
+ for seconds_float, rfc3339_str in test_cases:
+ self.assertEqual(Timestamp(seconds_float),
+ Timestamp.from_rfc3339(rfc3339_str))
+ self.assertEqual(rfc3339_str,
+ Timestamp.from_rfc3339(rfc3339_str).to_rfc3339())
+
+ def test_from_rfc3339_failure(self):
+ with self.assertRaisesRegexp(ValueError, 'parse'):
+ Timestamp.from_rfc3339('not rfc3339')
+ with self.assertRaisesRegexp(ValueError, 'parse'):
+ Timestamp.from_rfc3339('2016-03-18T23:22:59.123456Z unparseable')
+
+ def test_from_utc_datetime(self):
+ self.assertEqual(
+ Timestamp.from_utc_datetime(datetime.datetime(1970, 1, 1,
+ tzinfo=pytz.utc)),
+ Timestamp(0))
+ with self.assertRaisesRegexp(ValueError, r'UTC'):
+ Timestamp.from_utc_datetime(datetime.datetime(1970, 1, 1))
+
def test_arithmetic(self):
# Supported operations.
self.assertEqual(Timestamp(123) + 456, 579)
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index ffc4df78413..b7f400e739e 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -106,6 +106,7 @@ def get_version():
'oauth2client>=2.0.1,<5',
# grpcio 1.8.1 and above requires protobuf 3.5.0.post1.
'protobuf>=3.5.0.post1,<4',
+ 'pytz>=2018.3',
'pyyaml>=3.12,<4.0.0',
'pyvcf>=0.6.8,<0.7.0',
'six>=1.9,<1.12',
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 85064)
Time Spent: 9h (was: 8h 50m)
> Support full PubsubMessages
> ---------------------------
>
> Key: BEAM-3744
> URL: https://issues.apache.org/jira/browse/BEAM-3744
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Udi Meiri
> Assignee: Udi Meiri
> Priority: Critical
> Time Spent: 9h
> Remaining Estimate: 0h
>
> Tracking changes to Pubsub support in Python SDK.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)