jscheffl commented on code in PR #65943:
URL: https://github.com/apache/airflow/pull/65943#discussion_r3222322431
##########
providers/edge3/src/airflow/providers/edge3/cli/worker.py:
##########
@@ -447,17 +451,77 @@ def _run_job_via_supervisor(self, workload: ExecuteTask,
results_queue: Queue) -
results_queue.put(e)
return 1
- def _launch_job(self, workload: ExecuteTask) -> tuple[Process,
Queue[Exception]]:
+ def _launch_job_subprocess(self, workload: ExecuteTask) ->
subprocess.Popen:
+ """Launch workload via a fresh Python interpreter
(subprocess.Popen)."""
+ env = os.environ.copy()
+ if self._execution_api_server_url:
+ env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] =
self._execution_api_server_url
+
+ # Keep stderr off a PIPE: the worker only inspects stderr after the
task finishes,
+ # so a verbose child could otherwise fill the pipe buffer and block
forever.
+ with tempfile.NamedTemporaryFile(
+ prefix="airflow-edge-task-stderr-", suffix=".log", delete=False
+ ) as stderr_file:
+ stderr_file_path = Path(stderr_file.name)
+ try:
+ process = subprocess.Popen(
+ [
+ sys.executable,
+ "-m",
+ "airflow.sdk.execution_time.execute_workload",
+ "--json-string",
+ workload.model_dump_json(),
+ ],
+ env=env,
+ start_new_session=True,
+ stderr=stderr_file,
Review Comment:
Yes. Because:
- Queue might be un-needed complexity - we "just" want to bring over some
error text for logs to upload
- File is low-end and both available for forks and subprocesses
- We recently had a deadlock because of queue and buffer, and this is not a
problem with a bare tempfile
Your sketched approach is what I thought of, will reduce complexity. And err
file only needs to be created/filled if there is an error, in case of "all is
good" the task logs are sufficient which are existing in parallle. the erro
file is just a recovery on top of task logs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]