This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d79c6c21f2d [edge] Clean up of dead tasks in edge_jobs table  (#44280)
d79c6c21f2d is described below

commit d79c6c21f2d571bae236419bad87bc48bf9c97ce
Author: AutomationDev85 <[email protected]>
AuthorDate: Fri Nov 22 21:02:47 2024 +0100

    [edge] Clean up of dead tasks in edge_jobs table  (#44280)
    
    * Add edge_job clean up
    
    * Reworked unit test
    
    * Reworked unit test
    
    ---------
    
    Co-authored-by: Marco Küttelwesch <[email protected]>
---
 providers/src/airflow/providers/edge/CHANGELOG.rst |  8 +++
 providers/src/airflow/providers/edge/__init__.py   |  2 +-
 .../providers/edge/executors/edge_executor.py      | 41 +++++++++++++--
 providers/src/airflow/providers/edge/provider.yaml |  2 +-
 .../tests/edge/executors/test_edge_executor.py     | 59 +++++++++++++++++++---
 5 files changed, 99 insertions(+), 13 deletions(-)

diff --git a/providers/src/airflow/providers/edge/CHANGELOG.rst 
b/providers/src/airflow/providers/edge/CHANGELOG.rst
index 48c7a76b5f0..d24373463f8 100644
--- a/providers/src/airflow/providers/edge/CHANGELOG.rst
+++ b/providers/src/airflow/providers/edge/CHANGELOG.rst
@@ -27,6 +27,14 @@
 Changelog
 ---------
 
+0.6.1pre0
+.........
+
+Misc
+~~~~
+
+* ``Update jobs or edge workers who have been killed to clean up job table.``
+
 0.6.0pre0
 .........
 
diff --git a/providers/src/airflow/providers/edge/__init__.py 
b/providers/src/airflow/providers/edge/__init__.py
index 1613f44510a..b3f545f8244 100644
--- a/providers/src/airflow/providers/edge/__init__.py
+++ b/providers/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
 
 __all__ = ["__version__"]
 
-__version__ = "0.6.0pre0"
+__version__ = "0.6.1pre0"
 
 if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
     "2.10.0"
diff --git a/providers/src/airflow/providers/edge/executors/edge_executor.py 
b/providers/src/airflow/providers/edge/executors/edge_executor.py
index a13552fbf8a..b990a331157 100644
--- a/providers/src/airflow/providers/edge/executors/edge_executor.py
+++ b/providers/src/airflow/providers/edge/executors/edge_executor.py
@@ -26,7 +26,7 @@ from airflow.cli.cli_config import GroupCommand
 from airflow.configuration import conf
 from airflow.executors.base_executor import BaseExecutor
 from airflow.models.abstractoperator import DEFAULT_QUEUE
-from airflow.models.taskinstance import TaskInstanceState
+from airflow.models.taskinstance import TaskInstance, TaskInstanceState
 from airflow.providers.edge.cli.edge_command import EDGE_COMMANDS
 from airflow.providers.edge.models.edge_job import EdgeJobModel
 from airflow.providers.edge.models.edge_logs import EdgeLogsModel
@@ -42,7 +42,6 @@ if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
     from airflow.executors.base_executor import CommandType
-    from airflow.models.taskinstance import TaskInstance
     from airflow.models.taskinstancekey import TaskInstanceKey
 
 PARALLELISM: int = conf.getint("core", "PARALLELISM")
@@ -108,6 +107,30 @@ class EdgeExecutor(BaseExecutor):
 
         return changed
 
+    def _update_orphaned_jobs(self, session: Session) -> bool:
+        """Update status of jobs when workers die and don't update anymore."""
+        heartbeat_interval: int = conf.getint("scheduler", 
"scheduler_zombie_task_threshold")
+        lifeless_jobs: list[EdgeJobModel] = (
+            session.query(EdgeJobModel)
+            .filter(
+                EdgeJobModel.state == TaskInstanceState.RUNNING,
+                EdgeJobModel.last_update < (timezone.utcnow() - 
timedelta(seconds=heartbeat_interval)),
+            )
+            .all()
+        )
+
+        for job in lifeless_jobs:
+            ti = TaskInstance.get_task_instance(
+                dag_id=job.dag_id,
+                run_id=job.run_id,
+                task_id=job.task_id,
+                map_index=job.map_index,
+                session=session,
+            )
+            job.state = ti.state if ti else TaskInstanceState.REMOVED
+
+        return bool(lifeless_jobs)
+
     def _purge_jobs(self, session: Session) -> bool:
         """Clean finished jobs."""
         purged_marker = False
@@ -117,7 +140,12 @@ class EdgeExecutor(BaseExecutor):
             session.query(EdgeJobModel)
             .filter(
                 EdgeJobModel.state.in_(
-                    [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS, 
TaskInstanceState.FAILED]
+                    [
+                        TaskInstanceState.RUNNING,
+                        TaskInstanceState.SUCCESS,
+                        TaskInstanceState.FAILED,
+                        TaskInstanceState.REMOVED,
+                    ]
                 )
             )
             .all()
@@ -145,7 +173,7 @@ class EdgeExecutor(BaseExecutor):
                 job.state == TaskInstanceState.SUCCESS
                 and job.last_update_t < (datetime.now() - 
timedelta(minutes=job_success_purge)).timestamp()
             ) or (
-                job.state == TaskInstanceState.FAILED
+                job.state in (TaskInstanceState.FAILED, 
TaskInstanceState.REMOVED)
                 and job.last_update_t < (datetime.now() - 
timedelta(minutes=job_fail_purge)).timestamp()
             ):
                 if job.key in self.last_reported_state:
@@ -168,7 +196,10 @@ class EdgeExecutor(BaseExecutor):
     def sync(self, session: Session = NEW_SESSION) -> None:
         """Sync will get called periodically by the heartbeat method."""
         with Stats.timer("edge_executor.sync.duration"):
-            if self._purge_jobs(session) or 
self._check_worker_liveness(session):
+            orphaned = self._update_orphaned_jobs(session)
+            purged = self._purge_jobs(session)
+            liveness = self._check_worker_liveness(session)
+            if purged or liveness or orphaned:
                 session.commit()
 
     def end(self) -> None:
diff --git a/providers/src/airflow/providers/edge/provider.yaml 
b/providers/src/airflow/providers/edge/provider.yaml
index 96ce7f152f7..6fe609502aa 100644
--- a/providers/src/airflow/providers/edge/provider.yaml
+++ b/providers/src/airflow/providers/edge/provider.yaml
@@ -27,7 +27,7 @@ source-date-epoch: 1729683247
 
 # note that those versions are maintained by release manager - do not update 
them manually
 versions:
-  - 0.6.0pre0
+  - 0.6.1pre0
 
 dependencies:
   - apache-airflow>=2.10.0
diff --git a/providers/tests/edge/executors/test_edge_executor.py 
b/providers/tests/edge/executors/test_edge_executor.py
index 7970e5fad04..126afa1fb70 100644
--- a/providers/tests/edge/executors/test_edge_executor.py
+++ b/providers/tests/edge/executors/test_edge_executor.py
@@ -16,11 +16,12 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime
+from datetime import datetime, timedelta
 from unittest.mock import patch
 
 import pytest
 
+from airflow.configuration import conf
 from airflow.models.taskinstancekey import TaskInstanceKey
 from airflow.providers.edge.executors.edge_executor import EdgeExecutor
 from airflow.providers.edge.models.edge_job import EdgeJobModel
@@ -65,6 +66,44 @@ class TestEdgeExecutor:
         assert jobs[0].run_id == "test_run"
         assert jobs[0].task_id == "test_task"
 
+    def test_sync_orphaned_tasks(self):
+        executor = EdgeExecutor()
+
+        delta_to_purge = timedelta(minutes=conf.getint("edge", 
"job_fail_purge") + 1)
+        delta_to_orphaned = timedelta(seconds=conf.getint("scheduler", 
"scheduler_zombie_task_threshold") + 1)
+
+        with create_session() as session:
+            for task_id, state, last_update in [
+                (
+                    "started_running_orphaned",
+                    TaskInstanceState.RUNNING,
+                    timezone.utcnow() - delta_to_orphaned,
+                ),
+                ("started_removed", TaskInstanceState.REMOVED, 
timezone.utcnow() - delta_to_purge),
+            ]:
+                session.add(
+                    EdgeJobModel(
+                        dag_id="test_dag",
+                        task_id=task_id,
+                        run_id="test_run",
+                        map_index=-1,
+                        try_number=1,
+                        state=state,
+                        queue="default",
+                        command="dummy",
+                        last_update=last_update,
+                    )
+                )
+                session.commit()
+
+        executor.sync()
+
+        with create_session() as session:
+            jobs = session.query(EdgeJobModel).all()
+            assert len(jobs) == 1
+            assert jobs[0].task_id == "started_running_orphaned"
+            assert jobs[0].run_id == "test_run"
+
     
@patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.running_state")
     
@patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.success")
     @patch("airflow.providers.edge.executors.edge_executor.EdgeExecutor.fail")
@@ -77,12 +116,14 @@ class TestEdgeExecutor:
         mock_success.side_effect = remove_from_running
         mock_fail.side_effect = remove_from_running
 
+        delta_to_purge = timedelta(minutes=conf.getint("edge", 
"job_fail_purge") + 1)
+
         # Prepare some data
         with create_session() as session:
-            for task_id, state in [
-                ("started_running", TaskInstanceState.RUNNING),
-                ("started_success", TaskInstanceState.SUCCESS),
-                ("started_failed", TaskInstanceState.FAILED),
+            for task_id, state, last_update in [
+                ("started_running", TaskInstanceState.RUNNING, 
timezone.utcnow()),
+                ("started_success", TaskInstanceState.SUCCESS, 
timezone.utcnow() - delta_to_purge),
+                ("started_failed", TaskInstanceState.FAILED, timezone.utcnow() 
- delta_to_purge),
             ]:
                 session.add(
                     EdgeJobModel(
@@ -94,7 +135,7 @@ class TestEdgeExecutor:
                         state=state,
                         queue="default",
                         command="dummy",
-                        last_update=timezone.utcnow(),
+                        last_update=last_update,
                     )
                 )
                 key = TaskInstanceKey(
@@ -106,6 +147,12 @@ class TestEdgeExecutor:
 
         executor.sync()
 
+        with create_session() as session:
+            jobs = session.query(EdgeJobModel).all()
+            assert len(jobs) == 1
+            assert jobs[0].task_id == "started_running"
+            assert jobs[0].state == TaskInstanceState.RUNNING
+
         assert len(executor.running) == 1
         mock_running_state.assert_called_once()
         mock_success.assert_called_once()

Reply via email to