Repository: beam
Updated Branches:
  refs/heads/master 2994fce73 -> 27dd7a9e4


Fix Python Dataflow execution errors due to #3223


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6ed520f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6ed520f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6ed520f

Branch: refs/heads/master
Commit: f6ed520fbcb1f2d4b13800b597ecd26eff9c5a08
Parents: 2994fce
Author: Charles Chen <[email protected]>
Authored: Fri May 26 14:24:55 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Fri May 26 17:11:35 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/pubsub.py | 74 ++++++++++++++++++++++++++-
 1 file changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f6ed520f/sdks/python/apache_beam/io/gcp/pubsub.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 7b838d2..1ba8ac0 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -18,10 +18,13 @@
 
 Cloud Pub/Sub sources and sinks are currently supported only in streaming
 pipelines, during remote execution.
+
+This API is currently under development and is subject to change.
 """
 
 from __future__ import absolute_import
 
+from apache_beam import coders
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -30,7 +33,8 @@ from apache_beam.transforms import ParDo
 from apache_beam.transforms.display import DisplayDataItem
 
 
-__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub']
+__all__ = ['ReadStringsFromPubSub', 'WriteStringsToPubSub',
+           'PubSubSource', 'PubSubSink']
 
 
 class ReadStringsFromPubSub(PTransform):
@@ -150,3 +154,71 @@ def _decodeUtf8String(encoded_value):
 def _encodeUtf8String(value):
   """Encodes a string in utf-8 format to bytes"""
   return value.encode('utf-8')
+
+
+class PubSubSource(dataflow_io.NativeSource):
+  """Deprecated: do not use.
+
+  Source for reading from a given Cloud Pub/Sub topic.
+
+  Attributes:
+    topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
+    subscription: Optional existing Cloud Pub/Sub subscription to use in the
+      form "projects/<project>/subscriptions/<subscription>".
+    id_label: The attribute on incoming Pub/Sub messages to use as a unique
+      record identifier.  When specified, the value of this attribute (which can
+      be any string that uniquely identifies the record) will be used for
+      deduplication of messages.  If not provided, Dataflow cannot guarantee
+      that no duplicate data will be delivered on the Pub/Sub stream. In this
+      case, deduplication of the stream will be strictly best effort.
+    coder: The Coder to use for decoding incoming Pub/Sub messages.
+  """
+
+  def __init__(self, topic, subscription=None, id_label=None,
+               coder=coders.StrUtf8Coder()):
+    self.topic = topic
+    self.subscription = subscription
+    self.id_label = id_label
+    self.coder = coder
+
+  @property
+  def format(self):
+    """Source format name required for remote execution."""
+    return 'pubsub'
+
+  def display_data(self):
+    return {'id_label':
+            DisplayDataItem(self.id_label,
+                            label='ID Label Attribute').drop_if_none(),
+            'topic':
+            DisplayDataItem(self.topic,
+                            label='Pubsub Topic'),
+            'subscription':
+            DisplayDataItem(self.subscription,
+                            label='Pubsub Subscription').drop_if_none()}
+
+  def reader(self):
+    raise NotImplementedError(
+        'PubSubSource is not supported in local execution.')
+
+
+class PubSubSink(dataflow_io.NativeSink):
+  """Deprecated: do not use.
+
+  Sink for writing to a given Cloud Pub/Sub topic."""
+
+  def __init__(self, topic, coder=coders.StrUtf8Coder()):
+    self.topic = topic
+    self.coder = coder
+
+  @property
+  def format(self):
+    """Sink format name required for remote execution."""
+    return 'pubsub'
+
+  def display_data(self):
+    return {'topic': DisplayDataItem(self.topic, label='Pubsub Topic')}
+
+  def writer(self):
+    raise NotImplementedError(
+        'PubSubSink is not supported in local execution.')

Reply via email to