This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f9ff10  Remove unused internal function left over from Scheduler HA 
work (#16269)
9f9ff10 is described below

commit 9f9ff106584a40433b36a7459c7658d04571a6bd
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Jun 4 17:13:09 2021 +0100

    Remove unused internal function left over from Scheduler HA work (#16269)
    
    As of AIP-15 this function is not called anymore and should have been
    deleted then.
---
 airflow/jobs/scheduler_job.py    | 40 ------------------------------------
 tests/jobs/test_scheduler_job.py | 44 ----------------------------------------
 2 files changed, 84 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index dc3f144..0a5c7cc 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1164,46 +1164,6 @@ class SchedulerJob(BaseJob):  # pylint: 
disable=too-many-instance-attributes
         return len(queued_tis)
 
     @provide_session
-    def _change_state_for_tasks_failed_to_execute(self, session: Session = 
None):
-        """
-        If there are tasks left over in the executor,
-        we set them back to SCHEDULED to avoid creating hanging tasks.
-
-        :param session: session for ORM operations
-        """
-        if not self.executor.queued_tasks:
-            return
-
-        filter_for_ti_state_change = [
-            and_(
-                TI.dag_id == dag_id,
-                TI.task_id == task_id,
-                TI.execution_date == execution_date,
-                # The TI.try_number will return raw try_number+1 since the
-                # ti is not running. And we need to -1 to match the DB record.
-                TI._try_number == try_number - 1,  # pylint: 
disable=protected-access
-                TI.state == State.QUEUED,
-            )
-            for dag_id, task_id, execution_date, try_number in 
self.executor.queued_tasks.keys()
-        ]
-        ti_query = session.query(TI).filter(or_(*filter_for_ti_state_change))
-        tis_to_set_to_scheduled: List[TI] = with_row_locks(ti_query, 
session=session).all()
-        if not tis_to_set_to_scheduled:
-            return
-
-        # set TIs to queued state
-        filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
-        session.query(TI).filter(filter_for_tis).update(
-            {TI.state: State.SCHEDULED, TI.queued_dttm: None}, 
synchronize_session=False
-        )
-
-        for task_instance in tis_to_set_to_scheduled:
-            self.executor.queued_tasks.pop(task_instance.key)
-
-        task_instance_str = "\n\t".join(repr(x) for x in 
tis_to_set_to_scheduled)
-        self.log.info("Set the following tasks to scheduled state:\n\t%s", 
task_instance_str)
-
-    @provide_session
     def _process_executor_events(self, session: Session = None) -> int:
         """Respond to executor events."""
         if not self.processor_agent:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index adb3855..3c8c068 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1858,50 +1858,6 @@ class TestSchedulerJob(unittest.TestCase):
         ti2.refresh_from_db(session=session)
         assert ti2.state == State.SCHEDULED
 
-    def test_change_state_for_tasks_failed_to_execute(self):
-        dag = DAG(dag_id='dag_id', start_date=DEFAULT_DATE)
-
-        task = DummyOperator(task_id='task_id', dag=dag, owner='airflow')
-        dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
-        # If there's no left over task in executor.queued_tasks, nothing 
happens
-        session = settings.Session()
-        self.scheduler_job = SchedulerJob(subdir=os.devnull)
-        mock_logger = mock.MagicMock()
-        test_executor = MockExecutor(do_update=False)
-        self.scheduler_job.executor = test_executor
-        self.scheduler_job._logger = mock_logger
-        self.scheduler_job._change_state_for_tasks_failed_to_execute()
-        mock_logger.info.assert_not_called()
-
-        # Tasks failed to execute with QUEUED state will be set to SCHEDULED 
state.
-        session.query(TaskInstance).delete()
-        session.commit()
-        key = 'dag_id', 'task_id', DEFAULT_DATE, 1
-        test_executor.queued_tasks[key] = 'value'
-        ti = TaskInstance(task, DEFAULT_DATE)
-        ti.state = State.QUEUED
-        session.merge(ti)  # pylint: disable=no-value-for-parameter
-        session.commit()
-
-        self.scheduler_job._change_state_for_tasks_failed_to_execute()
-
-        ti.refresh_from_db()
-        assert State.SCHEDULED == ti.state
-
-        # Tasks failed to execute with RUNNING state will not be set to 
SCHEDULED state.
-        session.query(TaskInstance).delete()
-        session.commit()
-        ti.state = State.RUNNING
-
-        session.merge(ti)
-        session.commit()
-
-        self.scheduler_job._change_state_for_tasks_failed_to_execute()
-
-        ti.refresh_from_db()
-        assert State.RUNNING == ti.state
-
     def test_adopt_or_reset_orphaned_tasks(self):
         session = settings.Session()
         dag = DAG(

Reply via email to