This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 67fdf3625da Bugfix/support Subpath w/o Execution API Url (#57372)
67fdf3625da is described below
commit 67fdf3625da003a2cf0b9d00ebbf0b3f1cd70239
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Oct 28 14:04:33 2025 +0100
Bugfix/support Subpath w/o Execution API Url (#57372)
* Improve default of execution URL if deployed in subpath
* Improve default of execution URL if deployed in subpath
* Fix unit tests and add coverage for execution URL function
* Fix unit tests and add coverage for execution URL function; oops, need to mark
as test for AF3 only
* Apply suggestion from @amoghrajesh
Co-authored-by: Amogh Desai <[email protected]>
---------
Co-authored-by: Amogh Desai <[email protected]>
---
providers/edge3/docs/deployment.rst | 4 +--
.../src/airflow/providers/edge3/cli/worker.py | 24 ++++++++++------
.../edge3/tests/unit/edge3/cli/test_worker.py | 32 ++++++++++++++--------
3 files changed, 37 insertions(+), 23 deletions(-)
diff --git a/providers/edge3/docs/deployment.rst
b/providers/edge3/docs/deployment.rst
index 0cfe2fa4d98..d0387c41946 100644
--- a/providers/edge3/docs/deployment.rst
+++ b/providers/edge3/docs/deployment.rst
@@ -51,8 +51,8 @@ Minimum Airflow configuration settings for the Edge Worker to
make it running is
- Section ``[core]``
- ``execution_api_server_url``: If not set, the base URL from
``edge.api_url`` will be used. For example,
- when ``edge.api_url`` is set to
``https://your-hostname-and-port/edge_worker/v1/rpcapi``, it will
- default to ``https://your-hostname-and-port/execution/``.
+ when ``edge.api_url`` is set to
``https://your-hostname-and-port/subpath/edge_worker/v1/rpcapi``, it will
+ default to ``https://your-hostname-and-port/subpath/execution`` (starting
from Airflow version 3.0.0).
- ``executor``: Executor must be set or added to be
``airflow.providers.edge3.executors.EdgeExecutor``
- ``internal_api_secret_key``: An encryption key must be set on api-server
and Edge Worker component as
shared secret to authenticate traffic. It should be a random string like
the fernet key
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index 48aa4c8d578..d1e0bcdcf28 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -21,13 +21,13 @@ import os
import signal
import sys
from datetime import datetime
+from functools import cache
from http import HTTPStatus
from multiprocessing import Process
from pathlib import Path
from subprocess import Popen
from time import sleep
from typing import TYPE_CHECKING
-from urllib.parse import urlparse
from lockfile.pidlockfile import remove_existing_pidfile
from requests import HTTPError
@@ -176,7 +176,19 @@ class EdgeWorker:
return EdgeWorkerState.IDLE
@staticmethod
- def _run_job_via_supervisor(workload) -> int:
+ @cache
+ def _execution_api_server_url() -> str:
+ """Get the execution api server url from config or environment."""
+ api_url = conf.get("edge", "api_url")
+ execution_api_server_url = conf.get("core",
"execution_api_server_url", fallback="")
+ if not execution_api_server_url and api_url:
+ # Derive execution api url from edge api url as fallback
+ execution_api_server_url =
api_url.replace("edge_worker/v1/rpcapi", "execution")
+ logger.info("Using execution api server url: %s",
execution_api_server_url)
+ return execution_api_server_url
+
+ @staticmethod
+ def _run_job_via_supervisor(workload, execution_api_server_url) -> int:
from airflow.sdk.execution_time.supervisor import supervise
# Ignore ctrl-c in this process -- we don't want to kill _this_ one.
we let tasks run to completion
@@ -186,12 +198,6 @@ class EdgeWorker:
setproctitle(f"airflow edge worker: {workload.ti.key}")
try:
- api_url = conf.get("edge", "api_url")
- execution_api_server_url = conf.get("core",
"execution_api_server_url", fallback="")
- if not execution_api_server_url:
- parsed = urlparse(api_url)
- execution_api_server_url =
f"{parsed.scheme}://{parsed.netloc}/execution/"
-
supervise(
# This is the "wrong" ti type, but it duck types the same.
TODO: Create a protocol for this.
# Same like in
airflow/executors/local_executor.py:_execute_work()
@@ -215,7 +221,7 @@ class EdgeWorker:
workload: ExecuteTask = edge_job.command
process = Process(
target=EdgeWorker._run_job_via_supervisor,
- kwargs={"workload": workload},
+ kwargs={"workload": workload, "execution_api_server_url":
EdgeWorker._execution_api_server_url()},
)
process.start()
base_log_folder = conf.get("logging", "base_log_folder", fallback="NOT
AVAILABLE")
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py
b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 6c0a4aeb80b..427fa328868 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -167,12 +167,12 @@ class TestEdgeWorker:
"configs, expected_url",
[
(
- {("edge", "api_url"): "https://api-endpoint"},
- "https://api-endpoint/execution/",
+ {("edge", "api_url"):
"https://api-host/edge_worker/v1/rpcapi"},
+ "https://api-host/execution",
),
(
- {("edge", "api_url"): "https://api:1234/endpoint"},
- "https://api:1234/execution/",
+ {("edge", "api_url"):
"https://api:1234/subpath/edge_worker/v1/rpcapi"},
+ "https://api:1234/subpath/execution",
),
(
{
@@ -183,16 +183,25 @@ class TestEdgeWorker:
),
],
)
+ def test_execution_api_server_url(
+ self,
+ configs,
+ expected_url,
+ ):
+ with conf_vars(configs):
+ EdgeWorker._execution_api_server_url.cache_clear()
+ url = EdgeWorker._execution_api_server_url()
+ assert url == expected_url
+
+ @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow
3+")
@patch("airflow.sdk.execution_time.supervisor.supervise")
@patch("airflow.providers.edge3.cli.worker.Process")
@patch("airflow.providers.edge3.cli.worker.Popen")
- def test_use_execution_api_server_url(
+ def test_supervise_launch(
self,
mock_popen,
mock_process,
mock_supervise,
- configs,
- expected_url,
worker_with_job: EdgeWorker,
):
mock_popen.side_effect = [MagicMock()]
@@ -200,13 +209,12 @@ class TestEdgeWorker:
mock_process.side_effect = [mock_process_instance]
edge_job = EdgeWorker.jobs.pop().edge_job
- with conf_vars(configs):
- worker_with_job._launch_job(edge_job)
+ worker_with_job._launch_job(edge_job)
- mock_process_callback = mock_process.call_args.kwargs["target"]
- mock_process_callback(workload=MagicMock())
+ mock_process_callback = mock_process.call_args.kwargs["target"]
+ mock_process_callback(workload=MagicMock(),
execution_api_server_url="http://mock-url")
- assert mock_supervise.call_args.kwargs["server"] == expected_url
+ assert mock_supervise.call_args.kwargs["server"] == "http://mock-url"
@pytest.mark.parametrize(
"reserve_result, fetch_result, expected_calls",