ashb commented on code in PR #55258:
URL: https://github.com/apache/airflow/pull/55258#discussion_r2322386357


##########
providers/standard/src/airflow/providers/standard/operators/python.py:
##########
@@ -575,10 +575,38 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
                     os.fspath(termination_log_path),
                     os.fspath(airflow_context_path),
                 ]
-                execute_in_subprocess(
-                    cmd=cmd,
+                self.log.info("Executing virtualenv script: %s", cmd)
+                proc = subprocess.Popen(
+                    cmd,
                     env=env_vars,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    text=True,
+                    bufsize=1,
                 )
+                assert proc.stdout is not None
+                assert proc.stderr is not None

Review Comment:
   ```suggestion
   ```



##########
providers/standard/src/airflow/providers/standard/operators/python.py:
##########
@@ -575,10 +575,38 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
                     os.fspath(termination_log_path),
                     os.fspath(airflow_context_path),
                 ]
-                execute_in_subprocess(
-                    cmd=cmd,
+                self.log.info("Executing virtualenv script: %s", cmd)
+                proc = subprocess.Popen(
+                    cmd,
                     env=env_vars,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    text=True,
+                    bufsize=1,
                 )
+                assert proc.stdout is not None
+                assert proc.stderr is not None
+
+                # Stream stdout → INFO
+                for line in proc.stdout:
+                    if line.strip():
+                        self.log.info("[venv stdout] %s", line.rstrip())
+
+                # Stream stderr → ERROR
+                for line in proc.stderr:
+                    if line.strip():
+                        self.log.error("[venv stderr] %s", line.rstrip())

Review Comment:
   This isn't right -- it will only produce the output once the process has 
finished. If this is a long-running task, nothing will appear until the venv 
task has finished.
   
   However, this is also not the approach I want us to use. The way logging 
works from tasks in Airflow 3 is now as follows:
   
   - The supervisor process (the parent of the code running here; we are in 
a task process, not the supervisor) listens to stdout and stderr and writes 
them to the task log file. You have probably seen these lines, as they have 
`chan=stdout` etc. on them.
   - Logging is configured to output JSON logs over a new anonymous socket 
(the other end is connected to the supervisor process), and anything 
logging-related gets sent over this.
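
   To make the second bullet concrete, here is a minimal stdlib-only sketch of 
"JSON log records sent over an anonymous socket". It is an illustration of the 
general idea, not Airflow's actual implementation: `JsonSocketHandler`, 
`read_json_logs`, `demo`, and the field names in the payload are all 
hypothetical, and both ends live in one process to keep the example short.

```python
import json
import logging
import socket


class JsonSocketHandler(logging.Handler):
    """Hypothetical handler: write each log record as one JSON line to a socket."""

    def __init__(self, sock: socket.socket) -> None:
        super().__init__()
        self._file = sock.makefile("w", encoding="utf-8")

    def emit(self, record: logging.LogRecord) -> None:
        # Field names are made up for this sketch.
        payload = {
            "level": record.levelname.lower(),
            "logger": record.name,
            "event": record.getMessage(),
        }
        self._file.write(json.dumps(payload) + "\n")
        self._file.flush()  # send immediately so the reader sees it as it happens

    def close(self) -> None:
        self._file.close()
        super().close()


def read_json_logs(sock: socket.socket) -> list[dict]:
    """Reader side: decode JSON lines until the writer closes its end."""
    with sock.makefile("r", encoding="utf-8") as reader:
        return [json.loads(line) for line in reader]


def demo() -> list[dict]:
    # An anonymous, connected pair of sockets: one end per "process".
    supervisor_end, task_end = socket.socketpair()

    logger = logging.getLogger("demo.task")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = JsonSocketHandler(task_end)
    logger.addHandler(handler)

    logger.info("hello from the task, attempt %d", 1)

    # Closing the writer end lets the reader hit end-of-stream.
    logger.removeHandler(handler)
    handler.close()
    task_end.close()

    records = read_json_logs(supervisor_end)
    supervisor_end.close()
    return records
```

   In a real setup the two ends would sit in different processes, with the 
reader consuming records while the task is still running.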
   
   I'm afraid I don't have time right now to be more specific, but we really 
want to do something like this 
https://github.com/apache/airflow/blob/e9a895cee533dd91556f84baba06f60bbd75ece6/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L708-L726
 so that logs can be produced directly from within the venv.
   
   Of course that will only work if Airflow is found in the venv, so maybe we 
need the approach I describe when Airflow is available, and something like this 
PR's code when it's not (but emitting output as it arrives, not only once at 
the end).
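
   For the fallback case, one possible shape is to drain stdout and stderr 
concurrently, one thread per pipe, so lines are forwarded as they arrive and 
neither pipe can fill up while the other is being read. This is only a sketch 
under assumptions: `run_and_stream` and `_pump` are hypothetical helper names, 
not existing Airflow functions, and the child may still buffer its own output 
when writing to a pipe (for a Python child, running it with `-u` or setting 
`PYTHONUNBUFFERED=1` avoids that).

```python
import logging
import subprocess
import sys
import threading
from typing import IO, Callable, Optional, Sequence


def _pump(stream: IO[str], emit: Callable[[str], None]) -> None:
    """Forward non-empty lines from a pipe to a callback as they arrive."""
    for line in stream:
        line = line.rstrip()
        if line:
            emit(line)


def run_and_stream(
    cmd: Sequence[str],
    on_stdout: Callable[[str], None],
    on_stderr: Callable[[str], None],
    env: Optional[dict] = None,
) -> int:
    """Hypothetical helper: run cmd, streaming both pipes, and return its exit code."""
    with subprocess.Popen(
        cmd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # line-buffered on our side of the pipe
    ) as proc:
        threads = [
            threading.Thread(target=_pump, args=(proc.stdout, on_stdout), daemon=True),
            threading.Thread(target=_pump, args=(proc.stderr, on_stderr), daemon=True),
        ]
        for thread in threads:
            thread.start()
        # Each thread ends when its pipe reaches end-of-file.
        for thread in threads:
            thread.join()
    # Leaving the `with` block waits for the process, so returncode is set.
    return proc.returncode


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("venv-demo")
    child = [sys.executable, "-u", "-c", "import sys; print('out'); print('err', file=sys.stderr)"]
    code = run_and_stream(
        child,
        lambda line: log.info("[venv stdout] %s", line),
        lambda line: log.error("[venv stderr] %s", line),
    )
    log.info("exit code: %s", code)
```

   The callbacks run on the reader threads, which is fine for the standard 
`logging` module since its handlers take a lock when emitting.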



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
