Taragolis opened a new issue, #38955: URL: https://github.com/apache/airflow/issues/38955
### Body This new functionality was added in #37498; however, the test [`tests/ti_deps/deps/test_mapped_task_upstream_dep.py::test_step_by_step`](https://github.com/apache/airflow/blob/main/tests/ti_deps/deps/test_mapped_task_upstream_dep.py#L160) fails on a regular basis on the Postgres database. This might be a side effect of another test, or potentially something is wrong with the Postgres implementation. Example failure: ```console _____________________ test_step_by_step[task-True-failed] ______________________ dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 0x7f68e707e280> session = <sqlalchemy.orm.session.Session object at 0x7f691fcbd1f0> failure_mode = <TaskInstanceState.FAILED: 'failed'>, skip_upstream = True testcase = 'task' @pytest.mark.parametrize("failure_mode", [None, FAILED, UPSTREAM_FAILED]) @pytest.mark.parametrize("skip_upstream", [True, False]) @pytest.mark.parametrize("testcase", ["task", "group"]) def test_step_by_step( dag_maker, session: Session, failure_mode: TaskInstanceState | None, skip_upstream: bool, testcase: str ): from airflow.decorators import task, task_group with dag_maker(session=session): @task def t1(): return [0] @task def t2_a(): if failure_mode == UPSTREAM_FAILED: raise AirflowFailException() return [1, 2] @task def t2_b(x): if failure_mode == FAILED: raise AirflowFailException() return x @task def t3(): if skip_upstream: raise AirflowSkipException() return [3, 4] @task def t4(): return 17 @task(trigger_rule="all_done") def m1(a, x, y, z): return a + x + y + z @task(trigger_rule="all_done") def m2(x, y): return x + y @task_group def tg(x, y): return m2(x, y) x_vals = t1() y_vals = m1.partial(a=t4()).expand(x=x_vals, y=t2_b(t2_a()), z=t3()) if testcase == "task": m2.expand(x=x_vals, y=y_vals) else: tg.expand(x=x_vals, y=y_vals) dr: DagRun = dag_maker.create_dagrun() mapped_task_1 = "m1" mapped_task_2 = "m2" if testcase == "task" else "tg.m2" expect_passed = failure_mode is None and not skip_upstream # Initial decision, t1, t2 
and t3 can be scheduled schedulable_tis, finished_tis_states = _one_scheduling_decision_iteration(dr, session) assert sorted(schedulable_tis) == ["t1", "t2_a", "t3", "t4"] assert not finished_tis_states # Run first schedulable task schedulable_tis["t1"].run() schedulable_tis, finished_tis_states = _one_scheduling_decision_iteration(dr, session) assert sorted(schedulable_tis) == ["t2_a", "t3", "t4"] assert finished_tis_states == {"t1": SUCCESS} # Run remaining schedulable tasks if failure_mode == UPSTREAM_FAILED: with pytest.raises(AirflowFailException): schedulable_tis["t2_a"].run() _one_scheduling_decision_iteration(dr, session) else: schedulable_tis["t2_a"].run() schedulable_tis, _ = _one_scheduling_decision_iteration(dr, session) if not failure_mode: schedulable_tis["t2_b"].run() else: with pytest.raises(AirflowFailException): schedulable_tis["t2_b"].run() schedulable_tis["t3"].run() schedulable_tis["t4"].run() _one_scheduling_decision_iteration(dr, session) # Test the mapped task upstream dependency checks schedulable_tis, finished_tis_states = _one_scheduling_decision_iteration(dr, session) expected_finished_tis_states = { "t1": SUCCESS, "t2_a": FAILED if failure_mode == UPSTREAM_FAILED else SUCCESS, "t2_b": failure_mode if failure_mode else SUCCESS, "t3": SKIPPED if skip_upstream else SUCCESS, "t4": SUCCESS, } if not expect_passed: expected_finished_tis_states[mapped_task_1] = UPSTREAM_FAILED if failure_mode else SKIPPED expected_finished_tis_states[mapped_task_2] = UPSTREAM_FAILED if failure_mode else SKIPPED > assert finished_tis_states == expected_finished_tis_states E AssertionError: assert equals failed E { { E 'm1': <TaskInstanceState.UPST 'm1': <TaskInstanceState.UPST E REAM_FAILED: 'upstream_failed'> REAM_FAILED: 'upstream_failed'> E , , E 't1': 'success', 'm2': <TaskInstanceState.UPST E REAM_FAILED: 'upstream_failed'> E , E 't2_a': 'success', 't1': <TaskInstanceState.SUCC E ESS: 'success'>, E 't2_b': 'failed', 't2_a': <TaskInstanceState.SU E CCESS: 
'success'>, E 't3': 'skipped', 't2_b': <TaskInstanceState.FA E ILED: 'failed'>, E 't4': 'success', 't3': <TaskInstanceState.SKIP E PED: 'skipped'>, E 't4': <TaskInstanceState.SUCC E ESS: 'success'>, E } } tests/ti_deps/deps/test_mapped_task_upstream_dep.py:257: AssertionError ``` ### Committer - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow project. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
