tanelk commented on PR #23731:
URL: https://github.com/apache/airflow/pull/23731#issuecomment-1141678437

   ```diff --git a/tests/jobs/test_scheduler_job.py 
b/tests/jobs/test_scheduler_job.py
   index fd32e6dd7..fee991995 100644
   --- a/tests/jobs/test_scheduler_job.py
   +++ b/tests/jobs/test_scheduler_job.py
   @@ -4099,37 +4099,35 @@ class TestSchedulerJob:
            ) > (timezone.utcnow() - timedelta(days=2))
   
   
   [email protected](reason="Work out where this goes")
   -def test_task_with_upstream_skip_process_task_instances():
   -    """
   -    Test if _process_task_instances puts a task instance into SKIPPED state 
if any of its
   -    upstream tasks are skipped according to TriggerRuleDep.
   -    """
   -    clear_db_runs()
   -    with DAG(
   -        dag_id='test_task_with_upstream_skip_dag', start_date=DEFAULT_DATE, 
schedule_interval=None
   -    ) as dag:
   -        dummy1 = EmptyOperator(task_id='dummy1')
   -        dummy2 = EmptyOperator(task_id="dummy2")
   -        dummy3 = EmptyOperator(task_id="dummy3")
   -        [dummy1, dummy2] >> dummy3
   -
   -    # dag_file_processor = DagFileProcessor(dag_ids=[], 
log=mock.MagicMock())
   -    dag.clear()
   -    dr = dag.create_dagrun(run_type=DagRunType.MANUAL, state=State.RUNNING, 
execution_date=DEFAULT_DATE)
   -    assert dr is not None
   -
   -    with create_session() as session:
   +    def test_task_with_upstream_skip_process_task_instances(self, 
dag_maker, session):
   +        """
   +        Test if _process_task_instances puts a task instance into SKIPPED 
state if any of its
   +        upstream tasks are skipped according to TriggerRuleDep.
   +        """
   +        with dag_maker(
   +            dag_id='test_task_with_upstream_skip_process_task_instances',
   +            start_date=DEFAULT_DATE,
   +            session=session,
   +        ):
   +            dummy1 = EmptyOperator(task_id='dummy1')
   +            dummy2 = EmptyOperator(task_id="dummy2")
   +            dummy3 = EmptyOperator(task_id="dummy3")
   +            [dummy1, dummy2] >> dummy3
   +
   +        dr = dag_maker.create_dagrun(state=State.RUNNING)
   +
            tis = {ti.task_id: ti for ti in 
dr.get_task_instances(session=session)}
            # Set dummy1 to skipped and dummy2 to success. dummy3 remains as 
none.
            tis[dummy1.task_id].state = State.SKIPPED
            tis[dummy2.task_id].state = State.SUCCESS
            assert tis[dummy3.task_id].state == State.NONE
   +        session.flush()
   
   -    # dag_runs = DagRun.find(dag_id='test_task_with_upstream_skip_dag')
   -    # dag_file_processor._process_task_instances(dag, dag_runs=dag_runs)
   +        self.scheduler_job = SchedulerJob(subdir=os.devnull)
   +        self.scheduler_job._schedule_dag_run(dr, session)
   +        session.flush()
   
   -    with create_session() as session:
   +        session.refresh(dr)
            tis = {ti.task_id: ti for ti in 
dr.get_task_instances(session=session)}
            assert tis[dummy1.task_id].state == State.SKIPPED
            assert tis[dummy2.task_id].state == State.SUCCESS
   ```
   
   The test is from pre 2.0 era. The method, that was supposed to set the TI to 
skiped (`dag_file_processor._process_task_instances`) was moved to scheduler 
job.
   
   The test should pass now, but I'm not sure if it belongs here or perhaps 
this is covered by some other tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to