ephraimbuddy opened a new pull request, #34138:
URL: https://github.com/apache/airflow/pull/34138
A mapped task with an upstream mapped task does not wait for all of the upstream's mapped
task instances to complete before attempting to run. This results in
inconsistent behaviour. For example, if the upstream task expands into 3 mapped
task instances, the downstream task tries to run as soon as any one of them
finishes, and its outcome depends on the state of whichever upstream instance
finished first. If that state is failed, the downstream task is marked upstream
failed; if it is success, the downstream trigger rule is evaluated against that
single success. If the remaining upstream instances eventually complete in
states other than success, those states are never considered in the downstream
mapped evaluation.
To fix this, I check whether all the upstream task instances are done and only
proceed to evaluate the trigger rule once they are. This affected tests in
tests/models/test_taskinstance.py because, to evaluate most of them, we have to
make `UpstreamStates.done` reflect genuinely finished upstreams so the trigger
rules can be evaluated.
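The gating logic can be sketched in isolation. This is a simplified, hypothetical model, not Airflow's actual implementation; the `UpstreamStates` dataclass and `evaluate_all_success` function below are illustrative names only:

```python
from dataclasses import dataclass


# Hypothetical, simplified model of aggregated upstream task-instance
# states; not Airflow's real API, just an illustration of the fix.
@dataclass
class UpstreamStates:
    success: int = 0
    failed: int = 0
    skipped: int = 0
    total: int = 0

    @property
    def done(self) -> int:
        # An instance is "done" once it reached a terminal state.
        return self.success + self.failed + self.skipped


def evaluate_all_success(states: UpstreamStates):
    """Evaluate an all_success-style trigger rule over mapped upstreams."""
    # The fix: do not evaluate the trigger rule until every upstream
    # mapped task instance is done. Previously, evaluation could happen
    # as soon as the first instance finished.
    if states.done < states.total:
        return None  # not ready yet: keep the downstream task waiting
    if states.failed:
        return "upstream_failed"
    if states.skipped:
        return "skipped"
    return "runnable"


# One of three upstream instances finished successfully: must still wait.
print(evaluate_all_success(UpstreamStates(success=1, total=3)))
# All three done, one failed: downstream becomes upstream_failed.
print(evaluate_all_success(UpstreamStates(success=2, failed=1, total=3)))
```

With this check in place, a single early success (or failure) can no longer decide the downstream state on its own.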
To test the behaviour, run the DAG below and notice the inconsistent state
of the finished dag run:
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowSkipException

with DAG(
    dag_id="AAAbug",
    start_date=datetime(2023, 7, 4),
    schedule="@daily",
    tags=["setup_teardown"],
) as dag:

    @task
    def my_work(val):
        print(val)

    @task
    def my_setup(val):
        if val == "data2.json":
            raise ValueError("fail!")
        elif val == "data3.json":
            raise AirflowSkipException("skip!")
        print(f"setup: {val}")
        return val

    s = my_setup.expand(val=["data1.json", "data2.json", "data3.json"])
    my_work(s)
```