InigoSJ commented on a change in pull request #12930:
URL: https://github.com/apache/beam/pull/12930#discussion_r526166184



##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +449,69 @@ def display_data(self):
 
   def writer(self):
     raise NotImplementedError
+
+
+class PubSubSourceDescriptor(NamedTuple):
+  """A PubSub source descriptor for ``MultipleReadFromPubSub```"""
+  source: str
+  id_label: str = None
+  timestamp_attribute: str = None
+
+
+class MultipleReadFromPubSub(PTransform):
+  """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+  subscriptions and/or topics."""
+  def __init__(
+      self,
+      source_list,  # type: List[PubSubSourceDescriptor]
+      with_attributes=False,  # type: bool
+  ):
+    """Initializes ``PubSubMultipleReader``.
+
+    Args:
+      source_list: List of Cloud Pub/Sub topics or subscriptions of type
+      `~PubSubSourceDescriptor`.
+      with_attributes:
+        True - input elements will be :class:`~PubsubMessage` objects.
+        False - input elements will be of type ``bytes`` (message data only).
+    """
+    self.source_list = source_list
+    self.with_attributes = with_attributes
+
+    for source in self.source_list:
+      match_topic = re.match(TOPIC_REGEXP, source.source)
+      match_subscription = re.match(SUBSCRIPTION_REGEXP, source.source)
+
+      if not (match_topic or match_subscription):
+        raise ValueError(
+            'PubSub source must be in the form "projects/<project>/topics'
+            '/<topic>" or "projects/<project>/subscription'
+            '/<subscription>" (got %r).' % source.source)
+
+  def expand(self, pcol):
+    sources_pcol = []
+    for source in self.source_list:
+      source_split = source.source.split('/')

Review comment:
       I used a new regex (`PUBSUB_DESCRIPTOR_REGEXP`) that is valid for both, 
so I could use `match.group` to check if topic or subscription, let me know 
what you think.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to