diogosilva30 commented on code in PR #65943:
URL: https://github.com/apache/airflow/pull/65943#discussion_r3201587817
##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -447,17 +451,77 @@ def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -
             results_queue.put(e)
             return 1
-    def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
+    def _launch_job_subprocess(self, workload: ExecuteTask) -> subprocess.Popen:
+        """Launch workload via a fresh Python interpreter (subprocess.Popen)."""
+        env = os.environ.copy()
+        if self._execution_api_server_url:
+            env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = self._execution_api_server_url
+
+        # Keep stderr off a PIPE: the worker only inspects stderr after the task finishes,
+        # so a verbose child could otherwise fill the pipe buffer and block forever.
+        with tempfile.NamedTemporaryFile(
+            prefix="airflow-edge-task-stderr-", suffix=".log", delete=False
+        ) as stderr_file:
+            stderr_file_path = Path(stderr_file.name)
+            try:
+                process = subprocess.Popen(
+                    [
+                        sys.executable,
+                        "-m",
+                        "airflow.sdk.execution_time.execute_workload",
+                        "--json-string",
+                        workload.model_dump_json(),
+                    ],
+                    env=env,
+                    start_new_session=True,
+                    stderr=stderr_file,
Review Comment:
Good question. I used a temp file because stderr is the only parent-visible
diagnostic channel for the fresh-interpreter path, and we want those
diagnostics attached to the task that failed.

In the fork path, the child can return an exception object through the
multiprocessing result queue. In the subprocess path, the child is a separate
Python interpreter running `execute_workload`, so it cannot send that Python
exception object back to the Edge worker. If something fails early, especially
during workload parsing, supervisor startup, plugin import, or Dag import,
stderr is the only place the traceback is preserved.
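
To make that concrete, here is a minimal sketch (not the PR's code) of what a parent actually receives when a fresh interpreter dies during import. The child program and the `run_child` helper are hypothetical names used only for illustration.

```python
import subprocess
import sys

# Hypothetical child program: it fails at import time, before any task code runs.
CHILD_CODE = "import module_that_does_not_exist_for_this_demo"


def run_child(code: str) -> tuple[int, str]:
    """Run code in a fresh interpreter and return (exit code, stderr text)."""
    # subprocess.run() drains the pipe itself via communicate(), so PIPE is
    # safe for this short demo; a long-lived Popen would not get that for free.
    result = subprocess.run(
        [sys.executable, "-c", code],
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )
    return result.returncode, result.stderr


returncode, stderr_text = run_child(CHILD_CODE)

# The parent never sees a ModuleNotFoundError object, only an exit code
# and whatever text the child wrote to stderr.
print(returncode)
print(stderr_text.strip().splitlines()[-1])
```

If stderr were discarded here, the non-zero exit code would be the only thing left to report.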
We could pass `sys.__stderr__` like Celery does, but then output from all
concurrently running task subprocesses would share the Edge worker’s stderr.
That means a traceback could end up only in the worker/container log,
potentially interleaved with other task subprocesses and worker logs, and not
attached to the failed task’s log.

The temp file is a per-task spool: it avoids `subprocess.PIPE` (which can
deadlock if the parent does not continuously drain it), keeps stderr
attributable to the specific task subprocess, and lets us push those startup
diagnostics into the task log via `logs_push` after the process exits.
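
As a rough illustration of the per-task spool idea, the sketch below runs two children at once. Each writes about 1 MB to stderr and then fails, and the parent reads nothing until each child has exited. The helper names `launch_with_stderr_spool` and `collect_stderr` are hypothetical, and the sketch only returns the text rather than calling `logs_push`.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def launch_with_stderr_spool(code: str) -> tuple[subprocess.Popen, Path]:
    """Start a child interpreter whose stderr goes to its own temp file."""
    with tempfile.NamedTemporaryFile(
        prefix="demo-task-stderr-", suffix=".log", delete=False
    ) as stderr_file:
        stderr_path = Path(stderr_file.name)
        # The child gets its own copy of the file handle, so closing ours
        # when the with-block exits does not affect the child's stderr.
        process = subprocess.Popen([sys.executable, "-c", code], stderr=stderr_file)
    return process, stderr_path


def collect_stderr(process: subprocess.Popen, stderr_path: Path) -> tuple[int, str]:
    """Wait for the child, then read and delete its stderr spool."""
    returncode = process.wait()
    try:
        text = stderr_path.read_text(errors="replace")
    finally:
        stderr_path.unlink(missing_ok=True)
    return returncode, text


# Two concurrent children, each writing far more than a typical pipe buffer
# holds before failing. The parent does not read anything while they run.
children = {
    name: launch_with_stderr_spool(
        f"import sys; sys.stderr.write('x' * 1_000_000 + '\\n'); raise RuntimeError({name!r})"
    )
    for name in ("task-a", "task-b")
}
results = {name: collect_stderr(proc, path) for name, (proc, path) in children.items()}

for name, (returncode, text) in results.items():
    print(name, returncode, text.strip().splitlines()[-1])
```

Because a file never applies back-pressure the way a full pipe does, the children finish on their own, and each traceback stays in the spool that belongs to its task.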