Chais commented on issue #43883:
URL: https://github.com/apache/airflow/issues/43883#issuecomment-2470400840
I was able to get the desired behaviour by creating a task group, pulling
the `return_value` XCom from `start` and manually creating all the tasks in a
loop, which I feel shouldn't be necessary:
```python
from random import random
from typing import List
from airflow.decorators import dag, task, task_group
from airflow.models import XCom
from pendulum import now
@dag(
"test_foo",
schedule=None,
start_date=now(),
dag_display_name="Test random things",
)
def test_foo():
@task.python
def start() -> List[int]:
return [i for i in range(1, 11)]
@task_group()
def tg():
@task.short_circuit
def step_one(i: int) -> bool:
print(f"Hello from step {i}")
return random() >= 0.5
@task.python(trigger_rule="one_done")
def step_two(do: bool):
if not do:
print("Should've been skipped.")
print("Doing stuff")
for i in XCom.get_one(key="return_value", task_id="start",
run_id=XCom.run_id):
step_two(step_one(i))
start() >> tg()
test_foo()
if __name__ == "__main__":
test_foo().test()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]