diogosilva30 commented on code in PR #65943:
URL: https://github.com/apache/airflow/pull/65943#discussion_r3217852802


##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -447,17 +451,77 @@ def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -
             results_queue.put(e)
             return 1
 
-    def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
+    def _launch_job_subprocess(self, workload: ExecuteTask) -> subprocess.Popen:
+        """Launch workload via a fresh Python interpreter (subprocess.Popen)."""
+        env = os.environ.copy()
+        if self._execution_api_server_url:
+            env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = self._execution_api_server_url
+
+        # Keep stderr off a PIPE: the worker only inspects stderr after the task finishes,
+        # so a verbose child could otherwise fill the pipe buffer and block forever.
+        with tempfile.NamedTemporaryFile(
+            prefix="airflow-edge-task-stderr-", suffix=".log", delete=False
+        ) as stderr_file:
+            stderr_file_path = Path(stderr_file.name)
+            try:
+                process = subprocess.Popen(
+                    [
+                        sys.executable,
+                        "-m",
+                        "airflow.sdk.execution_time.execute_workload",
+                        "--json-string",
+                        workload.model_dump_json(),
+                    ],
+                    env=env,
+                    start_new_session=True,
+                    stderr=stderr_file,

Review Comment:
   Thanks for the suggestion! Before I push, I wanted to confirm that the 
approach I've taken is what you had in mind:
   
   - Removed the `multiprocessing.Queue` from the fork path entirely.
   - Both paths now write failure traceback text to a named temp file (`Path`) 
stored as `Job.stderr_file_path`.
   - `Job.failure_details()` reads from that file for both paths, so it takes 
no arguments and there is a single code path.
   - On success the file is cleaned up via `Job.cleanup()`.
   - The helper `_make_task_temp_file(prefix)` creates the file once and is 
shared by both `_launch_job_subprocess` and `_launch_job_fork`.
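
For reference, here is a minimal sketch of the shape described in the list above. It is not the actual diff: the `Job` class is a simplified stand-in, `launch` is a hypothetical helper that only mirrors the subprocess path, and the method bodies are assumptions about how `failure_details()` and `cleanup()` could work. The fork path is not shown.

```python
from __future__ import annotations

import subprocess
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path


def _make_task_temp_file(prefix: str) -> Path:
    """Create an empty named temp file and return its path; the caller owns cleanup."""
    with tempfile.NamedTemporaryFile(prefix=prefix, suffix=".log", delete=False) as f:
        return Path(f.name)


@dataclass
class Job:
    """Simplified stand-in for the worker's Job; only the stderr handling is shown."""

    process: subprocess.Popen
    stderr_file_path: Path

    def failure_details(self) -> str:
        """Return whatever the task wrote to its stderr file, or '' if unreadable."""
        try:
            return self.stderr_file_path.read_text(errors="replace").strip()
        except OSError:
            return ""

    def cleanup(self) -> None:
        """Remove the stderr file; safe to call more than once."""
        self.stderr_file_path.unlink(missing_ok=True)


def launch(cmd: list[str]) -> Job:
    """Hypothetical launcher: start `cmd` with stderr redirected to a temp file."""
    path = _make_task_temp_file("airflow-edge-task-stderr-")
    # The child inherits its own copy of the file descriptor, so the parent
    # can close its handle right after Popen returns.
    with path.open("wb") as stderr_file:
        process = subprocess.Popen(cmd, stderr=stderr_file)
    return Job(process=process, stderr_file_path=path)
```

Because stderr goes to a regular file and not a pipe, the child can write as much as it likes without blocking, and the worker reads the file only after `process.wait()` returns. On a non-zero exit code the worker would call `failure_details()`; on success it would call `cleanup()`.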
   
   Does that match what you had in mind, or would you like any adjustments?
   



