GitHub user olk added a comment to the discussion: branching + dyn. task 
mapping + passing arguments: how to filter passed arguments

A variant that solves the problem doe expand the arguments before branching but 
I'm wondering if there is a more elegant solution ...

```
from airflow.decorators import dag, task, task_group
from pendulum import datetime


@task
def producer():
    return [ { "flag": True, "url": "abc" }, { "flag": False, "url": "xyz "} ]


@task.branch
def branching(data):
    if data["flag"]:
        return "expanding.consumer_a"
    else:
        return "expanding.consumer_b"


@task
def print_flag(data):
    print(f"flag: {data['flag']}")
    return data


@task
def print_url(data):
    print(f"url: {data['url']}")


@task_group
def consumer_a(data):
    print_url(print_flag(data))


@task_group
def consumer_b(data):
    print_url(print_flag(data))


@task_group
def expanding(data):
    branching(data=data) >> [ consumer_a(data=data), consumer_b(data=data) ]


@dag(
    start_date=datetime(2023, 1, 1),
    catchup=False,
    schedule="@daily"
)
def branch_2():

    data = producer()
    expanding.expand(data=data)


branch_2()
```



GitHub link: 
https://github.com/apache/airflow/discussions/46035#discussioncomment-11950111

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to