This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d79c6c21f2d [edge] Clean up of dead tasks in edge_jobs table (#44280)
d79c6c21f2d is described below
commit d79c6c21f2d571bae236419bad87bc48bf9c97ce
Author: AutomationDev85 <[email protected]>
AuthorDate: Fri Nov 22 21:02:47 2024 +0100
[edge] Clean up of dead tasks in edge_jobs table (#44280)
* Add edge_job clean up
* Reworked unit test
* Reworked unit test
---------
Co-authored-by: Marco Küttelwesch <[email protected]>
---
providers/src/airflow/providers/edge/CHANGELOG.rst | 8 +++
providers/src/airflow/providers/edge/__init__.py | 2 +-
.../providers/edge/executors/edge_executor.py | 41 +++++++++++++--
providers/src/airflow/providers/edge/provider.yaml | 2 +-
.../tests/edge/executors/test_edge_executor.py | 59 +++++++++++++++++++---
5 files changed, 99 insertions(+), 13 deletions(-)
diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst
b/providers/src/airflow/providers/edge/CHANGELOG.rst
index 48c7a76b5f0..d24373463f8 100644
--- a/providers/src/airflow/providers/edge/CHANGELOG.rst
+++ b/providers/src/airflow/providers/edge/CHANGELOG.rst
@@ -27,6 +27,14 @@
Changelog
---------
+0.6.1pre0
+.........
+
+Misc
+~~~~
+
+* ``Update jobs or edge workers who have been killed to clean up job table.``
+
0.6.0pre0
.........
diff --git a/providers/src/airflow/providers/edge/__init__.py
b/providers/src/airflow/providers/edge/__init__.py
index 1613f44510a..b3f545f8244 100644
--- a/providers/src/airflow/providers/edge/__init__.py
+++ b/providers/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "0.6.0pre0"
+__version__ = "0.6.1pre0"
if
packaging.version.parse(packaging.version.parse(airflow_version).base_version)
< packaging.version.parse(
"2.10.0"
diff --git a/providers/src/airflow/providers/edge/executors/edge_executor.py
b/providers/src/airflow/providers/edge/executors/edge_executor.py
index a13552fbf8a..b990a331157 100644
--- a/providers/src/airflow/providers/edge/executors/edge_executor.py
+++ b/providers/src/airflow/providers/edge/executors/edge_executor.py
@@ -26,7 +26,7 @@ from airflow.cli.cli_config import GroupCommand
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.models.abstractoperator import DEFAULT_QUEUE
-from airflow.models.taskinstance import TaskInstanceState
+from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.edge.cli.edge_command import EDGE_COMMANDS
from airflow.providers.edge.models.edge_job import EdgeJobModel
from airflow.providers.edge.models.edge_logs import EdgeLogsModel
@@ -42,7 +42,6 @@ if TYPE_CHECKING:
from sqlalchemy.orm import Session
from airflow.executors.base_executor import CommandType
- from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
PARALLELISM: int = conf.getint("core", "PARALLELISM")
@@ -108,6 +107,30 @@ class EdgeExecutor(BaseExecutor):
return changed
+ def _update_orphaned_jobs(self, session: Session) -> bool:
+ """Update status of jobs when workers die and don't update anymore."""
+ heartbeat_interval: int = conf.getint("scheduler",
"scheduler_zombie_task_threshold")
+ lifeless_jobs: list[EdgeJobModel] = (
+ session.query(EdgeJobModel)
+ .filter(
+ EdgeJobModel.state == TaskInstanceState.RUNNING,
+ EdgeJobModel.last_update < (timezone.utcnow() -
timedelta(seconds=heartbeat_interval)),
+ )
+ .all()
+ )
+
+ for job in lifeless_jobs:
+ ti = TaskInstance.get_task_instance(
+ dag_id=job.dag_id,
+ run_id=job.run_id,
+ task_id=job.task_id,
+ map_index=job.map_index,
+ session=session,
+ )
+ job.state = ti.state if ti else TaskInstanceState.REMOVED
+
+ return bool(lifeless_jobs)
+
def _purge_jobs(self, session: Session) -> bool:
"""Clean finished jobs."""
purged_marker = False
@@ -117,7 +140,12 @@ class EdgeExecutor(BaseExecutor):
session.query(EdgeJobModel)
.filter(
EdgeJobModel.state.in_(
- [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED]
+ [
+ TaskInstanceState.RUNNING,
+ TaskInstanceState.SUCCESS,
+ TaskInstanceState.FAILED,
+ TaskInstanceState.REMOVED,
+ ]
)
)
.all()
@@ -145,7 +173,7 @@ class EdgeExecutor(BaseExecutor):
job.state == TaskInstanceState.SUCCESS
and job.last_update_t < (datetime.now() -
timedelta(minutes=job_success_purge)).timestamp()
) or (
- job.state == TaskInstanceState.FAILED
+ job.state in (TaskInstanceState.FAILED,
TaskInstanceState.REMOVED)
and job.last_update_t < (datetime.now() -
timedelta(minutes=job_fail_purge)).timestamp()
):
if job.key in self.last_reported_state:
@@ -168,7 +196,10 @@ class EdgeExecutor(BaseExecutor):
def sync(self, session: Session = NEW_SESSION) -> None:
"""Sync will get called periodically by the heartbeat method."""
with Stats.timer("edge_executor.sync.duration"):
- if self._purge_jobs(session) or
self._check_worker_liveness(session):
+ orphaned = self._update_orphaned_jobs(session)
+ purged = self._purge_jobs(session)
+ liveness = self._check_worker_liveness(session)
+ if purged or liveness or orphaned:
session.commit()
def end(self) -> None:
diff --git a/providers/src/airflow/providers/edge/provider.yaml
b/providers/src/airflow/providers/edge/provider.yaml
index 96ce7f152f7..6fe609502aa 100644
--- a/providers/src/airflow/providers/edge/provider.yaml
+++ b/providers/src/airflow/providers/edge/provider.yaml
@@ -27,7 +27,7 @@ source-date-epoch: 1729683247
# note that those versions are maintained by release manager - do not update
them manually
versions:
- - 0.6.0pre0
+ - 0.6.1pre0
dependencies:
- apache-airflow>=2.10.0
diff --git a/providers/tests/edge/executors/test_edge_executor.py
b/providers/tests/edge/executors/test_edge_executor.py
index 7970e5fad04..126afa1fb70 100644
--- a/providers/tests/edge/executors/test_edge_executor.py
+++ b/providers/tests/edge/executors/test_edge_executor.py
@@ -16,11 +16,12 @@
# under the License.
from __future__ import annotations
-from datetime import datetime
+from datetime import datetime, timedelta
from unittest.mock import patch
import pytest
+from airflow.configuration import conf
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.edge.executors.edge_executor import EdgeExecutor
from airflow.providers.edge.models.edge_job import EdgeJobModel
@@ -65,6 +66,44 @@ class TestEdgeExecutor:
assert jobs[0].run_id == "test_run"
assert jobs[0].task_id == "test_task"
+ def test_sync_orphaned_tasks(self):
+ executor = EdgeExecutor()
+
+ delta_to_purge = timedelta(minutes=conf.getint("edge",
"job_fail_purge") + 1)
+ delta_to_orphaned = timedelta(seconds=conf.getint("scheduler",
"scheduler_zombie_task_threshold") + 1)
+
+ with create_session() as session:
+ for task_id, state, last_update in [
+ (
+ "started_running_orphaned",
+ TaskInstanceState.RUNNING,
+ timezone.utcnow() - delta_to_orphaned,
+ ),
+ ("started_removed", TaskInstanceState.REMOVED,
timezone.utcnow() - delta_to_purge),
+ ]:
+ session.add(
+ EdgeJobModel(
+ dag_id="test_dag",
+ task_id=task_id,
+ run_id="test_run",
+ map_index=-1,
+ try_number=1,
+ state=state,
+ queue="default",
+ command="dummy",
+ last_update=last_update,
+ )
+ )
+ session.commit()
+
+ executor.sync()
+
+ with create_session() as session:
+ jobs = session.query(EdgeJobModel).all()
+ assert len(jobs) == 1
+ assert jobs[0].task_id == "started_running_orphaned"
+
@patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.running_state")
@patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.success")
@patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.fail")
@@ -77,12 +116,14 @@ class TestEdgeExecutor:
mock_success.side_effect = remove_from_running
mock_fail.side_effect = remove_from_running
+ delta_to_purge = timedelta(minutes=conf.getint("edge",
"job_fail_purge") + 1)
+
# Prepare some data
with create_session() as session:
- for task_id, state in [
- ("started_running", TaskInstanceState.RUNNING),
- ("started_success", TaskInstanceState.SUCCESS),
- ("started_failed", TaskInstanceState.FAILED),
+ for task_id, state, last_update in [
+ ("started_running", TaskInstanceState.RUNNING,
timezone.utcnow()),
+ ("started_success", TaskInstanceState.SUCCESS,
timezone.utcnow() - delta_to_purge),
+ ("started_failed", TaskInstanceState.FAILED, timezone.utcnow()
- delta_to_purge),
]:
session.add(
EdgeJobModel(
@@ -94,7 +135,7 @@ class TestEdgeExecutor:
state=state,
queue="default",
command="dummy",
- last_update=timezone.utcnow(),
+ last_update=last_update,
)
)
key = TaskInstanceKey(
@@ -106,6 +147,12 @@ class TestEdgeExecutor:
executor.sync()
+ with create_session() as session:
+ jobs = session.query(EdgeJobModel).all()
+ assert len(session.query(EdgeJobModel).all()) == 1
+ assert jobs[0].task_id == "started_running"
+ assert jobs[0].state == TaskInstanceState.RUNNING
+
assert len(executor.running) == 1
mock_running_state.assert_called_once()
mock_success.assert_called_once()