[ 
https://issues.apache.org/jira/browse/BEAM-10962?focusedWorklogId=513591&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-513591
 ]

ASF GitHub Bot logged work on BEAM-10962:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Nov/20 15:15
            Start Date: 18/Nov/20 15:15
    Worklog Time Spent: 10m 
      Work Description: InigoSJ commented on a change in pull request #12930:
URL: https://github.com/apache/beam/pull/12930#discussion_r526166184



##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +449,69 @@ def display_data(self):
 
   def writer(self):
     raise NotImplementedError
+
+
+class PubSubSourceDescriptor(NamedTuple):
+  """A PubSub source descriptor for ``MultipleReadFromPubSub``."""
+  source: str
+  id_label: str = None
+  timestamp_attribute: str = None
+
+
+class MultipleReadFromPubSub(PTransform):
+  """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+  subscriptions and/or topics."""
+  def __init__(
+      self,
+      source_list,  # type: List[PubSubSourceDescriptor]
+      with_attributes=False,  # type: bool
+  ):
+    """Initializes ``MultipleReadFromPubSub``.
+
+    Args:
+      source_list: List of Cloud Pub/Sub topics or subscriptions of type
+        :class:`~PubSubSourceDescriptor`.
+      with_attributes:
+        True - output elements will be :class:`~PubsubMessage` objects.
+        False - output elements will be of type ``bytes`` (message data only).
+    """
+    self.source_list = source_list
+    self.with_attributes = with_attributes
+
+    for source in self.source_list:
+      match_topic = re.match(TOPIC_REGEXP, source.source)
+      match_subscription = re.match(SUBSCRIPTION_REGEXP, source.source)
+
+      if not (match_topic or match_subscription):
+        raise ValueError(
+            'PubSub source must be in the form "projects/<project>/topics'
+            '/<topic>" or "projects/<project>/subscriptions'
+            '/<subscription>" (got %r).' % source.source)
+
+  def expand(self, pcol):
+    sources_pcol = []
+    for source in self.source_list:
+      source_split = source.source.split('/')

Review comment:
       I used a new regex (`PUBSUB_DESCRIPTOR_REGEXP`) that matches both topics 
and subscriptions, so I can use `match.group` to check which of the two a 
source is. Let me know what you think.
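
To make the idea concrete, here is a minimal sketch of how a single pattern 
could cover both cases. The pattern shown and the `classify_source` helper are 
assumptions for illustration only; the actual `PUBSUB_DESCRIPTOR_REGEXP` in the 
PR may be defined differently.

```python
import re

# Hypothetical pattern, NOT taken from the PR: one regex that accepts both
# "projects/<project>/topics/<topic>" and
# "projects/<project>/subscriptions/<subscription>".
PUBSUB_DESCRIPTOR_REGEXP = r'projects/([^/]+)/(topics|subscriptions)/(.+)'

# Maps the plural path segment to the kind of source it denotes.
_KIND_BY_SEGMENT = {'topics': 'topic', 'subscriptions': 'subscription'}


def classify_source(source):
  """Hypothetical helper: returns (project, kind, name) for a source path.

  kind is either 'topic' or 'subscription'.
  """
  match = re.match(PUBSUB_DESCRIPTOR_REGEXP, source)
  if not match:
    raise ValueError(
        'PubSub source must be in the form "projects/<project>/topics'
        '/<topic>" or "projects/<project>/subscriptions'
        '/<subscription>" (got %r).' % source)
  # group(2) is the path segment that tells topics and subscriptions apart.
  return match.group(1), _KIND_BY_SEGMENT[match.group(2)], match.group(3)
```

With something like this, the `split('/')` in `expand` would no longer be 
needed, since the kind of each source can be read directly from the match.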




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 513591)
    Time Spent: 8h 50m  (was: 8h 40m)

> Multiple PubSub Source Reader 
> ------------------------------
>
>                 Key: BEAM-10962
>                 URL: https://issues.apache.org/jira/browse/BEAM-10962
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-py-gcp
>            Reporter: Inigo San Jose Visiers
>            Priority: P2
>          Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> A very common use case in Dataflow / Beam is reading from multiple PubSub 
> topics/subscriptions and flattening them. 
> It would be nice to have a PTransform that does this without users having to 
> build it themselves.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
