This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bf3afcebee7 Fix Edge worker fork mode reporting supervisor failures as success (#67887)
bf3afcebee7 is described below

commit bf3afcebee7942e9c62053aea96375f71a583696
Author: Jeongwoo Do <[email protected]>
AuthorDate: Wed Jun 3 03:54:16 2026 +0900

    Fix Edge worker fork mode reporting supervisor failures as success (#67887)
    
    * Fix Edge worker fork mode reporting supervisor failures as success
    
    * fix logic
---
 .../src/airflow/providers/edge3/cli/worker.py      | 18 +++--
 .../edge3/tests/unit/edge3/cli/test_worker.py      | 76 ++++++++++++++++++----
 2 files changed, 74 insertions(+), 20 deletions(-)

diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index 1a044fdde2e..3ecb6396f2c 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -32,7 +32,7 @@ from functools import cached_property
 from http import HTTPStatus
 from multiprocessing import Process
 from pathlib import Path
-from typing import IO, TYPE_CHECKING
+from typing import IO, TYPE_CHECKING, NoReturn
 
 import anyio
 from aiofiles import open as aio_open
@@ -422,7 +422,7 @@ class EdgeWorker:
             return EdgeWorkerState.MAINTENANCE_MODE
         return EdgeWorkerState.IDLE
 
-    def _run_job_via_supervisor(self, workload: ExecuteTypeBody, 
error_file_path: Path) -> int:
+    def _run_job_via_supervisor(self, workload: ExecuteTypeBody, 
error_file_path: Path) -> NoReturn:
         """Run a task by calling the supervisor directly (executes inside a 
forked child process)."""
         _reset_parent_signal_state()
 
@@ -435,7 +435,7 @@ class EdgeWorker:
             if AIRFLOW_V_3_3_PLUS:
                 from airflow.executors.base_executor import BaseExecutor
 
-                BaseExecutor.run_workload(
+                exit_code = BaseExecutor.run_workload(
                     workload=workload,
                     server=self._execution_api_server_url,
                 )
@@ -448,7 +448,7 @@ class EdgeWorker:
                     f"dag_id={ti.dag_id} task_id={ti.task_id} 
run_id={ti.run_id} map_index={ti.map_index} "
                     f"try_number={ti.try_number}"
                 )
-                supervise(
+                exit_code = supervise(
                     # This is the "wrong" ti type, but it duck types the same. 
TODO: Create a protocol for this.
                     # Same like in 
airflow/executors/local_executor.py:_execute_workload()
                     ti=ti,  # type: ignore[arg-type]
@@ -458,12 +458,17 @@ class EdgeWorker:
                     server=self._execution_api_server_url,
                     log_path=workload.log_path,
                 )
-            return 0
         except Exception:
             logger.exception("Task execution failed")
             with suppress(Exception):
                 error_file_path.write_text(traceback.format_exc())
-            return 1
+            exit_code = 1
+
+        # Exit explicitly so the real exit code propagates to the parent 
process.
+        # the child would always exit 0 without this, so a failed supervisor
+        # (non-zero ``exit_code``, e.g. when ``run_workload`` reports a task 
failure without raising)
+        # would be misreported as success by the parent's ``Job.is_success`` 
check.
+        sys.exit(exit_code)
 
     def _launch_job_subprocess(self, workload: ExecuteTypeBody) -> 
tuple[subprocess.Popen, Path]:
         """Launch workload via a fresh Python interpreter 
(subprocess.Popen)."""
@@ -700,6 +705,7 @@ class EdgeWorker:
                         break
             await self._push_logs_in_chunks(job)
 
+            logger.info("The code is changed: %s", job.edge_job.identifier)
             if job.is_success:
                 logger.info("Job completed: %s", job.edge_job.identifier)
                 await jobs_set_state(job.edge_job.key, 
TaskInstanceState.SUCCESS)
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 23a247d8a08..bbbf2690faf 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -363,12 +363,15 @@ class TestEdgeWorker:
         worker_with_job: EdgeWorker,
         tmp_path: Path,
     ):
+        mock_supervise.return_value = 0
         worker_with_job.__dict__["_execution_api_server_url"] = 
"https://mock-server/execution";
         edge_job = worker_with_job.jobs.pop().edge_job
         error_file_path = tmp_path / "fork-error.log"
-        result = worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
+        with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
+            worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
 
-        assert result == 0
+        # The child process must exit 0 on success so the parent's 
Job.is_success check passes.
+        assert exc_info.value.code == 0
         assert not error_file_path.exists()  # no error written on success
 
     @patch("airflow.executors.base_executor.BaseExecutor.run_workload")
@@ -382,12 +385,15 @@ class TestEdgeWorker:
         worker_with_job: EdgeWorker,
         tmp_path: Path,
     ):
