Chais opened a new issue, #43883:
URL: https://github.com/apache/airflow/issues/43883
### Apache Airflow version
2.10.3
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
It seems that task instances created via `.expand()` do not inherit the
downstream tasks of their parent (the original, unmapped task). This becomes
a problem when expanding a `task.short_circuit`, because short-circuiting no
longer works.
### What you think should happen instead?
My understanding from the documentation is that
`step_two.expand(do=step_one.expand(i=start()))` is supposed to be equivalent
to:
```mermaid
flowchart LR
start --> A1[step_one_0] --> B1[step_two_0]
start --> A2[step_one_1] --> B2[step_two_1]
start --> A3[step_one_2] --> B3[step_two_2]
start --> A4[…] --> B4[…]
start --> A9[step_one_9] --> B9[step_two_9]
```
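To make the expected semantics concrete, here is a minimal plain-Python sketch of the per-index behaviour I have in mind. It is not Airflow code; `run_expected` and `decide` are hypothetical names used only for illustration, and `decide` stands in for the random condition in `step_one`.

```python
from typing import Callable, List, Optional


def run_expected(inputs: List[int], decide: Callable[[int], bool]) -> List[Optional[str]]:
    """Model the diagram: step_two_i runs only if step_one_i returned True."""
    results: List[Optional[str]] = []
    for i in inputs:
        if decide(i):
            # step_one_i returned True, so its own downstream step_two_i runs.
            results.append(f"step_two_{i} ran")
        else:
            # step_one_i short-circuited, so only step_two_i is skipped.
            results.append(None)
    return results


# Deterministic stand-in for the random condition: even indices pass.
outcome = run_expected(list(range(4)), lambda i: i % 2 == 0)
print(outcome)
```

In this model a `False` from one index skips only the matching downstream instance and leaves the other branches untouched.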
### How to reproduce
```python
import random
from typing import List

import pendulum
from airflow.decorators import dag, task


@dag(
    "test_foo",
    schedule=None,
    start_date=pendulum.now(),
    render_template_as_native_obj=True,
    dag_display_name="Test random things",
)
def test_foo():
    @task.python
    def start() -> List[int]:
        return [i for i in range(10)]

    @task.short_circuit
    def step_one(i: int) -> bool:
        print(f"Hello from step {i}")
        return random.random() >= 0.5

    @task.python
    def step_two(do: bool):
        if not do:
            print("Should've been skipped.")
        print("Doing stuff")

    step_two.expand(do=step_one.expand(i=start()))


test_foo()

if __name__ == "__main__":
    test_foo().test()
```
Since `step_one` returns `False` with probability 0.5, about 5 of the 10
mapped tasks should short-circuit on average. When one does, the log reads
something like this:
```
Hello from step 5
[2024-11-11 13:31:09,695] {python.py:240} INFO - Done. Returned value was: False
[2024-11-11 13:31:09,697] {python.py:309} INFO - Condition result is False
[2024-11-11 13:31:09,698] {python.py:316} INFO - No downstream tasks; nothing to do.
```
However, checking the `task_dict` variable in the module shows that
`step_one` does have `step_two` set as a downstream task and, vice versa,
`step_two` has `step_one` set as an upstream task:
```
__pydevd_ret_val_dict['factory'].task_dict['step_one'].downstream_task_ids
{'step_two'}
__pydevd_ret_val_dict['factory'].task_dict['step_two'].upstream_task_ids
{'step_one'}
```
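The two observations (the DAG-level task has a downstream, yet the log says there is none) would both be explained if the per-index instance lost its downstream links. The toy model below only illustrates that hypothesis. It is plain Python, not Airflow's implementation, and `ToyTask` and `unmap_without_relatives` are made-up names. I have not confirmed that this is what actually happens internally.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class ToyTask:
    task_id: str
    downstream_task_ids: Set[str] = field(default_factory=set)

    def unmap_without_relatives(self, map_index: int) -> "ToyTask":
        # Hypothetical: the per-index copy is created without downstream links.
        return ToyTask(task_id=f"{self.task_id}[{map_index}]")

    def short_circuit(self, condition: bool) -> str:
        if condition:
            return "proceed"
        if not self.downstream_task_ids:
            # Same wording as the log message observed above.
            return "No downstream tasks; nothing to do."
        return f"skip {sorted(self.downstream_task_ids)}"


step_one = ToyTask("step_one", {"step_two"})
instance = step_one.unmap_without_relatives(5)

print(step_one.short_circuit(False))   # the original still knows its downstream
print(instance.short_circuit(False))   # the copy reports nothing to skip
```

If something like this is going on, the fix would presumably be for the short-circuit check to consult the original task's relatives rather than those of the per-index copy.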
### Operating System
Ubuntu 24.04.1 LTS (Noble Numbat)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
I'm testing this on Standalone
### Anything else?
I can get closer to the desired behaviour by using a list comprehension in
the dependency, but that can't be done dynamically and also doesn't produce
exactly the desired behaviour.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)