This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 67fdf3625da Bugfix/support Subpath w/o Execution API Url (#57372)
67fdf3625da is described below

commit 67fdf3625da003a2cf0b9d00ebbf0b3f1cd70239
Author: Jens Scheffler <[email protected]>
AuthorDate: Tue Oct 28 14:04:33 2025 +0100

    Bugfix/support Subpath w/o Execution API Url (#57372)
    
    * Improve default of execution URL if deployed in subpath
    
    * Improve default of execution URL if deployed in subpath
    
    * Fix unit tests and add coverage for execution URL function
    
    * Fix unit tests and add coverage for execution URL function, ups need mark 
as test for AF3 only
    
    * Apply suggestion from @amoghrajesh
    
    Co-authored-by: Amogh Desai <[email protected]>
    
    ---------
    
    Co-authored-by: Amogh Desai <[email protected]>
---
 providers/edge3/docs/deployment.rst                |  4 +--
 .../src/airflow/providers/edge3/cli/worker.py      | 24 ++++++++++------
 .../edge3/tests/unit/edge3/cli/test_worker.py      | 32 ++++++++++++++--------
 3 files changed, 37 insertions(+), 23 deletions(-)

diff --git a/providers/edge3/docs/deployment.rst 
b/providers/edge3/docs/deployment.rst
index 0cfe2fa4d98..d0387c41946 100644
--- a/providers/edge3/docs/deployment.rst
+++ b/providers/edge3/docs/deployment.rst
@@ -51,8 +51,8 @@ Minimum Airflow configuration settings for the Edge Worker to 
make it running is
 - Section ``[core]``
 
   - ``execution_api_server_url``: If not set, the base URL from 
``edge.api_url`` will be used. For example,
-    when ``edge.api_url`` is set to 
``https://your-hostname-and-port/edge_worker/v1/rpcapi``, it will
-    default to ``https://your-hostname-and-port/execution/``.
+    when ``edge.api_url`` is set to 
``https://your-hostname-and-port/subpath/edge_worker/v1/rpcapi``, it will
+    default to ``https://your-hostname-and-port/subpath/execution/`` (starting 
from version Airflow version 3.0.0).
   - ``executor``: Executor must be set or added to be 
``airflow.providers.edge3.executors.EdgeExecutor``
   - ``internal_api_secret_key``: An encryption key must be set on api-server 
and Edge Worker component as
     shared secret to authenticate traffic. It should be a random string like 
the fernet key
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py 
b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index 48aa4c8d578..d1e0bcdcf28 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -21,13 +21,13 @@ import os
 import signal
 import sys
 from datetime import datetime
+from functools import cache
 from http import HTTPStatus
 from multiprocessing import Process
 from pathlib import Path
 from subprocess import Popen
 from time import sleep
 from typing import TYPE_CHECKING
-from urllib.parse import urlparse
 
 from lockfile.pidlockfile import remove_existing_pidfile
 from requests import HTTPError
@@ -176,7 +176,19 @@ class EdgeWorker:
         return EdgeWorkerState.IDLE
 
     @staticmethod
-    def _run_job_via_supervisor(workload) -> int:
+    @cache
+    def _execution_api_server_url() -> str:
+        """Get the execution api server url from config or environment."""
+        api_url = conf.get("edge", "api_url")
+        execution_api_server_url = conf.get("core", 
"execution_api_server_url", fallback="")
+        if not execution_api_server_url and api_url:
+            # Derive execution api url from edge api url as fallback
+            execution_api_server_url = 
api_url.replace("edge_worker/v1/rpcapi", "execution")
+        logger.info("Using execution api server url: %s", 
execution_api_server_url)
+        return execution_api_server_url
+
+    @staticmethod
+    def _run_job_via_supervisor(workload, execution_api_server_url) -> int:
         from airflow.sdk.execution_time.supervisor import supervise
 
         # Ignore ctrl-c in this process -- we don't want to kill _this_ one. 
we let tasks run to completion
@@ -186,12 +198,6 @@ class EdgeWorker:
         setproctitle(f"airflow edge worker: {workload.ti.key}")
 
         try:
-            api_url = conf.get("edge", "api_url")
-            execution_api_server_url = conf.get("core", 
"execution_api_server_url", fallback="")
-            if not execution_api_server_url:
-                parsed = urlparse(api_url)
-                execution_api_server_url = 
f"{parsed.scheme}://{parsed.netloc}/execution/"
-
             supervise(
                 # This is the "wrong" ti type, but it duck types the same. 
TODO: Create a protocol for this.
                 # Same like in 
airflow/executors/local_executor.py:_execute_work()
@@ -215,7 +221,7 @@ class EdgeWorker:
         workload: ExecuteTask = edge_job.command
         process = Process(
             target=EdgeWorker._run_job_via_supervisor,
-            kwargs={"workload": workload},
+            kwargs={"workload": workload, "execution_api_server_url": 
EdgeWorker._execution_api_server_url()},
         )
         process.start()
         base_log_folder = conf.get("logging", "base_log_folder", fallback="NOT 
AVAILABLE")
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py 
b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 6c0a4aeb80b..427fa328868 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -167,12 +167,12 @@ class TestEdgeWorker:
         "configs, expected_url",
         [
             (
-                {("edge", "api_url"): "https://api-endpoint"},
-                "https://api-endpoint/execution/";,
+                {("edge", "api_url"): 
"https://api-host/edge_worker/v1/rpcapi"},
+                "https://api-host/execution";,
             ),
             (
-                {("edge", "api_url"): "https://api:1234/endpoint"},
-                "https://api:1234/execution/";,
+                {("edge", "api_url"): 
"https://api:1234/subpath/edge_worker/v1/rpcapi"},
+                "https://api:1234/subpath/execution";,
             ),
             (
                 {
@@ -183,16 +183,25 @@ class TestEdgeWorker:
             ),
         ],
     )
+    def test_execution_api_server_url(
+        self,
+        configs,
+        expected_url,
+    ):
+        with conf_vars(configs):
+            EdgeWorker._execution_api_server_url.cache_clear()
+            url = EdgeWorker._execution_api_server_url()
+            assert url == expected_url
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow 
3+")
     @patch("airflow.sdk.execution_time.supervisor.supervise")
     @patch("airflow.providers.edge3.cli.worker.Process")
     @patch("airflow.providers.edge3.cli.worker.Popen")
-    def test_use_execution_api_server_url(
+    def test_supervise_launch(
         self,
         mock_popen,
         mock_process,
         mock_supervise,
-        configs,
-        expected_url,
         worker_with_job: EdgeWorker,
     ):
         mock_popen.side_effect = [MagicMock()]
@@ -200,13 +209,12 @@ class TestEdgeWorker:
         mock_process.side_effect = [mock_process_instance]
 
         edge_job = EdgeWorker.jobs.pop().edge_job
-        with conf_vars(configs):
-            worker_with_job._launch_job(edge_job)
+        worker_with_job._launch_job(edge_job)
 
-            mock_process_callback = mock_process.call_args.kwargs["target"]
-            mock_process_callback(workload=MagicMock())
+        mock_process_callback = mock_process.call_args.kwargs["target"]
+        mock_process_callback(workload=MagicMock(), 
execution_api_server_url="http://mock-url";)
 
-            assert mock_supervise.call_args.kwargs["server"] == expected_url
+        assert mock_supervise.call_args.kwargs["server"] == "http://mock-url";
 
     @pytest.mark.parametrize(
         "reserve_result, fetch_result, expected_calls",

Reply via email to