This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9f9ff10 Remove unused internal function left over from Scheduler HA
work (#16269)
9f9ff10 is described below
commit 9f9ff106584a40433b36a7459c7658d04571a6bd
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Fri Jun 4 17:13:09 2021 +0100
Remove unused internal function left over from Scheduler HA work (#16269)
As of AIP-15 this function is no longer called; it should have been
deleted as part of that work.
---
airflow/jobs/scheduler_job.py | 40 ------------------------------------
tests/jobs/test_scheduler_job.py | 44 ----------------------------------------
2 files changed, 84 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index dc3f144..0a5c7cc 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1164,46 +1164,6 @@ class SchedulerJob(BaseJob): # pylint:
disable=too-many-instance-attributes
return len(queued_tis)
@provide_session
- def _change_state_for_tasks_failed_to_execute(self, session: Session =
None):
- """
- If there are tasks left over in the executor,
- we set them back to SCHEDULED to avoid creating hanging tasks.
-
- :param session: session for ORM operations
- """
- if not self.executor.queued_tasks:
- return
-
- filter_for_ti_state_change = [
- and_(
- TI.dag_id == dag_id,
- TI.task_id == task_id,
- TI.execution_date == execution_date,
- # The TI.try_number will return raw try_number+1 since the
- # ti is not running. And we need to -1 to match the DB record.
- TI._try_number == try_number - 1, # pylint:
disable=protected-access
- TI.state == State.QUEUED,
- )
- for dag_id, task_id, execution_date, try_number in
self.executor.queued_tasks.keys()
- ]
- ti_query = session.query(TI).filter(or_(*filter_for_ti_state_change))
- tis_to_set_to_scheduled: List[TI] = with_row_locks(ti_query,
session=session).all()
- if not tis_to_set_to_scheduled:
- return
-
- # set TIs to queued state
- filter_for_tis = TI.filter_for_tis(tis_to_set_to_scheduled)
- session.query(TI).filter(filter_for_tis).update(
- {TI.state: State.SCHEDULED, TI.queued_dttm: None},
synchronize_session=False
- )
-
- for task_instance in tis_to_set_to_scheduled:
- self.executor.queued_tasks.pop(task_instance.key)
-
- task_instance_str = "\n\t".join(repr(x) for x in
tis_to_set_to_scheduled)
- self.log.info("Set the following tasks to scheduled state:\n\t%s",
task_instance_str)
-
- @provide_session
def _process_executor_events(self, session: Session = None) -> int:
"""Respond to executor events."""
if not self.processor_agent:
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index adb3855..3c8c068 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1858,50 +1858,6 @@ class TestSchedulerJob(unittest.TestCase):
ti2.refresh_from_db(session=session)
assert ti2.state == State.SCHEDULED
- def test_change_state_for_tasks_failed_to_execute(self):
- dag = DAG(dag_id='dag_id', start_date=DEFAULT_DATE)
-
- task = DummyOperator(task_id='task_id', dag=dag, owner='airflow')
- dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
-
- # If there's no left over task in executor.queued_tasks, nothing
happens
- session = settings.Session()
- self.scheduler_job = SchedulerJob(subdir=os.devnull)
- mock_logger = mock.MagicMock()
- test_executor = MockExecutor(do_update=False)
- self.scheduler_job.executor = test_executor
- self.scheduler_job._logger = mock_logger
- self.scheduler_job._change_state_for_tasks_failed_to_execute()
- mock_logger.info.assert_not_called()
-
- # Tasks failed to execute with QUEUED state will be set to SCHEDULED
state.
- session.query(TaskInstance).delete()
- session.commit()
- key = 'dag_id', 'task_id', DEFAULT_DATE, 1
- test_executor.queued_tasks[key] = 'value'
- ti = TaskInstance(task, DEFAULT_DATE)
- ti.state = State.QUEUED
- session.merge(ti) # pylint: disable=no-value-for-parameter
- session.commit()
-
- self.scheduler_job._change_state_for_tasks_failed_to_execute()
-
- ti.refresh_from_db()
- assert State.SCHEDULED == ti.state
-
- # Tasks failed to execute with RUNNING state will not be set to
SCHEDULED state.
- session.query(TaskInstance).delete()
- session.commit()
- ti.state = State.RUNNING
-
- session.merge(ti)
- session.commit()
-
- self.scheduler_job._change_state_for_tasks_failed_to_execute()
-
- ti.refresh_from_db()
- assert State.RUNNING == ti.state
-
def test_adopt_or_reset_orphaned_tasks(self):
session = settings.Session()
dag = DAG(