This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c35786ca2 Fix cleaning zombie RESTARTING tasks (#33706)
5c35786ca2 is described below
commit 5c35786ca29aa53ec08232502fc8a16fb1ef847a
Author: Daniel Dyląg <[email protected]>
AuthorDate: Fri Aug 25 01:55:18 2023 +0200
Fix cleaning zombie RESTARTING tasks (#33706)
* Fix cleaning zombie RESTARTING tasks
* Fix test
* Improve naming
Renamed to adoptable_states because adoption is the preferable choice and
reset is the fallback.
Also "resettable" might be confused with RESTARTING itself.
---------
Co-authored-by: daniel.dylag <[email protected]>
---
airflow/jobs/scheduler_job_runner.py | 17 ++++++++---------
airflow/utils/state.py | 8 ++++++++
tests/jobs/test_scheduler_job.py | 24 ++++++++++++++++++++++++
3 files changed, 40 insertions(+), 9 deletions(-)
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 5bd9f816a3..33b7653342 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1579,11 +1579,11 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
@provide_session
def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) ->
int:
"""
- Reset any TaskInstance in QUEUED or SCHEDULED state if its
SchedulerJob is no longer running.
+ Adopt or reset any TaskInstance in resettable state if its
SchedulerJob is no longer running.
:return: the number of TIs reset
"""
- self.log.info("Resetting orphaned tasks for active dag runs")
+ self.log.info("Adopting or resetting orphaned tasks for active dag
runs")
timeout = conf.getint("scheduler", "scheduler_health_check_threshold")
for attempt in run_with_db_retries(logger=self.log):
@@ -1609,10 +1609,9 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
self.log.info("Marked %d SchedulerJob instances as
failed", num_failed)
Stats.incr(self.__class__.__name__.lower() + "_end",
num_failed)
- resettable_states = [TaskInstanceState.QUEUED,
TaskInstanceState.RUNNING]
query = (
select(TI)
- .where(TI.state.in_(resettable_states))
+ .where(TI.state.in_(State.adoptable_states))
# outerjoin is because we didn't use to have
queued_by_job
# set, so we need to pick up anything pre upgrade.
This (and the
# "or queued_by_job_id IS NONE") can go as soon as
scheduler HA is
@@ -1628,11 +1627,11 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
)
# Lock these rows, so that another scheduler can't try and
adopt these too
- tis_to_reset_or_adopt = with_row_locks(
+ tis_to_adopt_or_reset = with_row_locks(
query, of=TI, session=session,
**skip_locked(session=session)
)
- tis_to_reset_or_adopt =
session.scalars(tis_to_reset_or_adopt).all()
- to_reset =
self.job.executor.try_adopt_task_instances(tis_to_reset_or_adopt)
+ tis_to_adopt_or_reset =
session.scalars(tis_to_adopt_or_reset).all()
+ to_reset =
self.job.executor.try_adopt_task_instances(tis_to_adopt_or_reset)
reset_tis_message = []
for ti in to_reset:
@@ -1640,11 +1639,11 @@ class SchedulerJobRunner(BaseJobRunner[Job],
LoggingMixin):
ti.state = None
ti.queued_by_job_id = None
- for ti in set(tis_to_reset_or_adopt) - set(to_reset):
+ for ti in set(tis_to_adopt_or_reset) - set(to_reset):
ti.queued_by_job_id = self.job.id
Stats.incr("scheduler.orphaned_tasks.cleared",
len(to_reset))
- Stats.incr("scheduler.orphaned_tasks.adopted",
len(tis_to_reset_or_adopt) - len(to_reset))
+ Stats.incr("scheduler.orphaned_tasks.adopted",
len(tis_to_adopt_or_reset) - len(to_reset))
if to_reset:
task_instance_str = "\n\t".join(reset_tis_message)
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 22fb6e27c8..6da7dacc75 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -199,3 +199,11 @@ class State:
"""
A list of states indicating that a task has been terminated.
"""
+
+ adoptable_states = frozenset(
+ [TaskInstanceState.QUEUED, TaskInstanceState.RUNNING,
TaskInstanceState.RESTARTING]
+ )
+ """
+ A list of states indicating that a task can be adopted or reset by a
scheduler job
+ if it was queued by another scheduler job that is not running anymore.
+ """
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index cffea246e4..d1612a84f3 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3151,6 +3151,30 @@ class TestSchedulerJob:
session = settings.Session()
assert 0 ==
self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
+ @pytest.mark.parametrize(
+ "adoptable_state",
+ State.adoptable_states,
+ )
+ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state):
+ dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name
+ with dag_maker(dag_id=dag_id, schedule="@daily"):
+ task_id = dag_id + "_task"
+ EmptyOperator(task_id=task_id)
+
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(job=scheduler_job,
subdir=os.devnull)
+ session = settings.Session()
+
+ dr1 = dag_maker.create_dagrun(external_trigger=True)
+ ti = dr1.get_task_instances(session=session)[0]
+ ti.state = adoptable_state
+ session.merge(ti)
+ session.merge(dr1)
+ session.commit()
+
+ num_reset_tis =
self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
+ assert 1 == num_reset_tis
+
def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self,
dag_maker):
dag_id = "test_reset_orphaned_tasks_external_triggered_dag"
with dag_maker(dag_id=dag_id, schedule="@daily"):