Taragolis opened a new issue, #38955:
URL: https://github.com/apache/airflow/issues/38955

   ### Body
   
   This new functionality was added in #37498; however, the test 
[`tests/ti_deps/deps/test_mapped_task_upstream_dep.py::test_step_by_step`](https://github.com/apache/airflow/blob/main/tests/ti_deps/deps/test_mapped_task_upstream_dep.py#L160)
 fails on a regular basis on the Postgres database
   
   This might be a side effect of another test, or potentially something wrong 
with the Postgres implementation
   
   Example failure:
   ```console
   _____________________ test_step_by_step[task-True-failed] 
______________________
   
   dag_maker = <tests.conftest.dag_maker.<locals>.DagFactory object at 
0x7f68e707e280>
   session = <sqlalchemy.orm.session.Session object at 0x7f691fcbd1f0>
   failure_mode = <TaskInstanceState.FAILED: 'failed'>, skip_upstream = True
   testcase = 'task'
   
       @pytest.mark.parametrize("failure_mode", [None, FAILED, UPSTREAM_FAILED])
       @pytest.mark.parametrize("skip_upstream", [True, False])
       @pytest.mark.parametrize("testcase", ["task", "group"])
       def test_step_by_step(
           dag_maker, session: Session, failure_mode: TaskInstanceState | None, 
skip_upstream: bool, testcase: str
       ):
           from airflow.decorators import task, task_group
       
           with dag_maker(session=session):
       
               @task
               def t1():
                   return [0]
       
               @task
               def t2_a():
                   if failure_mode == UPSTREAM_FAILED:
                       raise AirflowFailException()
                   return [1, 2]
       
               @task
               def t2_b(x):
                   if failure_mode == FAILED:
                       raise AirflowFailException()
                   return x
       
               @task
               def t3():
                   if skip_upstream:
                       raise AirflowSkipException()
                   return [3, 4]
       
               @task
               def t4():
                   return 17
       
               @task(trigger_rule="all_done")
               def m1(a, x, y, z):
                   return a + x + y + z
       
               @task(trigger_rule="all_done")
               def m2(x, y):
                   return x + y
       
               @task_group
               def tg(x, y):
                   return m2(x, y)
       
               x_vals = t1()
               y_vals = m1.partial(a=t4()).expand(x=x_vals, y=t2_b(t2_a()), 
z=t3())
               if testcase == "task":
                   m2.expand(x=x_vals, y=y_vals)
               else:
                   tg.expand(x=x_vals, y=y_vals)
       
           dr: DagRun = dag_maker.create_dagrun()
       
           mapped_task_1 = "m1"
           mapped_task_2 = "m2" if testcase == "task" else "tg.m2"
           expect_passed = failure_mode is None and not skip_upstream
       
           # Initial decision, t1, t2 and t3 can be scheduled
           schedulable_tis, finished_tis_states = 
_one_scheduling_decision_iteration(dr, session)
           assert sorted(schedulable_tis) == ["t1", "t2_a", "t3", "t4"]
           assert not finished_tis_states
       
           # Run first schedulable task
           schedulable_tis["t1"].run()
           schedulable_tis, finished_tis_states = 
_one_scheduling_decision_iteration(dr, session)
           assert sorted(schedulable_tis) == ["t2_a", "t3", "t4"]
           assert finished_tis_states == {"t1": SUCCESS}
       
           # Run remaining schedulable tasks
           if failure_mode == UPSTREAM_FAILED:
               with pytest.raises(AirflowFailException):
                   schedulable_tis["t2_a"].run()
               _one_scheduling_decision_iteration(dr, session)
           else:
               schedulable_tis["t2_a"].run()
               schedulable_tis, _ = _one_scheduling_decision_iteration(dr, 
session)
               if not failure_mode:
                   schedulable_tis["t2_b"].run()
               else:
                   with pytest.raises(AirflowFailException):
                       schedulable_tis["t2_b"].run()
           schedulable_tis["t3"].run()
           schedulable_tis["t4"].run()
           _one_scheduling_decision_iteration(dr, session)
       
           # Test the mapped task upstream dependency checks
           schedulable_tis, finished_tis_states = 
_one_scheduling_decision_iteration(dr, session)
           expected_finished_tis_states = {
               "t1": SUCCESS,
               "t2_a": FAILED if failure_mode == UPSTREAM_FAILED else SUCCESS,
               "t2_b": failure_mode if failure_mode else SUCCESS,
               "t3": SKIPPED if skip_upstream else SUCCESS,
               "t4": SUCCESS,
           }
           if not expect_passed:
               expected_finished_tis_states[mapped_task_1] = UPSTREAM_FAILED if 
failure_mode else SKIPPED
               expected_finished_tis_states[mapped_task_2] = UPSTREAM_FAILED if 
failure_mode else SKIPPED
   >       assert finished_tis_states == expected_finished_tis_states
   E       AssertionError: assert equals failed
   E         {                                {                               
   E           'm1': <TaskInstanceState.UPST    'm1': <TaskInstanceState.UPST 
   E         REAM_FAILED: 'upstream_failed'>  REAM_FAILED: 'upstream_failed'> 
   E         ,                                ,                               
   E           't1': 'success',                 'm2': <TaskInstanceState.UPST 
   E                                          REAM_FAILED: 'upstream_failed'> 
   E                                          ,                               
   E           't2_a': 'success',               't1': <TaskInstanceState.SUCC 
   E                                          ESS: 'success'>,                
   E           't2_b': 'failed',                't2_a': <TaskInstanceState.SU 
   E                                          CCESS: 'success'>,              
   E           't3': 'skipped',                 't2_b': <TaskInstanceState.FA 
   E                                          ILED: 'failed'>,                
   E           't4': 'success',                 't3': <TaskInstanceState.SKIP 
   E                                          PED: 'skipped'>,                
   E                                            't4': <TaskInstanceState.SUCC 
   E                                          ESS: 'success'>,                
   E         }                                }
   
   tests/ti_deps/deps/test_mapped_task_upstream_dep.py:257: AssertionError
   ```
   
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow 
project.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to