boyuanzz commented on a change in pull request #12930:
URL: https://github.com/apache/beam/pull/12930#discussion_r522518679



##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +449,69 @@ def display_data(self):
 
   def writer(self):
     raise NotImplementedError
+
+
+class PubSubSourceDescriptor(NamedTuple):
+  """A PubSub source descriptor for ``MultipleReadFromPubSub```"""
+  source: str
+  id_label: str = None
+  timestamp_attribute: str = None
+
+
+class MultipleReadFromPubSub(PTransform):
+  """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+  subscriptions and/or topics."""
+  def __init__(
+      self,
+      source_list,  # type: List[PubSubSourceDescriptor]

Review comment:
       ```suggestion
        pubsub_source_descriptors,  # type: List[PubSubSourceDescriptor]
   ```

##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +449,69 @@ def display_data(self):
 
   def writer(self):
     raise NotImplementedError
+
+
+class PubSubSourceDescriptor(NamedTuple):
+  """A PubSub source descriptor for ``MultipleReadFromPubSub```"""
+  source: str
+  id_label: str = None
+  timestamp_attribute: str = None
+
+
+class MultipleReadFromPubSub(PTransform):
+  """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+  subscriptions and/or topics."""

Review comment:
       ```suggestion
     ``PubSubSourceDescriptor``."""
   ```

##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +449,69 @@ def display_data(self):
 
   def writer(self):
     raise NotImplementedError
+
+
+class PubSubSourceDescriptor(NamedTuple):
+  """A PubSub source descriptor for ``MultipleReadFromPubSub```"""
+  source: str

Review comment:
       It seems like the `source` here will be the full path to the 
topic/subscriptions. Please add more pydoc to describe what the source str 
should be.

##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +449,69 @@ def display_data(self):
 
   def writer(self):
     raise NotImplementedError
+
+
+class PubSubSourceDescriptor(NamedTuple):
+  """A PubSub source descriptor for ``MultipleReadFromPubSub```"""
+  source: str
+  id_label: str = None
+  timestamp_attribute: str = None
+
+
+class MultipleReadFromPubSub(PTransform):
+  """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+  subscriptions and/or topics."""
+  def __init__(
+      self,
+      source_list,  # type: List[PubSubSourceDescriptor]
+      with_attributes=False,  # type: bool
+  ):
+    """Initializes ``PubSubMultipleReader``.
+
+    Args:
+      source_list: List of Cloud Pub/Sub topics or subscriptions of type
+      `~PubSubSourceDescriptor`.
+      with_attributes:
+        True - input elements will be :class:`~PubsubMessage` objects.
+        False - input elements will be of type ``bytes`` (message data only).
+    """
+    self.source_list = source_list
+    self.with_attributes = with_attributes
+
+    for source in self.source_list:
+      match_topic = re.match(TOPIC_REGEXP, source.source)
+      match_subscription = re.match(SUBSCRIPTION_REGEXP, source.source)
+
+      if not (match_topic or match_subscription):
+        raise ValueError(
+            'PubSub source must be in the form "projects/<project>/topics'
+            '/<topic>" or "projects/<project>/subscription'
+            '/<subscription>" (got %r).' % source.source)
+
+  def expand(self, pcol):
+    sources_pcol = []
+    for source in self.source_list:
+      source_split = source.source.split('/')

Review comment:
       Instead of using `split`, can we use `re.match(TOPIC_REGEXP, 
source.source)` and `re.match(SUBSCRIPTION_REGEXP, source.source)` as well?

##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +449,69 @@ def display_data(self):
 
   def writer(self):
     raise NotImplementedError
+
+
+class PubSubSourceDescriptor(NamedTuple):
+  """A PubSub source descriptor for ``MultipleReadFromPubSub```"""
+  source: str
+  id_label: str = None
+  timestamp_attribute: str = None
+
+
+class MultipleReadFromPubSub(PTransform):

Review comment:
       Please add a code snippet about how to use this transform and highlight 
the benefit of using this transform compared to the `ReadFromPubSub`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to