diogosilva30 commented on code in PR #65943:
URL: https://github.com/apache/airflow/pull/65943#discussion_r3201587817


##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -447,17 +451,77 @@ def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -
             results_queue.put(e)
             return 1
 
-    def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
+    def _launch_job_subprocess(self, workload: ExecuteTask) -> subprocess.Popen:
+        """Launch workload via a fresh Python interpreter (subprocess.Popen)."""
+        env = os.environ.copy()
+        if self._execution_api_server_url:
+            env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = self._execution_api_server_url
+
+        # Keep stderr off a PIPE: the worker only inspects stderr after the task finishes,
+        # so a verbose child could otherwise fill the pipe buffer and block forever.
+        with tempfile.NamedTemporaryFile(
+            prefix="airflow-edge-task-stderr-", suffix=".log", delete=False
+        ) as stderr_file:
+            stderr_file_path = Path(stderr_file.name)
+            try:
+                process = subprocess.Popen(
+                    [
+                        sys.executable,
+                        "-m",
+                        "airflow.sdk.execution_time.execute_workload",
+                        "--json-string",
+                        workload.model_dump_json(),
+                    ],
+                    env=env,
+                    start_new_session=True,
+                    stderr=stderr_file,

Review Comment:
   Good question. I used a temp file because stderr is the only parent-visible 
diagnostic channel for the fresh-interpreter path, and we want those 
diagnostics attached to the task that failed.
   
   In the fork path, the child can return an exception object through the 
multiprocessing result queue. In the subprocess path, the child is a separate 
Python interpreter running `execute_workload`, so it cannot send that Python 
exception object back to the Edge worker. If something fails early, especially 
during workload parsing, supervisor startup, plugin import, or Dag import, 
stderr is what preserves the traceback.
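To make that contrast concrete, here is a minimal stdlib-only sketch (not code from this PR). With a forked child, the exception object itself comes back through a `multiprocessing` queue; with a fresh interpreter whose stderr is discarded, the parent is left with nothing but the exit status. The helper names and the `RuntimeError` standing in for a Dag import failure are illustrative assumptions, and the `fork` start method assumes a POSIX host.

```python
import multiprocessing
import subprocess
import sys

# Hypothetical stand-in for an early failure such as a Dag import error.
FAILURE_SNIPPET = "raise RuntimeError('failed during Dag import')"


def _failing_child(results_queue) -> None:
    try:
        raise RuntimeError("failed during Dag import")
    except Exception as exc:
        # Exception instances pickle, so the parent receives a real exception object.
        results_queue.put(exc)


def fork_path() -> BaseException:
    # "fork" is POSIX-only; the child inherits _failing_child without re-importing.
    ctx = multiprocessing.get_context("fork")
    results_queue = ctx.Queue()
    proc = ctx.Process(target=_failing_child, args=(results_queue,))
    proc.start()
    exc = results_queue.get(timeout=30)  # read before join() so the child can flush the queue
    proc.join()
    return exc


def fresh_interpreter_path_without_stderr() -> int:
    # A separate interpreter cannot hand back a Python object. With stderr
    # discarded, the traceback is lost and only the exit status remains.
    completed = subprocess.run(
        [sys.executable, "-c", FAILURE_SNIPPET], stderr=subprocess.DEVNULL
    )
    return completed.returncode
```

Calling `fork_path()` yields a `RuntimeError` carrying the original message, while `fresh_interpreter_path_without_stderr()` returns only `1`, the status CPython uses for an uncaught exception.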
   
   We could pass `sys.__stderr__` like Celery does, but then output from all 
concurrently running task subprocesses would share the Edge worker’s stderr. 
That means a traceback could end up only in the worker/container log, 
potentially interleaved with output from other task subprocesses and the worker itself, and not 
attached to the failed task’s log.
   
   The temp file is a per-task spool: it avoids `subprocess.PIPE` (which can 
deadlock if the parent does not continuously drain it), keeps stderr 
attributable to the specific task subprocess, and lets us push those startup 
diagnostics into the task log via `logs_push` after the process exits.
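A minimal sketch of the per-task spool pattern using only the standard library. `run_with_stderr_spool` is a hypothetical helper, not the PR's implementation, and it simply returns the stderr text where the worker would call `logs_push`. The usage at the bottom writes about 200 KB to stderr, well past Linux's default 64 KiB pipe capacity, which is the situation where an undrained `subprocess.PIPE` could block the child.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_with_stderr_spool(argv: list[str], timeout: float = 60.0) -> tuple[int, str]:
    """Hypothetical helper: run argv with stderr spooled to a per-task temp file.

    Returns (returncode, stderr_text) so the caller can attach the child's
    diagnostics to the log of the task that produced them.
    """
    # delete=False: the file must outlive the `with` block so it can be read
    # after the child exits; it is unlinked explicitly below.
    with tempfile.NamedTemporaryFile(
        prefix="task-stderr-", suffix=".log", delete=False
    ) as stderr_file:
        stderr_path = Path(stderr_file.name)
        try:
            # The child writes straight to the file, so there is no pipe buffer
            # that can fill up while the parent is not reading.
            process = subprocess.Popen(argv, stderr=stderr_file)
        except BaseException:
            stderr_path.unlink(missing_ok=True)
            raise
    # The parent's handle is closed here; the child keeps its own descriptor.
    try:
        try:
            returncode = process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Do not leave a stuck child behind; its partial stderr is still useful.
            process.kill()
            returncode = process.wait()
        stderr_text = stderr_path.read_text(errors="replace")
    finally:
        stderr_path.unlink(missing_ok=True)
    return returncode, stderr_text


if __name__ == "__main__":
    # Usage: a child that writes ~200 KB to stderr and then exits with status 3.
    code, text = run_with_stderr_spool(
        [sys.executable, "-c", "import sys; sys.stderr.write('x' * 200_000); raise SystemExit(3)"]
    )
    print(code, len(text))
```

Run as a script, this prints `3 200000`. Spooling to a file rather than draining a pipe on a background thread keeps the parent passive while the task runs, at the cost of a short-lived file on disk per task.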



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.