This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9b1d58aedb2 Allow using fresh interpreter besides fork() in Edge Worker (#65943)
9b1d58aedb2 is described below
commit 9b1d58aedb2a055a9575297ce3691279cbb76a95
Author: Diogo Silva <[email protected]>
AuthorDate: Thu May 14 17:12:18 2026 +0100
Allow using fresh interpreter besides fork() in Edge Worker (#65943)
* fix(edge3): replace fork() with subprocess.Popen to prevent deadlocks in multi-threaded workers
The edge worker process runs 22+ threads (asyncio event loop,
ThreadPoolExecutor, HTTP clients). When `_launch_job()` used
`multiprocessing.Process` (fork start method), `os.fork()` copied
import locks held by other threads into the child. Because only the
forking thread survives the fork, those locks are never released,
causing permanent deadlocks on any subsequent import in the child process.
A non-deadlock variant also occurs where the child inherits corrupted
`sys.modules` state, causing `ModuleNotFoundError` cascades for all
plugin and DAG imports.
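For illustration only (a minimal standalone sketch of the failure mode, not
code from this change): a lock held by another thread at fork() time stays
locked forever in the child, because only the forking thread is copied.

    import os
    import threading
    import time

    lock = threading.Lock()  # stand-in for an import lock

    def background():
        with lock:           # another thread holds the lock...
            time.sleep(5)

    threading.Thread(target=background, daemon=True).start()
    time.sleep(0.1)          # make sure the lock is held when we fork

    if os.fork() == 0:
        # Child: the thread holding the lock does not exist here, so the lock
        # can never be released; acquire() only returns because of the timeout.
        print("child got lock:", lock.acquire(timeout=2))  # -> False
        os._exit(0)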
This commit replaces the `multiprocessing.Process` fork with
`subprocess.Popen` launching a fresh Python interpreter via the
existing `airflow.sdk.execution_time.execute_workload` CLI entrypoint.
The `ExecuteTask` workload is already a Pydantic model with
`model_dump_json()` — the same serialization path used by the ECS
executor and the edge executor's own DB storage.
Changes:
- `worker.py`: Replace `_launch_job` to use `subprocess.Popen` with
`execute_workload --json-string`. Remove `_run_job_via_supervisor`,
`_reset_parent_signal_state`, `multiprocessing` imports, and the
`results_queue` plumbing.
- `dataclasses.py`: Change `Job.process` type from
`multiprocessing.Process` to `subprocess.Popen`. Update `is_running`
to use `poll()` and `is_success` to check `returncode`.
- `test_worker.py`: Update mocks and assertions to match the new
subprocess-based approach.
Fixes: #65942
* Fix test_worker.py to use multiprocessing.Process instead of subprocess.Popen
* Honor fresh interpreter mode in Edge worker
* Clarify Edge worker task process handling
* Improve Edge worker subprocess failure handling
* fix: rollback supervise changes & fix tests related to display_name
* Unify error transport for fork and subprocess paths via temp file
Remove the multiprocessing.Queue from the fork execution path and use a
plain temp file for both fork and subprocess paths. Both paths now write
failure text to a NamedTemporaryFile (Path stored as Job.stderr_file_path);
the parent reads it after the child exits via Job.failure_details() and
pushes the content to the task log via logs_push.
Benefits over the Queue approach:
- No risk of buffer deadlock (the Queue deadlock was the original issue)
- Works identically for both fork and subprocess children
- Simpler: no IPC setup, no draining loop, no Queue import
- Error file is only created/filled on failure; task logs cover the
success path
Also extract _make_task_temp_file() helper to avoid duplicating the
NamedTemporaryFile creation pattern across _launch_job_subprocess and
_launch_job_fork.
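As a sketch, the contract both paths follow (names shortened; the real code
lives in _make_task_temp_file, Job.failure_details and Job.cleanup below):

    import tempfile
    from pathlib import Path

    # Parent creates the file up front; only the Path (or, for the subprocess
    # path, the open file as the child's stderr) is handed to the child.
    tmp = tempfile.NamedTemporaryFile(prefix="airflow-edge-task-error-", suffix=".log", delete=False)
    error_path = Path(tmp.name)
    tmp.close()

    # Child (fork path): writes traceback text to error_path only on failure.
    # Child (subprocess path): has its stderr redirected into such a file.

    # Parent, after the child has exited:
    failure_text = ""
    if error_path.exists() and error_path.stat().st_size:
        failure_text = error_path.read_bytes().decode(errors="backslashreplace").strip()
    error_path.unlink(missing_ok=True)  # Job.cleanup()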
---
.../src/airflow/providers/edge3/cli/dataclasses.py | 28 +-
.../src/airflow/providers/edge3/cli/worker.py | 121 +++++--
.../edge3/tests/unit/edge3/cli/test_worker.py | 362 ++++++++++++++-------
3 files changed, 373 insertions(+), 138 deletions(-)
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py b/providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
index 63e12f6f810..e3e3d3f09e6 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import json
+import subprocess
from dataclasses import asdict, dataclass
from multiprocessing import Process
from pathlib import Path
@@ -72,17 +73,42 @@ class Job:
"""Holds all information for a task/job to be executed as bundle."""
edge_job: EdgeJobFetched
- process: Process
+ process: subprocess.Popen | Process
+ """Can be subprocess.Popen (for the spawn path) or multiprocessing.Process
(for the fork path)."""
logfile: Path
logsize: int = 0
"""Last size of log file, point of last chunk push."""
+ stderr_file_path: Path | None = None
+ """Path to file where error details are written on failure (stderr for
subprocess path, traceback text for fork path)."""
@property
def is_running(self) -> bool:
"""Check if the job is still running."""
+ if isinstance(self.process, subprocess.Popen):
+ return self.process.poll() is None
return self.process.is_alive()
@property
def is_success(self) -> bool:
"""Check if the job was successful."""
+ if isinstance(self.process, subprocess.Popen):
+ return self.process.returncode == 0
return self.process.exitcode == 0
+
+ def failure_details(self) -> str:
+ """Format failure details, reading error text from the error file if
available."""
+ error_output = ""
+ if self.stderr_file_path and self.stderr_file_path.exists():
+ error_output = self.stderr_file_path.read_bytes().decode(errors="backslashreplace").strip()
+ if isinstance(self.process, subprocess.Popen):
+ ex_txt = f"Task subprocess exited with code
{self.process.returncode}"
+ else:
+ ex_txt = f"Task fork exited with code {self.process.exitcode}"
+ if error_output:
+ ex_txt = f"{ex_txt}\n{error_output}"
+ return ex_txt
+
+ def cleanup(self) -> None:
+ """Remove transient files owned by this job."""
+ if self.stderr_file_path:
+ self.stderr_file_path.unlink(missing_ok=True)
diff --git a/providers/edge3/src/airflow/providers/edge3/cli/worker.py b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
index 778af0dc325..fec957f06eb 100644
--- a/providers/edge3/src/airflow/providers/edge3/cli/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -19,7 +19,9 @@ from __future__ import annotations
import logging
import os
import signal
+import subprocess
import sys
+import tempfile
import time
import traceback
from asyncio import Task, create_task, gather, get_running_loop, sleep
@@ -28,9 +30,9 @@ from contextlib import suppress
from datetime import datetime
from functools import cached_property
from http import HTTPStatus
-from multiprocessing import Process, Queue
+from multiprocessing import Process
from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import IO, TYPE_CHECKING
import anyio
from aiofiles import open as aio_open
@@ -66,6 +68,7 @@ from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
from airflow.configuration import AirflowConfigParser
from airflow.executors.workloads import ExecuteTask
+ from airflow.providers.edge3.worker_api.datamodels import EdgeJobFetched
logger = logging.getLogger(__name__)
@@ -75,6 +78,12 @@ else:
from setproctitle import setproctitle
+def _make_task_temp_file(prefix: str) -> tuple[IO[bytes], Path]:
+ """Create a named temporary file for task output capture and return the
open file and its path."""
+ f = tempfile.NamedTemporaryFile(prefix=prefix, suffix=".log", delete=False)
+ return f, Path(f.name)
+
+
def _edge_hostname() -> str:
"""Get the hostname of the edge worker that should be reported by tasks."""
return os.environ.get("HOSTNAME", getfqdn())
@@ -413,7 +422,8 @@ class EdgeWorker:
return EdgeWorkerState.MAINTENANCE_MODE
return EdgeWorkerState.IDLE
- def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -> int:
+ def _run_job_via_supervisor(self, workload: ExecuteTask, error_file_path: Path) -> int:
+ """Run a task by calling the supervisor directly (executes inside a forked child process)."""
_reset_parent_signal_state()
# Ignore ctrl-c in this process -- we don't want to kill _this_ one. we let tasks run to completion
@@ -448,23 +458,89 @@ class EdgeWorker:
server=self._execution_api_server_url,
log_path=workload.log_path,
)
- results_queue.put("OK")
return 0
- except Exception as e:
+ except Exception:
logger.exception("Task execution failed")
- results_queue.put(e)
+ with suppress(Exception):
+ error_file_path.write_text(traceback.format_exc())
return 1
- def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
+ def _launch_job_subprocess(self, workload: ExecuteTask) -> tuple[subprocess.Popen, Path]:
+ """Launch workload via a fresh Python interpreter (subprocess.Popen)."""
+ env = os.environ.copy()
+ if self._execution_api_server_url:
+ env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] =
self._execution_api_server_url
+
+ # Keep stderr off a PIPE: the worker only inspects stderr after the task finishes,
+ # so a verbose child could otherwise fill the pipe buffer and block forever. Also keep
+ # it task-scoped instead of inheriting the worker's stderr/stdout; supervisor startup
+ # failures should be pushed to the task log, not only the worker/container log.
+ stderr_file, stderr_file_path = _make_task_temp_file("airflow-edge-task-stderr-")
+ try:
+ process = subprocess.Popen(
+ [
+ sys.executable,
+ "-m",
+ "airflow.sdk.execution_time.execute_workload",
+ "--json-string",
+ workload.model_dump_json(),
+ ],
+ env=env,
+ start_new_session=True,
+ stderr=stderr_file,
+ )
+ except Exception:
+ stderr_file_path.unlink(missing_ok=True)
+ raise
+ finally:
+ # Close the parent's copy of the fd. Popen already dup2()'d it into the child,
+ # so the child's stderr remains open and writable. The parent reads the output
+ # later via stderr_file_path (the Path) once the child has exited.
+ stderr_file.close()
+ logger.info(
+ "Launched task subprocess pid=%d for %s",
+ process.pid,
+ workload.ti.id,
+ )
+ return process, stderr_file_path
+
+ def _launch_job_fork(self, workload: ExecuteTask) -> tuple[Process, Path]:
+ """Launch workload by forking the current process
(multiprocessing.Process)."""
# Improvement: Use frozen GC to prevent child process from copying unnecessary memory
# See _spawn_workers_with_gc_freeze() in airflow-core/src/airflow/executors/local_executor.py
- results_queue: Queue[Exception] = Queue()
+ error_file, error_file_path = _make_task_temp_file("airflow-edge-task-error-")
+ error_file.close() # child writes to the file by path; parent only reads it after exit
process = Process(
target=self._run_job_via_supervisor,
- kwargs={"workload": workload, "results_queue": results_queue},
+ kwargs={"workload": workload, "error_file_path": error_file_path},
)
process.start()
- return process, results_queue
+ logger.info("Launched task fork pid=%d for %s", process.pid,
workload.ti.id)
+ return process, error_file_path
+
+ def _launch_job(self, edge_job: EdgeJobFetched, workload: ExecuteTask, logfile: Path) -> Job:
+ """
+ Launch a task process.
+
+ Uses ``subprocess.Popen`` (fresh interpreter) when
+ ``core.execute_tasks_new_python_interpreter`` is ``True`` or when
+ ``os.fork`` is unavailable (e.g. Windows). Falls back to
+ ``multiprocessing.Process`` (fork) otherwise — preserving the
+ original behaviour for existing deployments.
+ """
+ use_new_interpreter = not hasattr(os, "fork") or self.conf.getboolean(
+ "core",
+ "execute_tasks_new_python_interpreter",
+ fallback=False,
+ )
+ if use_new_interpreter:
+ # Fresh subprocess path: spawn a new Python interpreter; no shared memory with parent
+ # Technically safer and more robust, but with more overhead
+ subprocess_process, stderr_file_path = self._launch_job_subprocess(workload)
+ return Job(edge_job, subprocess_process, logfile, stderr_file_path=stderr_file_path)
+ # Fork path: clone the current process; child inherits parent memory
+ fork_process, error_file_path = self._launch_job_fork(workload)
+ return Job(edge_job, fork_process, logfile, stderr_file_path=error_file_path)
async def _push_logs_in_chunks(self, job: Job):
aio_logfile = anyio.Path(job.logfile)
@@ -589,11 +665,10 @@ class EdgeWorker:
logger.info("Received job: %s", edge_job.identifier)
workload: ExecuteTask = edge_job.command
- process, results_queue = self._launch_job(workload)
if TYPE_CHECKING:
assert workload.log_path # We need to assume this is defined in here
logfile = Path(self.base_log_folder, workload.log_path)
- job = Job(edge_job, process, logfile)
+ job = self._launch_job(edge_job, workload, logfile)
self.jobs.append(job)
await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)
@@ -603,39 +678,31 @@ class EdgeWorker:
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)
- while job.is_running and results_queue.empty():
+ while job.is_running:
await self._push_logs_in_chunks(job)
for _ in range(0, self.job_poll_interval * 10):
await sleep(0.1)
if not job.is_running:
break
await self._push_logs_in_chunks(job)
- supervisor_msg = (
- "(Unknown error, no exception details available)"
- if results_queue.empty()
- else results_queue.get()
- )
- # Ensure that supervisor really ended after we grabbed results from queue
- while True:
- if not job.is_running:
- break
- await sleep(0.1)
self.jobs.remove(job)
if job.is_success:
logger.info("Job completed: %s", job.edge_job.identifier)
await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
else:
- if isinstance(supervisor_msg, Exception):
- supervisor_msg = "\n".join(traceback.format_exception(supervisor_msg))
- logger.error("Job failed: %s with:\n%s", job.edge_job.identifier,
supervisor_msg)
+ ex_txt = job.failure_details()
+ logger.error("Job failed: %s with:\n%s", job.edge_job.identifier,
ex_txt)
+
# Push it upwards to logs for better diagnostic as well
await logs_push(
task=job.edge_job.key,
log_chunk_time=timezone.utcnow(),
- log_chunk_data=f"Error executing job:\n{supervisor_msg}",
+ log_chunk_data=f"Error executing job:\n{ex_txt}",
)
await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
+ # Cleanup temp files used for the job
+ job.cleanup()
async def heartbeat(self, new_maintenance_comments: str | None = None) -> bool:
"""Report liveness state of worker to central site with stats."""
diff --git a/providers/edge3/tests/unit/edge3/cli/test_worker.py b/providers/edge3/tests/unit/edge3/cli/test_worker.py
index 0be1573353d..b50af2f7197 100644
--- a/providers/edge3/tests/unit/edge3/cli/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -16,16 +16,17 @@
# under the License.
from __future__ import annotations
-import asyncio
import contextlib
import importlib
import json
import logging
-import multiprocessing
+import os
import signal
+import subprocess
+import sys
from datetime import datetime
from io import StringIO
-from multiprocessing import Process, Queue
+from multiprocessing import Process
from pathlib import Path
from unittest import mock
from unittest.mock import call, patch
@@ -52,6 +53,7 @@ from airflow.providers.edge3.worker_api.datamodels import (
WorkerRegistrationReturn,
WorkerSetStateReturn,
)
+from airflow.utils.state import TaskInstanceState
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_2_PLUS, AIRFLOW_V_3_3_PLUS
@@ -81,21 +83,9 @@ MOCK_COMMAND = {
}
-def _emit_large_exception_target(results_queue):
- """Worker-process target used by ``test_fetch_and_run_job_possible_deadlock``.
-
- Pushes a >64 KB pickled exception to ``results_queue``. On Linux the OS pipe
- backing ``multiprocessing.Queue`` only has ~64 KB of buffer, so the queue's
- feeder thread blocks on ``send_bytes`` and the subprocess can't terminate
- until the parent reads from the queue — exactly the production deadlock
- condition that #66144 fixed.
- """
- results_queue.put(Exception(f"Task execution failed with large error message {'-' * 66000}"))
-
-
class _MockProcess(Process):
def __init__(self, returncode=None):
- self.generated_returncode = None
+ self.generated_returncode = returncode
self._is_alive = False
def poll(self):
@@ -105,6 +95,10 @@ class _MockProcess(Process):
def returncode(self):
return self.generated_returncode
+ @property
+ def exitcode(self):
+ return self.generated_returncode
+
def is_alive(self):
return self._is_alive
@@ -118,6 +112,15 @@ class _MockProcess(Process):
pass
+class _MockPopen(subprocess.Popen):
+ def __init__(self, returncode: int | None = None, pid: int = 1234):
+ self.returncode = returncode
+ self.pid = pid
+
+ def poll(self):
+ return self.returncode
+
+
class TestEdgeWorker:
@pytest.fixture(autouse=True)
def setup_parser(self):
@@ -234,6 +237,98 @@ class TestEdgeWorker:
url = test_worker._execution_api_server_url
assert url == expected_url
+ @pytest.mark.parametrize(
+ ("has_fork", "use_new_interpreter", "expected_launch_method"),
+ [
+ pytest.param(True, False, "fork",
id="fork_available_config_false"),
+ pytest.param(True, True, "subprocess",
id="fork_available_config_true"),
+ pytest.param(False, False, "subprocess",
id="fork_unavailable_config_false"),
+ pytest.param(False, True, "subprocess",
id="fork_unavailable_config_true"),
+ ],
+ )
+ def test_launch_job_honors_execute_tasks_new_python_interpreter(
+ self,
+ has_fork,
+ use_new_interpreter,
+ expected_launch_method,
+ monkeypatch,
+ tmp_path: Path,
+ worker_with_job: EdgeWorker,
+ ):
+ if not has_fork:
+ monkeypatch.delattr(os, "fork", raising=False)
+ worker_with_job.conf = mock.MagicMock()
+ worker_with_job.conf.getboolean.return_value = use_new_interpreter
+ edge_job = worker_with_job.jobs[0].edge_job
+ workload = edge_job.command
+ logfile = tmp_path / "mock.log"
+ subprocess_process = _MockPopen(returncode=None)
+ stderr_file_path = tmp_path / "stderr.log"
+ fork_process = _MockProcess()
+ error_file_path = tmp_path / "fork-error.log"
+
+ with (
+ patch.object(
+ worker_with_job, "_launch_job_subprocess",
return_value=(subprocess_process, stderr_file_path)
+ ) as mock_launch_subprocess,
+ patch.object(
+ worker_with_job, "_launch_job_fork",
return_value=(fork_process, error_file_path)
+ ) as mock_launch_fork,
+ ):
+ job = worker_with_job._launch_job(edge_job, workload, logfile)
+
+ if has_fork:
+ worker_with_job.conf.getboolean.assert_called_once_with(
+ "core", "execute_tasks_new_python_interpreter", fallback=False
+ )
+ else:
+ worker_with_job.conf.getboolean.assert_not_called()
+ if expected_launch_method == "subprocess":
+ assert job.process is subprocess_process
+ assert job.stderr_file_path == stderr_file_path
+ mock_launch_subprocess.assert_called_once_with(workload)
+ mock_launch_fork.assert_not_called()
+ else:
+ assert job.process is fork_process
+ assert job.stderr_file_path == error_file_path
+ mock_launch_fork.assert_called_once_with(workload)
+ mock_launch_subprocess.assert_not_called()
+
+ @patch("airflow.providers.edge3.cli.worker.subprocess.Popen")
+ def test_launch_job_subprocess_uses_fresh_interpreter_and_spools_stderr(
+ self,
+ mock_popen,
+ worker_with_job: EdgeWorker,
+ ):
+ process = _MockPopen(returncode=None, pid=4321)
+ mock_popen.return_value = process
+ worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
+ workload = worker_with_job.jobs[0].edge_job.command
+ stderr_file_path = None
+
+ try:
+ returned_process, stderr_file_path = worker_with_job._launch_job_subprocess(workload)
+ assert returned_process is process
+
+ popen_args, popen_kwargs = mock_popen.call_args
+ assert popen_args[0] == [
+ sys.executable,
+ "-m",
+ "airflow.sdk.execution_time.execute_workload",
+ "--json-string",
+ workload.model_dump_json(),
+ ]
+ assert (
+ popen_kwargs["env"]["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"]
+ == "https://mock-server/execution"
+ )
+ assert popen_kwargs["start_new_session"] is True
+ assert popen_kwargs["stderr"] is not subprocess.PIPE
+ assert Path(popen_kwargs["stderr"].name) == stderr_file_path
+ finally:
+ if stderr_file_path:
+ stderr_file_path.unlink(missing_ok=True)
+
@patch("airflow.sdk.execution_time.supervisor.supervise")
@pytest.mark.skipif(AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow < 3.3.0 where supervise was used")
@pytest.mark.asyncio
@@ -241,14 +336,15 @@ class TestEdgeWorker:
self,
mock_supervise,
worker_with_job: EdgeWorker,
+ tmp_path: Path,
):
worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
- q = mock.MagicMock()
- result = worker_with_job._run_job_via_supervisor(edge_job.command, q)
+ error_file_path = tmp_path / "fork-error.log"
+ result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)
assert result == 0
- q.put.assert_called_once_with("OK")
+ assert not error_file_path.exists() # no error written on success
@patch("airflow.executors.base_executor.BaseExecutor.run_workload")
@pytest.mark.skipif(
@@ -259,32 +355,58 @@ class TestEdgeWorker:
self,
mock_run_workload,
worker_with_job: EdgeWorker,
+ tmp_path: Path,
):
worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
- q = mock.MagicMock()
- result = worker_with_job._run_job_via_supervisor(edge_job.command, q)
+ error_file_path = tmp_path / "fork-error.log"
+ result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)
assert result == 0
- q.put.assert_called_once_with("OK")
+ assert not error_file_path.exists() # no error written on success
@patch("airflow.sdk.execution_time.supervisor.supervise")
+ @pytest.mark.skipif(AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow < 3.3.0 where supervise was used")
@pytest.mark.asyncio
- async def test_supervise_launch_fail(
+ async def test_supervise_launch_fail_pre_3_3(
self,
mock_supervise,
worker_with_job: EdgeWorker,
+ tmp_path: Path,
):
mock_supervise.side_effect = Exception("Supervise failed")
+ worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
+ edge_job = worker_with_job.jobs.pop().edge_job
+ error_file_path = tmp_path / "fork-error.log"
+ result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)
+
+ assert result == 1
+ assert error_file_path.exists()
+ assert "Supervise failed" in error_file_path.read_text()
+
+ @patch("airflow.executors.base_executor.BaseExecutor.run_workload")
+ @pytest.mark.skipif(
+ not AIRFLOW_V_3_3_PLUS, reason="Test is for Airflow >= 3.3.0 where BaseExecutor.run_workload is used"
+ )
+ @pytest.mark.asyncio
+ async def test_supervise_launch_fail(
+ self,
+ mock_run_workload,
+ worker_with_job: EdgeWorker,
+ tmp_path: Path,
+ ):
+ mock_run_workload.side_effect = Exception("Supervise failed")
+ worker_with_job.__dict__["_execution_api_server_url"] =
"https://mock-server/execution"
edge_job = worker_with_job.jobs.pop().edge_job
- q = mock.MagicMock()
- result = worker_with_job._run_job_via_supervisor(edge_job.command, q)
+ error_file_path = tmp_path / "fork-error.log"
+ result = worker_with_job._run_job_via_supervisor(edge_job.command, error_file_path)
assert result == 1
- q.put.assert_called_once()
+ assert error_file_path.exists()
+ assert "Supervise failed" in error_file_path.read_text()
@patch("airflow.providers.edge3.cli.worker.jobs_fetch")
- @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job",
return_value=(Process(), Queue()))
+ @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
@pytest.mark.asyncio
async def test_fetch_and_run_job_no_job(
self,
@@ -301,7 +423,7 @@ class TestEdgeWorker:
mock_launch_job.assert_not_called()
@patch("airflow.providers.edge3.cli.worker.jobs_fetch")
- @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job",
return_value=(Process(), Queue()))
+ @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
@patch("airflow.providers.edge3.cli.worker.jobs_set_state")
@patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
@patch("airflow.providers.edge3.cli.worker.logs_push")
@@ -315,20 +437,20 @@ class TestEdgeWorker:
mock_jobs_set_state,
mock_launch_job,
mock_jobs_fetch,
+ tmp_path: Path,
worker_with_job: EdgeWorker,
):
- mock_jobs_fetch.side_effect = [
- EdgeJobFetched(
- dag_id="test",
- task_id="test",
- run_id="test",
- map_index=-1,
- try_number=1,
- concurrency_slots=1,
- command=MOCK_COMMAND, # type: ignore[arg-type]
- ),
- None,
- ]
+ edge_job = EdgeJobFetched(
+ dag_id="test",
+ task_id="test",
+ run_id="test",
+ map_index=-1,
+ try_number=1,
+ concurrency_slots=1,
+ command=MOCK_COMMAND, # type: ignore[arg-type]
+ )
+ mock_jobs_fetch.side_effect = [edge_job, None]
+ mock_launch_job.return_value = Job(edge_job, _MockProcess(), tmp_path / "mock.log")
worker_with_job.concurrency = 1 # only one job at a time
assert worker_with_job.free_concurrency == 0
@@ -337,7 +459,9 @@ class TestEdgeWorker:
mock_jobs_fetch.assert_called_once()
fetch_args = mock_jobs_fetch.call_args
assert fetch_args.args[3] is None # team_name should be None
- mock_launch_job.assert_called_once()
+ mock_launch_job.assert_called_once_with(
+ edge_job, edge_job.command, Path(worker_with_job.base_log_folder, "mock.log")
+ )
assert mock_jobs_set_state.call_count == 2
mock_push_log_chunks.assert_called_once()
assert len(worker_with_job.jobs) == 1 # no new job added (was removed at the end...)
@@ -348,110 +472,128 @@ class TestEdgeWorker:
@patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
@patch("airflow.providers.edge3.cli.worker.logs_push")
@pytest.mark.asyncio
- async def test_fetch_and_run_job_possible_deadlock(
+ async def test_fetch_and_run_job_fork_failure_pushes_error_to_logs(
self,
mock_logs_push,
mock_push_log_chunks,
mock_jobs_set_state,
mock_jobs_fetch,
+ tmp_path: Path,
worker_with_job: EdgeWorker,
):
- """Verify that a large exception from the subprocess does not deadlock
fetch_and_run_job.
-
- Uses an explicit ``fork`` context to spawn the simulated worker
subprocess. Python 3.14
- flipped the POSIX default ``multiprocessing`` start method to
``forkserver``, which
- spawns a fresh interpreter for the child — patches applied in the test
process do not
- propagate, so older variants of this test that mocked ``supervise`` no
longer triggered
- the deadlock condition. Forking a small top-level target sidesteps
that and reproduces
- the actual queue-feeder/pipe-buffer deadlock the fix targets.
- """
- mock_jobs_fetch.side_effect = [
- EdgeJobFetched(
- dag_id="test",
- task_id="test",
- run_id="test",
- map_index=-1,
- try_number=1,
- concurrency_slots=1,
- command=MOCK_COMMAND, # type: ignore[arg-type]
- ),
- None,
- ]
- worker_with_job.concurrency = 1 # only one job at a time
- assert worker_with_job.free_concurrency == 0
+ edge_job = EdgeJobFetched(
+ dag_id="test",
+ task_id="test",
+ run_id="test",
+ map_index=-1,
+ try_number=1,
+ concurrency_slots=1,
+ command=MOCK_COMMAND, # type: ignore[arg-type]
+ )
+ mock_jobs_fetch.return_value = edge_job
+ worker_with_job.concurrency = 1
+ process = _MockProcess(returncode=1)
+ error_file_path = tmp_path / "fork-error.log"
+ error_file_path.write_text(
+ "Traceback (most recent call last):\n ...\nRuntimeError:
supervisor crashed\n"
+ )
+ launched_job = Job(edge_job, process, tmp_path / "mock.log",
stderr_file_path=error_file_path)
+
+ with patch.object(worker_with_job, "_launch_job",
return_value=launched_job):
+ await worker_with_job.fetch_and_run_job()
- ctx = multiprocessing.get_context("fork")
- results_queue = ctx.Queue()
- process = ctx.Process(target=_emit_large_exception_target, args=(results_queue,))
-
- with patch.object(EdgeWorker, "_launch_job", return_value=(process,
results_queue)):
- process.start()
- try:
- await asyncio.wait_for(worker_with_job.fetch_and_run_job(), timeout=10.0)
- except asyncio.TimeoutError:
- # Clean up any hanging subprocess to prevent blocking pytest
- if process.is_alive():
- process.terminate()
- process.join(timeout=1.0)
- if process.is_alive():
- process.kill()
- process.join()
- pytest.fail("fetch_and_run_job timed out after 10s - DEADLOCK
DETECTED. ")
- finally:
- if process.is_alive():
- process.join(timeout=1.0)
- if process.is_alive():
- process.terminate()
- process.join(timeout=1.0)
-
- # If we reach here without timeout, the deadlock was not triggered
- assert mock_jobs_set_state.call_count >= 1
- mock_push_log_chunks.assert_called()
- assert len(worker_with_job.jobs) <= 1 # new job removed, original
fixture job still there
+ mock_jobs_fetch.assert_called_once()
+ mock_push_log_chunks.assert_called_once()
+ assert mock_jobs_set_state.call_args_list[-1].args[1] == TaskInstanceState.FAILED
+ log_chunk_data = mock_logs_push.call_args.kwargs["log_chunk_data"]
+ assert "Task fork exited with code 1" in log_chunk_data
+ assert "RuntimeError: supervisor crashed" in log_chunk_data
+ assert not error_file_path.exists()
@patch("airflow.providers.edge3.cli.worker.jobs_fetch")
- @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job",
return_value=(Process(), Queue()))
+ @patch("airflow.providers.edge3.cli.worker.EdgeWorker._launch_job")
@patch("airflow.providers.edge3.cli.worker.jobs_set_state")
@patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
@patch("airflow.providers.edge3.cli.worker.logs_push")
@patch.object(Job, "is_running", property(lambda _: False))
@patch.object(Job, "is_success", property(lambda _: False))
- @patch("traceback.format_exception", return_value=[])
@pytest.mark.asyncio
async def test_fetch_and_run_job_one_job_fail(
self,
- mock_traceback,
mock_logs_push,
mock_push_log_chunks,
mock_jobs_set_state,
mock_launch_job,
mock_jobs_fetch,
+ tmp_path: Path,
worker_with_job: EdgeWorker,
):
- mock_jobs_fetch.side_effect = [
- EdgeJobFetched(
- dag_id="test",
- task_id="test",
- run_id="test",
- map_index=-1,
- try_number=1,
- concurrency_slots=1,
- command=MOCK_COMMAND, # type: ignore[arg-type]
- ),
- None,
- ]
+ edge_job = EdgeJobFetched(
+ dag_id="test",
+ task_id="test",
+ run_id="test",
+ map_index=-1,
+ try_number=1,
+ concurrency_slots=1,
+ command=MOCK_COMMAND, # type: ignore[arg-type]
+ )
+ mock_jobs_fetch.side_effect = [edge_job, None]
+ mock_launch_job.return_value = Job(edge_job, _MockProcess(), tmp_path / "mock.log")
worker_with_job.concurrency = 1 # only one job at a time
assert worker_with_job.free_concurrency == 0
await worker_with_job.fetch_and_run_job()
mock_jobs_fetch.assert_called_once()
- mock_launch_job.assert_called_once()
+ mock_launch_job.assert_called_once_with(
+ edge_job, edge_job.command, Path(worker_with_job.base_log_folder, "mock.log")
+ )
assert mock_jobs_set_state.call_count == 2
mock_push_log_chunks.assert_called_once()
assert len(worker_with_job.jobs) == 1 # no new job added (was removed at the end...)
mock_logs_push.assert_called_once()
+ @patch("airflow.providers.edge3.cli.worker.jobs_fetch")
+ @patch("airflow.providers.edge3.cli.worker.jobs_set_state")
+ @patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
+ @patch("airflow.providers.edge3.cli.worker.logs_push")
+ @pytest.mark.asyncio
+ async def test_fetch_and_run_job_subprocess_failure_pushes_stderr_to_logs(
+ self,
+ mock_logs_push,
+ mock_push_log_chunks,
+ mock_jobs_set_state,
+ mock_jobs_fetch,
+ tmp_path: Path,
+ worker_with_job: EdgeWorker,
+ ):
+ edge_job = EdgeJobFetched(
+ dag_id="test",
+ task_id="test",
+ run_id="test",
+ map_index=-1,
+ try_number=1,
+ concurrency_slots=1,
+ command=MOCK_COMMAND, # type: ignore[arg-type]
+ )
+ mock_jobs_fetch.return_value = edge_job
+ worker_with_job.concurrency = 1
+ process = _MockPopen(returncode=1, pid=5678)
+ stderr_file_path = tmp_path / "subprocess-stderr.log"
+ stderr_file_path.write_text("ModuleNotFoundError: No module named
'common'\n")
+ launched_job = Job(edge_job, process, tmp_path / "mock.log",
stderr_file_path=stderr_file_path)
+
+ with patch.object(worker_with_job, "_launch_job",
return_value=launched_job):
+ await worker_with_job.fetch_and_run_job()
+
+ mock_jobs_fetch.assert_called_once()
+ mock_push_log_chunks.assert_called_once()
+ assert mock_jobs_set_state.call_args_list[-1].args[1] == TaskInstanceState.FAILED
+ log_chunk_data = mock_logs_push.call_args.kwargs["log_chunk_data"]
+ assert "Task subprocess exited with code 1" in log_chunk_data
+ assert "ModuleNotFoundError: No module named 'common'" in
log_chunk_data
+ assert not stderr_file_path.exists()
+
@time_machine.travel(datetime.now(), tick=False)
@patch("airflow.providers.edge3.cli.worker.logs_push")
@pytest.mark.asyncio
@@ -929,7 +1071,7 @@ class TestSignalHandling:
):
rc = worker._run_job_via_supervisor(
workload=self._make_workload(),
- results_queue=mock.MagicMock(),
+ error_file_path=tmp_path / "fork-error.log",
)
assert rc == 0
assert [c.args[0] for c in order.call_args_list] == ["reset",
"setpgrp", "supervise"]