Hi everyone!

A very common use case I have seen in Beam is pipelines that read from
multiple PubSub topics or subscriptions and then flatten them. In general,
this makes the pipeline graph harder to understand, since it offers little
context about what is being read.

I was thinking of adding a PTransform that reads from a list of
topics/subscriptions, implemented as a simple composite transform that wraps
the existing ReadFromPubSub transform. This approach would help with both
developing and debugging pipelines:

   - No time spent developing a custom multi-reader
   - Easier organization of topics/subscriptions
   - A cleaner, less convoluted pipeline graph
   - Faster debugging
   - No more overly wide pipeline graphs that hide other parts of the
   pipeline

As mentioned, for now I only have the PTransform built as a wrapper around
ReadFromPubSub, but in the future implementing this with a Splittable DoFn
would be the way to go. The PTransform takes 3 optional parameters:

   - topics: list of topics
   - subscriptions: list of subscriptions
   - with_context: boolean. If True, it adds the topic/subscription path to
   each message, so every element becomes a tuple of (topic/subscription
   path, message). This could be helpful for later aggregations. The
   parameter name may need to change.

The parameters `topics` and `subscriptions` could be merged into a single
parameter, using the path format [1] to tell whether each entry is a topic
or a subscription. However, I find it cleaner to keep them separate.
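For comparison, here is a rough sketch of how the merged-parameter alternative could classify its entries. It assumes a hypothetical helper `split_pubsub_paths` (not part of Beam) that relies only on the path format in [1] and returns two lists that could then be passed as `topics` and `subscriptions`:

```python
import re

# Hypothetical helper, not a Beam API. Matches the two path formats in [1]:
#   projects/<PROJECT>/topics/<NAME>
#   projects/<PROJECT>/subscriptions/<NAME>
_PUBSUB_PATH = re.compile(r'^projects/([^/]+)/(topics|subscriptions)/([^/]+)$')


def split_pubsub_paths(paths):
    """Splits a mixed list of PubSub paths into (topics, subscriptions)."""
    topics, subscriptions = [], []
    for path in paths:
        match = _PUBSUB_PATH.match(path)
        if match is None:
            raise ValueError(f'Not a PubSub topic or subscription path: {path!r}')
        # Group 2 is the resource-type segment of the path.
        if match.group(2) == 'topics':
            topics.append(path)
        else:
            subscriptions.append(path)
    return topics, subscriptions
```

The cost of this design is that a typo in the resource segment only surfaces when the path is parsed, whereas two separate parameters make the intent explicit at the call site.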

Please find attached the current class I made as well as some screenshots
of how the pipeline looks.

Since I don't know much about Splittable DoFns yet, I was considering opening
a pull request for this PTransform and, in the meantime, working on a
Splittable DoFn version.

Thanks a lot for your time, and let me know what you think!
Iñigo

[1] projects/<PROJECT>/subscriptions/<NAME>
projects/<PROJECT>/topics/<NAME>

from apache_beam import Flatten, Map, PTransform
from apache_beam.io.gcp.pubsub import ReadFromPubSub


class PubSubMultipleReader(PTransform):
    """Reads from several PubSub topics/subscriptions and flattens the results."""

    def __init__(self, topics=None, subscriptions=None, with_context=False):
        super().__init__()
        # Use None defaults to avoid sharing one mutable list across instances.
        self.topics = list(topics or [])
        self.subscriptions = list(subscriptions or [])
        self.with_context = with_context
        if not self.topics and not self.subscriptions:
            raise ValueError('Provide at least one topic or subscription.')

    def _read_source(self, pbegin, kind, path):
        # Paths have the form projects/<PROJECT>/<topics|subscriptions>/<NAME>.
        path_split = path.split('/')
        project = path_split[1]
        name = path_split[-1]
        prefix = f'PubSub {kind.capitalize()}/project:{project}'
        source = {'topic': path} if kind == 'topics' else {'subscription': path}
        messages = pbegin | f'{prefix}/Read {name}' >> ReadFromPubSub(**source)
        if self.with_context:
            # Bind the path as an extra Map argument instead of closing over a
            # loop variable, so each branch is explicitly keyed by its own path.
            messages = messages | f'{prefix}/Add Keys {name}' >> Map(
                lambda message, source_path: (source_path, message), path)
        return messages

    def expand(self, pbegin):
        # One read (plus optional keying) per topic and per subscription.
        topics_subscriptions_pcol = [
            self._read_source(pbegin, 'topics', topic) for topic in self.topics]
        topics_subscriptions_pcol += [
            self._read_source(pbegin, 'subscriptions', subscription)
            for subscription in self.subscriptions]
        return tuple(topics_subscriptions_pcol) | Flatten()
