This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 647b39d2e9b [Edge] Export ti.success and ti.finish metrics from edge 
worker (#47063)
647b39d2e9b is described below

commit 647b39d2e9b2a173369e8cb60c541717a4238236
Author: AutomationDev85 <[email protected]>
AuthorDate: Fri Feb 28 09:58:49 2025 +0100

    [Edge] Export ti.success and ti.finish metrics from edge worker (#47063)
    
    * edge worker exports ti.start and ti.finish metrics
    
    * Fixed wrong job fields during metric export
    
    * Extend metric exporting
    
    * Fix sql query
    
    * Fix finish metric export of orphaned jobs
    
    * Fixed double metric export
    
    * Added unit tests
    
    * increase edge package version
    
    * Reworked if logic
    
    ---------
    
    Co-authored-by: Marco Küttelwesch <[email protected]>
---
 providers/edge/README.rst                          |  6 +-
 providers/edge/docs/changelog.rst                  | 10 +++
 providers/edge/provider.yaml                       |  2 +-
 providers/edge/pyproject.toml                      |  6 +-
 .../edge/src/airflow/providers/edge/__init__.py    |  2 +-
 .../providers/edge/executors/edge_executor.py      | 15 ++++
 .../airflow/providers/edge/get_provider_info.py    |  2 +-
 .../providers/edge/worker_api/routes/jobs.py       | 32 ++++++++
 .../unit/edge/executors/test_edge_executor.py      | 14 +++-
 .../tests/unit/edge/worker_api/routes/test_jobs.py | 96 ++++++++++++++++++++++
 10 files changed, 175 insertions(+), 10 deletions(-)

diff --git a/providers/edge/README.rst b/providers/edge/README.rst
index 48031b6d2ff..6f727510133 100644
--- a/providers/edge/README.rst
+++ b/providers/edge/README.rst
@@ -23,7 +23,7 @@
 
 Package ``apache-airflow-providers-edge``
 
-Release: ``0.19.0pre0``
+Release: ``0.20.0pre0``
 
 
 Handle edge workers on remote sites via HTTP(s) connection and orchestrates 
work over distributed sites
@@ -36,7 +36,7 @@ This is a provider package for ``edge`` provider. All classes 
for this provider
 are in ``airflow.providers.edge`` python package.
 
 You can find package information and changelog for the provider
-in the `documentation 
<https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/>`_.
+in the `documentation 
<https://airflow.apache.org/docs/apache-airflow-providers-edge/0.20.0pre0/>`_.
 
 Installation
 ------------
@@ -59,4 +59,4 @@ PIP package         Version required
 ==================  ===================
 
 The changelog for the provider package can be found in the
-`changelog 
<https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/changelog.html>`_.
+`changelog 
<https://airflow.apache.org/docs/apache-airflow-providers-edge/0.20.0pre0/changelog.html>`_.
diff --git a/providers/edge/docs/changelog.rst 
b/providers/edge/docs/changelog.rst
index 5b8bef99440..89a6012a995 100644
--- a/providers/edge/docs/changelog.rst
+++ b/providers/edge/docs/changelog.rst
@@ -27,6 +27,15 @@
 Changelog
 ---------
 
+0.20.0pre0
+..........
+
+Misc
+~~~~
+
+* ``Edge worker now exports ti.start and ti.finish metrics.``
+
+
 0.19.0pre0
 ..........
 
@@ -36,6 +45,7 @@ Misc
 * ``Edge worker can be set to maintenance via CLI and also return to normal 
operation.``
 
 
+
 0.18.1pre0
 ..........
 
diff --git a/providers/edge/provider.yaml b/providers/edge/provider.yaml
index 562583a9a36..a9a6acb559a 100644
--- a/providers/edge/provider.yaml
+++ b/providers/edge/provider.yaml
@@ -25,7 +25,7 @@ source-date-epoch: 1737371680
 
 # note that those versions are maintained by release manager - do not update 
them manually
 versions:
-  - 0.19.0pre0
+  - 0.20.0pre0
 
 plugins:
   - name: edge_executor
diff --git a/providers/edge/pyproject.toml b/providers/edge/pyproject.toml
index ce404ea9543..c56b033de85 100644
--- a/providers/edge/pyproject.toml
+++ b/providers/edge/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
 
 [project]
 name = "apache-airflow-providers-edge"
-version = "0.19.0pre0"
+version = "0.20.0pre0"
 description = "Provider package apache-airflow-providers-edge for Apache 
Airflow"
 readme = "README.rst"
 authors = [
@@ -61,8 +61,8 @@ dependencies = [
 ]
 
 [project.urls]
-"Documentation" = 
"https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0";
-"Changelog" = 
"https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/changelog.html";
+"Documentation" = 
"https://airflow.apache.org/docs/apache-airflow-providers-edge/0.20.0pre0";
+"Changelog" = 
"https://airflow.apache.org/docs/apache-airflow-providers-edge/0.20.0pre0/changelog.html";
 "Bug Tracker" = "https://github.com/apache/airflow/issues";
 "Source Code" = "https://github.com/apache/airflow";
 "Slack Chat" = "https://s.apache.org/airflow-slack";
diff --git a/providers/edge/src/airflow/providers/edge/__init__.py 
b/providers/edge/src/airflow/providers/edge/__init__.py
index 5b8039cec8d..96dfbad6133 100644
--- a/providers/edge/src/airflow/providers/edge/__init__.py
+++ b/providers/edge/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
 
 __all__ = ["__version__"]
 
-__version__ = "0.19.0pre0"
+__version__ = "0.20.0pre0"
 
 if 
packaging.version.parse(packaging.version.parse(airflow_version).base_version) 
< packaging.version.parse(
     "2.10.0"
diff --git 
a/providers/edge/src/airflow/providers/edge/executors/edge_executor.py 
b/providers/edge/src/airflow/providers/edge/executors/edge_executor.py
index ecf604013c1..87828a05b3a 100644
--- a/providers/edge/src/airflow/providers/edge/executors/edge_executor.py
+++ b/providers/edge/src/airflow/providers/edge/executors/edge_executor.py
@@ -216,6 +216,21 @@ class EdgeExecutor(BaseExecutor):
             )
             job.state = ti.state if ti else TaskInstanceState.REMOVED
 
+            if job.state != TaskInstanceState.RUNNING:
+                # Edge worker does not backport emitted Airflow metrics, so 
export some metrics
+                # Export metrics as failed as these jobs will be deleted in 
the future
+                tags = {
+                    "dag_id": job.dag_id,
+                    "task_id": job.task_id,
+                    "queue": job.queue,
+                    "state": str(TaskInstanceState.FAILED),
+                }
+                Stats.incr(
+                    
f"edge_worker.ti.finish.{job.queue}.{TaskInstanceState.FAILED}.{job.dag_id}.{job.task_id}",
+                    tags=tags,
+                )
+                Stats.incr("edge_worker.ti.finish", tags=tags)
+
         return bool(lifeless_jobs)
 
     def _purge_jobs(self, session: Session) -> bool:
diff --git a/providers/edge/src/airflow/providers/edge/get_provider_info.py 
b/providers/edge/src/airflow/providers/edge/get_provider_info.py
index c646666d301..ba5a3f6b544 100644
--- a/providers/edge/src/airflow/providers/edge/get_provider_info.py
+++ b/providers/edge/src/airflow/providers/edge/get_provider_info.py
@@ -28,7 +28,7 @@ def get_provider_info():
         "description": "Handle edge workers on remote sites via HTTP(s) 
connection and orchestrates work over distributed sites\n",
         "state": "not-ready",
         "source-date-epoch": 1737371680,
-        "versions": ["0.19.0pre0"],
+        "versions": ["0.20.0pre0"],
         "plugins": [
             {
                 "name": "edge_executor",
diff --git 
a/providers/edge/src/airflow/providers/edge/worker_api/routes/jobs.py 
b/providers/edge/src/airflow/providers/edge/worker_api/routes/jobs.py
index c976d097965..6380439093e 100644
--- a/providers/edge/src/airflow/providers/edge/worker_api/routes/jobs.py
+++ b/providers/edge/src/airflow/providers/edge/worker_api/routes/jobs.py
@@ -37,6 +37,7 @@ from airflow.providers.edge.worker_api.routes._v2_compat 
import (
     parse_command,
     status,
 )
+from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.sqlalchemy import with_row_locks
 from airflow.utils.state import TaskInstanceState
@@ -85,6 +86,10 @@ def fetch(
     job.edge_worker = worker_name
     job.last_update = timezone.utcnow()
     session.commit()
+    # Edge worker does not backport emitted Airflow metrics, so export some 
metrics
+    tags = {"dag_id": job.dag_id, "task_id": job.task_id, "queue": job.queue}
+    Stats.incr(f"edge_worker.ti.start.{job.queue}.{job.dag_id}.{job.task_id}", 
tags=tags)
+    Stats.incr("edge_worker.ti.start", tags=tags)
     return EdgeJobFetched(
         dag_id=job.dag_id,
         task_id=job.task_id,
@@ -116,6 +121,33 @@ def state(
     session: SessionDep,
 ) -> None:
     """Update the state of a job running on the edge worker."""
+    # execute query to catch the queue and check if state toggles to success 
or failed
+    # otherwise possible that Executor resets orphaned jobs and stats are 
exported 2 times
+    if state in [TaskInstanceState.SUCCESS, TaskInstanceState.FAILED]:
+        query = select(EdgeJobModel).where(
+            EdgeJobModel.dag_id == dag_id,
+            EdgeJobModel.task_id == task_id,
+            EdgeJobModel.run_id == run_id,
+            EdgeJobModel.map_index == map_index,
+            EdgeJobModel.try_number == try_number,
+            EdgeJobModel.state == TaskInstanceState.RUNNING,
+        )
+        job = session.scalar(query)
+
+        if job:
+            # Edge worker does not backport emitted Airflow metrics, so export 
some metrics
+            tags = {
+                "dag_id": job.dag_id,
+                "task_id": job.task_id,
+                "queue": job.queue,
+                "state": str(state),
+            }
+            Stats.incr(
+                
f"edge_worker.ti.finish.{job.queue}.{state}.{job.dag_id}.{job.task_id}",
+                tags=tags,
+            )
+            Stats.incr("edge_worker.ti.finish", tags=tags)
+
     query = (
         update(EdgeJobModel)
         .where(
diff --git a/providers/edge/tests/unit/edge/executors/test_edge_executor.py 
b/providers/edge/tests/unit/edge/executors/test_edge_executor.py
index 5d9ca6fd0e7..3b6c43726ec 100644
--- a/providers/edge/tests/unit/edge/executors/test_edge_executor.py
+++ b/providers/edge/tests/unit/edge/executors/test_edge_executor.py
@@ -83,7 +83,8 @@ class TestEdgeExecutor:
         assert jobs[0].task_id == "test_task"
         assert jobs[0].concurrency_slots == expected_concurrency
 
-    def test_sync_orphaned_tasks(self):
+    @patch("airflow.stats.Stats.incr")
+    def test_sync_orphaned_tasks(self, mock_stats_incr):
         executor = EdgeExecutor()
 
         delta_to_purge = timedelta(minutes=conf.getint("edge", 
"job_fail_purge") + 1)
@@ -116,6 +117,17 @@ class TestEdgeExecutor:
 
         executor.sync()
 
+        mock_stats_incr.assert_called_with(
+            "edge_worker.ti.finish",
+            tags={
+                "dag_id": "test_dag",
+                "queue": "default",
+                "state": "failed",
+                "task_id": "started_running_orphaned",
+            },
+        )
+        assert mock_stats_incr.call_count == 2
+
         with create_session() as session:
             jobs = session.query(EdgeJobModel).all()
             assert len(jobs) == 1
diff --git a/providers/edge/tests/unit/edge/worker_api/routes/test_jobs.py 
b/providers/edge/tests/unit/edge/worker_api/routes/test_jobs.py
new file mode 100644
index 00000000000..e2bb1d00bac
--- /dev/null
+++ b/providers/edge/tests/unit/edge/worker_api/routes/test_jobs.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.routes.jobs import state
+from airflow.utils.session import create_session
+from airflow.utils.state import TaskInstanceState
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+pytestmark = pytest.mark.db_test
+
+
+DAG_ID = "my_dag"
+TASK_ID = "my_task"
+RUN_ID = "manual__2024-11-24T21:03:01+01:00"
+QUEUE = "test"
+
+
+class TestJobsApiRoutes:
+    @pytest.fixture(autouse=True)
+    def setup_test_cases(self, dag_maker, session: Session):
+        session.query(EdgeJobModel).delete()
+        session.commit()
+
+    @patch("airflow.stats.Stats.incr")
+    def test_state(self, mock_stats_incr, session: Session):
+        with create_session() as session:
+            job = EdgeJobModel(
+                dag_id=DAG_ID,
+                task_id=TASK_ID,
+                run_id=RUN_ID,
+                try_number=1,
+                map_index=-1,
+                state=TaskInstanceState.RUNNING,
+                queue=QUEUE,
+                concurrency_slots=1,
+                command="execute",
+            )
+            session.add(job)
+            session.commit()
+            state(
+                dag_id=DAG_ID,
+                task_id=TASK_ID,
+                run_id=RUN_ID,
+                try_number=1,
+                map_index=-1,
+                state=TaskInstanceState.RUNNING,
+                session=session,
+            )
+
+            mock_stats_incr.assert_not_called()
+
+            state(
+                dag_id=DAG_ID,
+                task_id=TASK_ID,
+                run_id=RUN_ID,
+                try_number=1,
+                map_index=-1,
+                state=TaskInstanceState.SUCCESS,
+                session=session,
+            )
+
+            mock_stats_incr.assert_called_with(
+                "edge_worker.ti.finish",
+                tags={
+                    "dag_id": DAG_ID,
+                    "queue": QUEUE,
+                    "state": TaskInstanceState.SUCCESS,
+                    "task_id": TASK_ID,
+                },
+            )
+            assert mock_stats_incr.call_count == 2
+
+            assert session.query(EdgeJobModel).scalar().state == 
TaskInstanceState.SUCCESS

Reply via email to