boyuanzz commented on a change in pull request #12930:
URL: https://github.com/apache/beam/pull/12930#discussion_r507987217
##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +448,123 @@ def display_data(self):
def writer(self):
raise NotImplementedError
+
+
+class MultipleReadFromPubSub(PTransform):
+ """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+ subscriptions and/or topics."""
+ def __init__(
+ self,
+ source_list, # type: List[str]
+ with_context=False, # type: bool
+ id_label=None, # type: Optional[Union[List[str], str]]
+ with_attributes=False, # type: bool
+ timestamp_attribute=None, # type: Optional[Union[List[str], str]]
+ **kwargs
Review comment:
It also seems like there is no specific consumer for these additional
`kwargs`. Why do we want them here?
##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +448,123 @@ def display_data(self):
def writer(self):
raise NotImplementedError
+
+
+class MultipleReadFromPubSub(PTransform):
+ """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+ subscriptions and/or topics."""
+ def __init__(
+ self,
+ source_list, # type: List[str]
+ with_context=False, # type: bool
Review comment:
It seems like the `ReadFromPubSub` constructor doesn't take
`with_context`. Why do we add a new parameter here?
##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +448,123 @@ def display_data(self):
def writer(self):
raise NotImplementedError
+
+
+class MultipleReadFromPubSub(PTransform):
+ """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+ subscriptions and/or topics."""
+ def __init__(
+ self,
+ source_list, # type: List[str]
+ with_context=False, # type: bool
+ id_label=None, # type: Optional[Union[List[str], str]]
+ with_attributes=False, # type: bool
+ timestamp_attribute=None, # type: Optional[Union[List[str], str]]
+ **kwargs
+ ):
+ """Initializes ``PubSubMultipleReader``.
+
+ Args:
+ source_list: List of Cloud Pub/Sub topics or subscriptions. Topics in
+ form "projects/<project>/topics/<topic>" and subscriptions in form
+ "projects/<project>/subscriptions/<subscription>".
+ with_context:
+ True - output elements will be key-value pairs with the source as the
+ key and the message as the value.
+ False - output elements will be the messages.
+ with_attributes:
+ True - input elements will be :class:`~PubsubMessage` objects.
+ False - input elements will be of type ``bytes`` (message data only).
+ id_label: If set, will set an attribute for each Cloud Pub/Sub message
+ with the given name and a unique value. This attribute can then be
+ used in a ReadFromPubSub PTransform to deduplicate messages. If type
+ is string, all sources will share the same value; if type is
+ ``List``, each source will use the value of its index.
+ timestamp_attribute: If set, will set an attribute for each Cloud
+ Pub/Sub message with the given name and the message's publish time as
+ the value. If type is ``string``, all sources will share the same
+ value; if type List, each source will use the value of its index.
+ """
+ self.source_list = source_list
+ self.with_context = with_context
+ self.with_attributes = with_attributes
+ self._kwargs = kwargs
+
+ self._total_sources = len(source_list)
+
+ if isinstance(id_label, str) or id_label is None:
Review comment:
A lot of the logic here is checking input arguments, which we can get rid of
by using a `NamedTuple` to represent a PubSub read configuration instead of
using a list for each configurable attribute.
##########
File path: sdks/python/apache_beam/io/gcp/pubsub.py
##########
@@ -444,3 +448,123 @@ def display_data(self):
def writer(self):
raise NotImplementedError
+
+
+class MultipleReadFromPubSub(PTransform):
+ """A ``PTransform`` that expands ``ReadFromPubSub`` to read from multiple
+ subscriptions and/or topics."""
+ def __init__(
+ self,
+ source_list, # type: List[str]
+ with_context=False, # type: bool
+ id_label=None, # type: Optional[Union[List[str], str]]
+ with_attributes=False, # type: bool
+ timestamp_attribute=None, # type: Optional[Union[List[str], str]]
+ **kwargs
+ ):
+ """Initializes ``PubSubMultipleReader``.
+
+ Args:
+ source_list: List of Cloud Pub/Sub topics or subscriptions. Topics in
+ form "projects/<project>/topics/<topic>" and subscriptions in form
+ "projects/<project>/subscriptions/<subscription>".
+ with_context:
+ True - output elements will be key-value pairs with the source as the
+ key and the message as the value.
+ False - output elements will be the messages.
+ with_attributes:
+ True - input elements will be :class:`~PubsubMessage` objects.
+ False - input elements will be of type ``bytes`` (message data only).
+ id_label: If set, will set an attribute for each Cloud Pub/Sub message
+ with the given name and a unique value. This attribute can then be
+ used in a ReadFromPubSub PTransform to deduplicate messages. If type
+ is string, all sources will share the same value; if type is
+ ``List``, each source will use the value of its index.
+ timestamp_attribute: If set, will set an attribute for each Cloud
+ Pub/Sub message with the given name and the message's publish time as
+ the value. If type is ``string``, all sources will share the same
+ value; if type List, each source will use the value of its index.
+ """
+ self.source_list = source_list
+ self.with_context = with_context
+ self.with_attributes = with_attributes
+ self._kwargs = kwargs
+
+ self._total_sources = len(source_list)
+
+ if isinstance(id_label, str) or id_label is None:
+ self.id_label = [id_label] * self._total_sources
+ else:
+ if len(id_label) != self._total_sources:
+ raise ValueError(
+ 'Length of "id_label" (%d) is not the same as length of '
+ '"sources" (%d)' % (len(id_label), self._total_sources))
+ self.id_label = id_label
+
+ if isinstance(timestamp_attribute, str) or timestamp_attribute is None:
+ self.timestamp_attribute = [timestamp_attribute] * self._total_sources
+ else:
+ if len(timestamp_attribute) != self._total_sources:
+ raise ValueError(
+ 'Length of "timestamp_attribute" (%d) is not the same as length of '
+ '"sources" (%d)' % (len(timestamp_attribute), self._total_sources))
+ self.timestamp_attribute = timestamp_attribute
+
+ for source in self.source_list:
+ match_topic = re.match(TOPIC_REGEXP, source)
+ match_subscription = re.match(SUBSCRIPTION_REGEXP, source)
+
+ if not (match_topic or match_subscription):
+ raise ValueError(
+ 'PubSub source must be in the form "projects/<project>/topics'
+ '/<topic>" or "projects/<project>/subscription'
+ '/<subscription>" (got %r).' % source)
+
+ if 'topic' in self._kwargs:
+ raise ValueError(
+ 'Topics and subscriptions should be in "source_list". '
+ 'Found topic %s' % self._kwargs['topic'])
+
+ if 'subscription' in self._kwargs:
+ raise ValueError(
+ 'Subscriptions and topics should be in "source_list". '
+ 'Found subscription %s' % self._kwargs['subscription'])
+
+ def expand(self, pcol):
+ sources_pcol = []
+ for i, source in enumerate(self.source_list):
+ id_label = self.id_label[i]
+ timestamp_attribute = self.timestamp_attribute[i]
+
+ source_split = source.split('/')
+ source_project = source_split[1]
+ source_type = source_split[2]
+ source_name = source_split[-1]
+
+ step_name_base = 'PubSub %s/project:%s' % (source_type, source_project)
+ read_step_name = '%s/Read %s' % (step_name_base, source_name)
+
+ if source_type == 'topics':
Review comment:
`PubSubSource` has checking logic similar to this. We should be able
to move this check.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]