Hi everyone! A very common pattern I see in Beam is pipelines that read from multiple PubSub topics or subscriptions and then flatten them into a single PCollection. This usually makes the pipeline harder to understand, since the resulting graph gives little context about what is being read.
I was thinking of adding a PTransform that reads from a list of topics/subscriptions: a composite transform whose `expand` simply wraps the existing ReadFromPubSub transform. This approach would help both when developing and when debugging pipelines:
- No time spent writing a custom multi-reader
- Easier organization of topics/subscriptions
- A pipeline graph that is easier on the eye and less convoluted
- Faster debugging
- No overly wide graphs that hide other parts of the pipeline

As mentioned, for now I only have the PTransform based on `expand`, but in the future implementing this with a Splittable DoFn would be the way to go.

The PTransform takes 3 optional parameters:
- topics: list of topics
- subscriptions: list of subscriptions
- with_context: boolean. If True, it adds the topic/subscription path to each message, so each element becomes a tuple of (topic/subscription path, message). This could be helpful for later aggregations. The name may need to change.

The `topics` and `subscriptions` parameters could be fused into a single parameter, using the path [1] to tell whether each entry is a topic or a subscription, but I consider it cleaner this way.

Please find attached the current class I made, as well as some screenshots of how the pipeline looks. Since I don't know much about Splittable DoFns yet, I was considering opening a pull request for this PTransform and, in the meantime, working on a Splittable DoFn version.

Thanks a lot for your time, let me know what you think.

Iñigo

[1] projects/<PROJECT>/subscriptions/<NAME>
    projects/<PROJECT>/topics/<NAME>
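For the fused-parameter alternative, the transform would need to sort one mixed list by path format. A minimal sketch of that step follows; the helper name `split_sources` and its strict regex are my own assumptions, not an existing Beam API:

```python
import re
from typing import List, Tuple

# Path formats from [1]:
#   projects/<PROJECT>/topics/<NAME>
#   projects/<PROJECT>/subscriptions/<NAME>
_PATH_RE = re.compile(r'^projects/([^/]+)/(topics|subscriptions)/([^/]+)$')


def split_sources(paths: List[str]) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: split mixed Pub/Sub paths into (topics, subscriptions).

    Raises ValueError for any path that matches neither format.
    """
    topics, subscriptions = [], []
    for path in paths:
        match = _PATH_RE.match(path)
        if match is None:
            raise ValueError(f'Not a Pub/Sub topic or subscription path: {path!r}')
        # Group 2 is the resource type segment of the path.
        if match.group(2) == 'topics':
            topics.append(path)
        else:
            subscriptions.append(path)
    return topics, subscriptions
```

For example, `split_sources(['projects/my-project/topics/orders', 'projects/my-project/subscriptions/clicks-sub'])` returns one topic list and one subscription list, which could then feed the two existing parameters unchanged.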
```python
from apache_beam import Flatten, Map, PTransform
from apache_beam.io.gcp.pubsub import ReadFromPubSub


class PubSubMultipleReader(PTransform):
    def __init__(self, topics=None, subscriptions=None, with_context=False):
        super().__init__()
        # None defaults avoid sharing one mutable list across instances.
        self.topics = topics or []
        self.subscriptions = subscriptions or []
        self.with_context = with_context

    def expand(self, pcol):
        topics_subscriptions_pcol = []

        for topic in self.topics:
            # projects/<PROJECT>/topics/<NAME>
            topic_split = topic.split('/')
            topic_project = topic_split[1]
            topic_name = topic_split[-1]
            current_topic = (
                pcol
                | f'PubSub Topics/project:{topic_project}/Read {topic_name}'
                >> ReadFromPubSub(topic=topic))
            if self.with_context:
                name = f'PubSub Topics/project:{topic_project}/Add Keys {topic_name}'
                # Pass the path as an explicit extra argument to Map.
                current_topic = current_topic | name >> Map(
                    lambda x, source: (source, x), topic)
            topics_subscriptions_pcol.append(current_topic)

        for subscription in self.subscriptions:
            # projects/<PROJECT>/subscriptions/<NAME>
            subscription_split = subscription.split('/')
            subscription_project = subscription_split[1]
            subscription_name = subscription_split[-1]
            current_subscription = (
                pcol
                | f'PubSub Subscriptions/project:{subscription_project}/Read {subscription_name}'
                >> ReadFromPubSub(subscription=subscription))
            if self.with_context:
                name = f'PubSub Subscriptions/project:{subscription_project}/Add Keys {subscription_name}'
                current_subscription = current_subscription | name >> Map(
                    lambda x, source: (source, x), subscription)
            topics_subscriptions_pcol.append(current_subscription)

        if not topics_subscriptions_pcol:
            # An empty tuple has no pipeline for Flatten to attach to.
            raise ValueError('At least one topic or subscription is required.')
        return tuple(topics_subscriptions_pcol) | Flatten()
```