Repository: beam
Updated Branches:
  refs/heads/master 2994fce73 -> 27dd7a9e4
Fix Python Dataflow execution errors due to #3223

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6ed520f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6ed520f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6ed520f

Branch: refs/heads/master
Commit: f6ed520fbcb1f2d4b13800b597ecd26eff9c5a08
Parents: 2994fce
Author: Charles Chen <[email protected]>
Authored: Fri May 26 14:24:55 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Fri May 26 17:11:35 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/pubsub.py | 74 ++++++++++++++++++++++++++-
 1 file changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/f6ed520f/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 7b838d2..1ba8ac0 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -18,10 +18,13 @@
Cloud Pub/Sub sources and sinks are currently supported only in streaming pipelines, during remote execution. + +This API is currently under development and is subject to change. 
""" from __future__ import absolute_import +from apache_beam import coders from apache_beam.io.iobase import Read from apache_beam.io.iobase import Write from apache_beam.runners.dataflow.native_io import iobase as dataflow_io @@ -30,7 +33,8 @@ from apache_beam.transforms import ParDo from apache_beam.transforms.display import DisplayDataItem -__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub'] +__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub', + 'PubSubSource', 'PubSubSink'] class ReadStringsFromPubSub(PTransform): @@ -150,3 +154,71 @@ def _decodeUtf8String(encoded_value): def _encodeUtf8String(value): """Encodes a string in utf-8 format to bytes""" return value.encode('utf-8') + + +class PubSubSource(dataflow_io.NativeSource): + """Deprecated: do not use. + + Source for reading from a given Cloud Pub/Sub topic. + + Attributes: + topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>". + subscription: Optional existing Cloud Pub/Sub subscription to use in the + form "projects/<project>/subscriptions/<subscription>". + id_label: The attribute on incoming Pub/Sub messages to use as a unique + record identifier. When specified, the value of this attribute (which can + be any string that uniquely identifies the record) will be used for + deduplication of messages. If not provided, Dataflow cannot guarantee + that no duplicate data will be delivered on the Pub/Sub stream. In this + case, deduplication of the stream will be strictly best effort. + coder: The Coder to use for decoding incoming Pub/Sub messages. 
+ """ + + def __init__(self, topic, subscription=None, id_label=None, + coder=coders.StrUtf8Coder()): + self.topic = topic + self.subscription = subscription + self.id_label = id_label + self.coder = coder + + @property + def format(self): + """Source format name required for remote execution.""" + return 'pubsub' + + def display_data(self): + return {'id_label': + DisplayDataItem(self.id_label, + label='ID Label Attribute').drop_if_none(), + 'topic': + DisplayDataItem(self.topic, + label='Pubsub Topic'), + 'subscription': + DisplayDataItem(self.subscription, + label='Pubsub Subscription').drop_if_none()} + + def reader(self): + raise NotImplementedError( + 'PubSubSource is not supported in local execution.') + + +class PubSubSink(dataflow_io.NativeSink): + """Deprecated: do not use. + + Sink for writing to a given Cloud Pub/Sub topic.""" + + def __init__(self, topic, coder=coders.StrUtf8Coder()): + self.topic = topic + self.coder = coder + + @property + def format(self): + """Sink format name required for remote execution.""" + return 'pubsub' + + def display_data(self): + return {'topic': DisplayDataItem(self.topic, label='Pubsub Topic')} + + def writer(self): + raise NotImplementedError( + 'PubSubSink is not supported in local execution.')
