This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 24811f729f0 Make task output "unbuffered" so output is captured straight away (#44186)
24811f729f0 is described below

commit 24811f729f0e20dfff1be9afa8bf4a60b44fe628
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Nov 19 13:42:33 2024 +0000

    Make task output "unbuffered" so output is captured straight away (#44186)
    
    Without this change, a DAG like this:
    
    ```
    @task()
    def hello():
        print("hello")
        time.sleep(300)
        print("goodbye")
    ```
    
    would not show the output for "hello" until after the sleep!
    
    This is analogous to setting the PYTHONUNBUFFERED environment variable when
    running something like `python script.py | cat` etc.
---
 .../src/airflow/sdk/execution_time/supervisor.py   |  5 ++--
 task_sdk/tests/execution_time/test_supervisor.py   | 28 +++++++++++-----------
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
index 7faddebb25c..f2715ad3e5d 100644
--- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -149,8 +149,9 @@ def _reopen_std_io_handles(child_stdin, child_stdout, 
child_stderr):
                 fd = sock.fileno()
             else:
                 raise
-
-        setattr(sys, handle_name, os.fdopen(fd, mode))
+        # We can't open text mode fully unbuffered (python throws an exception 
if we try), but we can make it line buffered with `buffering=1`
+        handle = os.fdopen(fd, mode, buffering=1)
+        setattr(sys, handle_name, handle)
 
 
 def _fork_main(
diff --git a/task_sdk/tests/execution_time/test_supervisor.py 
b/task_sdk/tests/execution_time/test_supervisor.py
index 5ed51fece51..edd62d17222 100644
--- a/task_sdk/tests/execution_time/test_supervisor.py
+++ b/task_sdk/tests/execution_time/test_supervisor.py
@@ -51,30 +51,30 @@ class TestWatchedSubprocess:
         # Ignore anything lower than INFO for this test. Captured_logs resets 
things for us afterwards
         
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO))
 
-        line = lineno()
-
         def subprocess_main():
             # This is run in the subprocess!
 
             # Ensure we follow the "protocol" and get the startup message 
before we do anything
             sys.stdin.readline()
 
-            # Flush calls are to ensure ordering of output for predictable 
tests
             import logging
             import warnings
 
             print("I'm a short message")
             sys.stdout.write("Message ")
-            sys.stdout.write("split across two writes\n")
-            sys.stdout.flush()
-
             print("stderr message", file=sys.stderr)
-            sys.stderr.flush()
+            # We need a short sleep for the main process to process things. I 
worry this timining will be
+            # fragile, but I can't think of a better way. This lets the stdout 
be read (partial line) and the
+            # stderr full line be read
+            sleep(0.1)
+            sys.stdout.write("split across two writes\n")
 
             logging.getLogger("airflow.foobar").error("An error message")
 
             warnings.warn("Warning should be captured too", stacklevel=1)
 
+        line = lineno() - 2  # Line the error should be on
+
         instant = tz.datetime(2024, 11, 7, 12, 34, 56, 78901)
         time_machine.move_to(instant, tick=False)
 
@@ -103,16 +103,16 @@ class TestWatchedSubprocess:
                 "timestamp": "2024-11-07T12:34:56.078901Z",
             },
             {
-                "chan": "stdout",
-                "event": "Message split across two writes",
-                "level": "info",
+                "chan": "stderr",
+                "event": "stderr message",
+                "level": "error",
                 "logger": "task",
                 "timestamp": "2024-11-07T12:34:56.078901Z",
             },
             {
-                "chan": "stderr",
-                "event": "stderr message",
-                "level": "error",
+                "chan": "stdout",
+                "event": "Message split across two writes",
+                "level": "info",
                 "logger": "task",
                 "timestamp": "2024-11-07T12:34:56.078901Z",
             },
@@ -127,7 +127,7 @@ class TestWatchedSubprocess:
                 "event": "Warning should be captured too",
                 "filename": __file__,
                 "level": "warning",
-                "lineno": line + 22,
+                "lineno": line,
                 "logger": "py.warnings",
                 "timestamp": instant.replace(tzinfo=None),
             },

Reply via email to