GitHub user olk added a comment to the discussion: branching + dyn. task
mapping + passing arguments: how to filter passed arguments
A variant that solves the problem doe expand the arguments before branching but
I'm wondering if there is a more elegant solution ...
```
from airflow.decorators import dag, task, task_group
from pendulum import datetime
@task
def producer():
return [ { "flag": True, "url": "abc" }, { "flag": False, "url": "xyz "} ]
@task.branch
def branching(data):
if data["flag"]:
return "expanding.consumer_a"
else:
return "expanding.consumer_b"
@task
def print_flag(data):
print(f"flag: {data['flag']}")
return data
@task
def print_url(data):
print(f"url: {data['url']}")
@task_group
def consumer_a(data):
print_url(print_flag(data))
@task_group
def consumer_b(data):
print_url(print_flag(data))
@task_group
def expanding(data):
branching(data=data) >> [ consumer_a(data=data), consumer_b(data=data) ]
@dag(
start_date=datetime(2023, 1, 1),
catchup=False,
schedule="@daily"
)
def branch_2():
data = producer()
expanding.expand(data=data)
branch_2()
```
GitHub link:
https://github.com/apache/airflow/discussions/46035#discussioncomment-11950111
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]