+        mock_run_workload.return_value = 0
         worker_with_job.__dict__["_execution_api_server_url"] = 
"https://mock-server/execution";
         edge_job = worker_with_job.jobs.pop().edge_job
         error_file_path = tmp_path / "fork-error.log"
-        result = worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
+        with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
+            worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
 
-        assert result == 0
+        # The child process must exit 0 on success so the parent's 
Job.is_success check passes.
+        assert exc_info.value.code == 0
         assert not error_file_path.exists()  # no error written on success
 
     @patch("airflow.sdk.execution_time.supervisor.supervise")
@@ -403,9 +409,10 @@ class TestEdgeWorker:
         worker_with_job.__dict__["_execution_api_server_url"] = 
"https://mock-server/execution";
         edge_job = worker_with_job.jobs.pop().edge_job
         error_file_path = tmp_path / "fork-error.log"
-        result = worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
+        with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
+            worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
 
-        assert result == 1
+        assert exc_info.value.code == 1
         assert error_file_path.exists()
         assert "Supervise failed" in error_file_path.read_text()
 
@@ -424,12 +431,52 @@ class TestEdgeWorker:
         worker_with_job.__dict__["_execution_api_server_url"] = 
"https://mock-server/execution";
         edge_job = worker_with_job.jobs.pop().edge_job
         error_file_path = tmp_path / "fork-error.log"
-        result = worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
+        with mock.patch("os.setpgrp"), pytest.raises(SystemExit) as exc_info:
+            worker_with_job._run_job_via_supervisor(edge_job.command, 
error_file_path)
 
-        assert result == 1
+        assert exc_info.value.code == 1
         assert error_file_path.exists()
         assert "Supervise failed" in error_file_path.read_text()
 
+    @patch("airflow.executors.base_executor.BaseExecutor.run_workload")
+    @pytest.mark.skipif(not hasattr(os, "fork"), reason="Requires the fork 
start method")
+    @pytest.mark.skipif(
+        not AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow >= 3.3.0 where 
BaseExecutor.run_workload is used"
+    )
+    def test_fork_child_exits_nonzero_when_supervisor_raises(
+        self,
+        mock_run_workload,
+        worker_with_job: EdgeWorker,
+        tmp_path: Path,
+    ):
+        """
+        A supervisor exception must make the forked child terminate with a 
non-zero exit code so the
+        parent's Job.is_success reports the failure.
+        """
+        import multiprocessing
+
+        mock_run_workload.side_effect = RuntimeError("supervisor crashed")
+        worker_with_job.__dict__["_execution_api_server_url"] = 
"https://mock-server/execution";
+        edge_job = worker_with_job.jobs.pop().edge_job
+        error_file_path = tmp_path / "fork-error.log"
+
+        # Use the fork context explicitly so the child inherits the patched 
run_workload in memory.
+        process = multiprocessing.get_context("fork").Process(
+            target=worker_with_job._run_job_via_supervisor,
+            kwargs={"workload": edge_job.command, "error_file_path": 
error_file_path},
+        )
+        process.start()
+        process.join(timeout=30)
+
+        # With ``return 1`` this would have been 0; ``sys.exit(1)`` propagates 
the non-zero code.
+        assert process.exitcode == 1
+        # And the parent's success check therefore reports failure instead of 
a false success.
+        job = Job(edge_job=edge_job, process=process, logfile=tmp_path / 
"file.log")  # type: ignore[arg-type]
+        assert job.is_success is False
+        # Confirm the non-zero exit came from the supervisor failure path (not 
an unrelated early error).
+        assert error_file_path.exists()
+        assert "supervisor crashed" in error_file_path.read_text()
+
     @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
     @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
     @pytest.mark.asyncio
@@ -1148,14 +1195,15 @@ class TestSignalHandling:
             mock.patch("os.setpgrp", side_effect=lambda: order("setpgrp")),
             mock.patch(
                 "airflow.executors.base_executor.BaseExecutor.run_workload",
-                side_effect=lambda **_: order("supervise"),
+                side_effect=lambda **_: (order("supervise"), 0)[1],
             ),
         ):
-            rc = worker._run_job_via_supervisor(
-                workload=self._make_workload(),
-                error_file_path=tmp_path / "fork-error.log",
-            )
-        assert rc == 0
+            with pytest.raises(SystemExit) as exc_info:
+                worker._run_job_via_supervisor(
+                    workload=self._make_workload(),
+                    error_file_path=tmp_path / "fork-error.log",
+                )
+        assert exc_info.value.code == 0
         assert [c.args[0] for c in order.call_args_list] == ["reset", 
"setpgrp", "supervise"]
 
     def test_shutdown_handler_is_idempotent(self, worker_with_one_job):

Reply via email to