dstandish commented on issue #32836:
URL: https://github.com/apache/airflow/issues/32836#issuecomment-1650465117
ok actually @uranusjr i was able to confirm that many -> one mapping when
there's a failure in mapped tasks, upstream failed does not propagate correctly
beyond the first downstream relative when using dag.test() or
info.scheduling_decisions approach
here is example test for repro
```python
def get_states(dr):
ti_dict = defaultdict(dict)
for ti in dr.get_task_instances():
if ti.map_index == -1:
ti_dict[ti.task_id] = ti.state
else:
ti_dict[ti.task_id][ti.map_index] = ti.state
return ti_dict
def test_many_one(dag_maker, session):
with dag_maker(dag_id="many_one") as dag:
@task
def my_setup(val):
print("setup")
raise ValueError("hi")
@task
def my_work():
print("work")
@task
def my_teardown():
print("teardown")
s = my_setup.expand(val=[1, 2])
t = my_teardown()
s >> my_work() >> t
dr = dag_maker.create_dagrun()
while True:
info = dr.task_instance_scheduling_decisions()
if not info.schedulable_tis:
break
for ti in info.schedulable_tis:
try:
ti.run()
except Exception as e:
print(e)
session.commit()
info.unfinished_tis
# dr = dag.test()
states = get_states(dr)
expected = {
"my_setup": {0: "failed", 1: "failed"},
"my_teardown": None,
"my_work": "upstream_failed",
}
assert states == expected
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]