This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 647b39d2e9b [Edge] Export ti.success and ti.finish metrics from edge
worker (#47063)
647b39d2e9b is described below
commit 647b39d2e9b2a173369e8cb60c541717a4238236
Author: AutomationDev85 <[email protected]>
AuthorDate: Fri Feb 28 09:58:49 2025 +0100
[Edge] Export ti.success and ti.finish metrics from edge worker (#47063)
* edge worker exports ti.start and ti.finish metrics
* Fixed wrong job fields during metric export
* Extend metric exporting
* Fix sql query
* Fix finish metric export of orphaned jobs
* Fixed double metric export
* Added unit tests
* increase edge package version
* Reworked if logic
---------
Co-authored-by: Marco Küttelwesch <[email protected]>
---
providers/edge/README.rst | 6 +-
providers/edge/docs/changelog.rst | 10 +++
providers/edge/provider.yaml | 2 +-
providers/edge/pyproject.toml | 6 +-
.../edge/src/airflow/providers/edge/__init__.py | 2 +-
.../providers/edge/executors/edge_executor.py | 15 ++++
.../airflow/providers/edge/get_provider_info.py | 2 +-
.../providers/edge/worker_api/routes/jobs.py | 32 ++++++++
.../unit/edge/executors/test_edge_executor.py | 14 +++-
.../tests/unit/edge/worker_api/routes/test_jobs.py | 96 ++++++++++++++++++++++
10 files changed, 175 insertions(+), 10 deletions(-)
diff --git a/providers/edge/README.rst b/providers/edge/README.rst
index 48031b6d2ff..6f727510133 100644
--- a/providers/edge/README.rst
+++ b/providers/edge/README.rst
@@ -23,7 +23,7 @@
Package ``apache-airflow-providers-edge``
-Release: ``0.19.0pre0``
+Release: ``0.20.0pre0``
Handle edge workers on remote sites via HTTP(s) connection and orchestrates
work over distributed sites
@@ -36,7 +36,7 @@ This is a provider package for ``edge`` provider. All classes
for this provider
are in ``airflow.providers.edge`` python package.
You can find package information and changelog for the provider
-in the `documentation
<https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/>`_.
+in the `documentation
<https://airflow.apache.org/docs/apache-airflow-providers-edge/0.20.0pre0/>`_.
Installation
------------
@@ -59,4 +59,4 @@ PIP package Version required
================== ===================
The changelog for the provider package can be found in the
-`changelog
<https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/changelog.html>`_.
+`changelog
<https://airflow.apache.org/docs/apache-airflow-providers-edge/0.20.0pre0/changelog.html>`_.
diff --git a/providers/edge/docs/changelog.rst
b/providers/edge/docs/changelog.rst
index 5b8bef99440..89a6012a995 100644
--- a/providers/edge/docs/changelog.rst
+++ b/providers/edge/docs/changelog.rst
@@ -27,6 +27,15 @@
Changelog
---------
+0.20.0pre0
+..........
+
+Misc
+~~~~
+
+* ``Edge worker now exports ti.start and ti.finish metrics.``
+
+
0.19.0pre0
..........
@@ -36,6 +45,7 @@ Misc
* ``Edge worker can be set to maintenance via CLI and also return to normal
operation.``
+
0.18.1pre0
..........
diff --git a/providers/edge/provider.yaml b/providers/edge/provider.yaml
index 562583a9a36..a9a6acb559a 100644
--- a/providers/edge/provider.yaml
+++ b/providers/edge/provider.yaml
@@ -25,7 +25,7 @@ source-date-epoch: 1737371680
# note that those versions are maintained by release manager - do not update
them manually
versions:
- - 0.19.0pre0
+ - 0.20.0pre0
plugins:
- name: edge_executor
diff --git a/providers/edge/pyproject.toml b/providers/edge/pyproject.toml
index ce404ea9543..c56b033de85 100644
--- a/providers/edge/pyproject.toml
+++ b/providers/edge/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
[project]
name = "apache-airflow-providers-edge"
-version = "0.19.0pre0"
+version = "0.20.0pre0"
description = "Provider package apache-airflow-providers-edge for Apache
Airflow"
readme = "README.rst"
authors = [
@@ -61,8 +61,8 @@ dependencies = [
]
[project.urls]
-"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0"
-"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-edge/0.19.0pre0/changelog.html"
+"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-edge/0.20.0pre0"
+"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-edge/0.20.0pre0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
diff --git a/providers/edge/src/airflow/providers/edge/__init__.py
b/providers/edge/src/airflow/providers/edge/__init__.py
index 5b8039cec8d..96dfbad6133 100644
--- a/providers/edge/src/airflow/providers/edge/__init__.py
+++ b/providers/edge/src/airflow/providers/edge/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "0.19.0pre0"
+__version__ = "0.20.0pre0"
if
packaging.version.parse(packaging.version.parse(airflow_version).base_version)
< packaging.version.parse(
"2.10.0"
diff --git
a/providers/edge/src/airflow/providers/edge/executors/edge_executor.py
b/providers/edge/src/airflow/providers/edge/executors/edge_executor.py
index ecf604013c1..87828a05b3a 100644
--- a/providers/edge/src/airflow/providers/edge/executors/edge_executor.py
+++ b/providers/edge/src/airflow/providers/edge/executors/edge_executor.py
@@ -216,6 +216,21 @@ class EdgeExecutor(BaseExecutor):
)
job.state = ti.state if ti else TaskInstanceState.REMOVED
+ if job.state != TaskInstanceState.RUNNING:
+ # Edge worker does not backport emitted Airflow metrics, so
export some metrics
+ # Export metrics as failed as these jobs will be deleted in
the future
+ tags = {
+ "dag_id": job.dag_id,
+ "task_id": job.task_id,
+ "queue": job.queue,
+ "state": str(TaskInstanceState.FAILED),
+ }
+ Stats.incr(
+
f"edge_worker.ti.finish.{job.queue}.{TaskInstanceState.FAILED}.{job.dag_id}.{job.task_id}",
+ tags=tags,
+ )
+ Stats.incr("edge_worker.ti.finish", tags=tags)
+
return bool(lifeless_jobs)
def _purge_jobs(self, session: Session) -> bool:
diff --git a/providers/edge/src/airflow/providers/edge/get_provider_info.py
b/providers/edge/src/airflow/providers/edge/get_provider_info.py
index c646666d301..ba5a3f6b544 100644
--- a/providers/edge/src/airflow/providers/edge/get_provider_info.py
+++ b/providers/edge/src/airflow/providers/edge/get_provider_info.py
@@ -28,7 +28,7 @@ def get_provider_info():
"description": "Handle edge workers on remote sites via HTTP(s)
connection and orchestrates work over distributed sites\n",
"state": "not-ready",
"source-date-epoch": 1737371680,
- "versions": ["0.19.0pre0"],
+ "versions": ["0.20.0pre0"],
"plugins": [
{
"name": "edge_executor",
diff --git
a/providers/edge/src/airflow/providers/edge/worker_api/routes/jobs.py
b/providers/edge/src/airflow/providers/edge/worker_api/routes/jobs.py
index c976d097965..6380439093e 100644
--- a/providers/edge/src/airflow/providers/edge/worker_api/routes/jobs.py
+++ b/providers/edge/src/airflow/providers/edge/worker_api/routes/jobs.py
@@ -37,6 +37,7 @@ from airflow.providers.edge.worker_api.routes._v2_compat
import (
parse_command,
status,
)
+from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.sqlalchemy import with_row_locks
from airflow.utils.state import TaskInstanceState
@@ -85,6 +86,10 @@ def fetch(
job.edge_worker = worker_name
job.last_update = timezone.utcnow()
session.commit()
+ # Edge worker does not backport emitted Airflow metrics, so export some
metrics
+ tags = {"dag_id": job.dag_id, "task_id": job.task_id, "queue": job.queue}
+ Stats.incr(f"edge_worker.ti.start.{job.queue}.{job.dag_id}.{job.task_id}",
tags=tags)
+ Stats.incr("edge_worker.ti.start", tags=tags)
return EdgeJobFetched(
dag_id=job.dag_id,
task_id=job.task_id,
@@ -116,6 +121,33 @@ def state(
session: SessionDep,
) -> None:
"""Update the state of a job running on the edge worker."""
+ # execute query to catch the queue and check if state toggles to success
or failed
+ # otherwise possible that Executor resets orphaned jobs and stats are
exported 2 times
+    if state in [TaskInstanceState.SUCCESS, TaskInstanceState.FAILED]:
+ query = select(EdgeJobModel).where(
+ EdgeJobModel.dag_id == dag_id,
+ EdgeJobModel.task_id == task_id,
+ EdgeJobModel.run_id == run_id,
+ EdgeJobModel.map_index == map_index,
+ EdgeJobModel.try_number == try_number,
+ EdgeJobModel.state == TaskInstanceState.RUNNING,
+ )
+ job = session.scalar(query)
+
+ if job:
+ # Edge worker does not backport emitted Airflow metrics, so export
some metrics
+ tags = {
+ "dag_id": job.dag_id,
+ "task_id": job.task_id,
+ "queue": job.queue,
+ "state": str(state),
+ }
+ Stats.incr(
+
f"edge_worker.ti.finish.{job.queue}.{state}.{job.dag_id}.{job.task_id}",
+ tags=tags,
+ )
+ Stats.incr("edge_worker.ti.finish", tags=tags)
+
query = (
update(EdgeJobModel)
.where(
diff --git a/providers/edge/tests/unit/edge/executors/test_edge_executor.py
b/providers/edge/tests/unit/edge/executors/test_edge_executor.py
index 5d9ca6fd0e7..3b6c43726ec 100644
--- a/providers/edge/tests/unit/edge/executors/test_edge_executor.py
+++ b/providers/edge/tests/unit/edge/executors/test_edge_executor.py
@@ -83,7 +83,8 @@ class TestEdgeExecutor:
assert jobs[0].task_id == "test_task"
assert jobs[0].concurrency_slots == expected_concurrency
- def test_sync_orphaned_tasks(self):
+ @patch("airflow.stats.Stats.incr")
+ def test_sync_orphaned_tasks(self, mock_stats_incr):
executor = EdgeExecutor()
delta_to_purge = timedelta(minutes=conf.getint("edge",
"job_fail_purge") + 1)
@@ -116,6 +117,17 @@ class TestEdgeExecutor:
executor.sync()
+ mock_stats_incr.assert_called_with(
+ "edge_worker.ti.finish",
+ tags={
+ "dag_id": "test_dag",
+ "queue": "default",
+ "state": "failed",
+ "task_id": "started_running_orphaned",
+ },
+ )
+        assert mock_stats_incr.call_count == 2
+
with create_session() as session:
jobs = session.query(EdgeJobModel).all()
assert len(jobs) == 1
diff --git a/providers/edge/tests/unit/edge/worker_api/routes/test_jobs.py
b/providers/edge/tests/unit/edge/worker_api/routes/test_jobs.py
new file mode 100644
index 00000000000..e2bb1d00bac
--- /dev/null
+++ b/providers/edge/tests/unit/edge/worker_api/routes/test_jobs.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from unittest.mock import patch
+
+import pytest
+
+from airflow.providers.edge.models.edge_job import EdgeJobModel
+from airflow.providers.edge.worker_api.routes.jobs import state
+from airflow.utils.session import create_session
+from airflow.utils.state import TaskInstanceState
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+pytestmark = pytest.mark.db_test
+
+
+DAG_ID = "my_dag"
+TASK_ID = "my_task"
+RUN_ID = "manual__2024-11-24T21:03:01+01:00"
+QUEUE = "test"
+
+
+class TestJobsApiRoutes:
+ @pytest.fixture(autouse=True)
+ def setup_test_cases(self, dag_maker, session: Session):
+ session.query(EdgeJobModel).delete()
+ session.commit()
+
+ @patch("airflow.stats.Stats.incr")
+ def test_state(self, mock_stats_incr, session: Session):
+ with create_session() as session:
+ job = EdgeJobModel(
+ dag_id=DAG_ID,
+ task_id=TASK_ID,
+ run_id=RUN_ID,
+ try_number=1,
+ map_index=-1,
+ state=TaskInstanceState.RUNNING,
+ queue=QUEUE,
+ concurrency_slots=1,
+ command="execute",
+ )
+ session.add(job)
+ session.commit()
+ state(
+ dag_id=DAG_ID,
+ task_id=TASK_ID,
+ run_id=RUN_ID,
+ try_number=1,
+ map_index=-1,
+ state=TaskInstanceState.RUNNING,
+ session=session,
+ )
+
+ mock_stats_incr.assert_not_called()
+
+ state(
+ dag_id=DAG_ID,
+ task_id=TASK_ID,
+ run_id=RUN_ID,
+ try_number=1,
+ map_index=-1,
+ state=TaskInstanceState.SUCCESS,
+ session=session,
+ )
+
+ mock_stats_incr.assert_called_with(
+ "edge_worker.ti.finish",
+ tags={
+ "dag_id": DAG_ID,
+ "queue": QUEUE,
+ "state": TaskInstanceState.SUCCESS,
+ "task_id": TASK_ID,
+ },
+ )
+        assert mock_stats_incr.call_count == 2
+
+ assert session.query(EdgeJobModel).scalar().state ==
TaskInstanceState.SUCCESS