This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 24811f729f0 Make task output "unbuffered" so output is captured
straight away (#44186)
24811f729f0 is described below
commit 24811f729f0e20dfff1be9afa8bf4a60b44fe628
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Nov 19 13:42:33 2024 +0000
Make task output "unbuffered" so output is captured straight away (#44186)
Without this change a dag like this:
```
@task()
def hello():
print("hello")
time.sleep(300)
print("goodbye")
```
would not show the output for "hello" until after the sleep!
This is analogous to setting the PYTHONUNBUFFERED environment variable when
running something like `python script.py | cat`.
---
.../src/airflow/sdk/execution_time/supervisor.py | 5 ++--
task_sdk/tests/execution_time/test_supervisor.py | 28 +++++++++++-----------
2 files changed, 17 insertions(+), 16 deletions(-)
diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
index 7faddebb25c..f2715ad3e5d 100644
--- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -149,8 +149,9 @@ def _reopen_std_io_handles(child_stdin, child_stdout,
child_stderr):
fd = sock.fileno()
else:
raise
-
- setattr(sys, handle_name, os.fdopen(fd, mode))
+ # We can't open text mode fully unbuffered (python throws an exception
if we try), but we can make it line buffered with `buffering=1`
+ handle = os.fdopen(fd, mode, buffering=1)
+ setattr(sys, handle_name, handle)
def _fork_main(
diff --git a/task_sdk/tests/execution_time/test_supervisor.py
b/task_sdk/tests/execution_time/test_supervisor.py
index 5ed51fece51..edd62d17222 100644
--- a/task_sdk/tests/execution_time/test_supervisor.py
+++ b/task_sdk/tests/execution_time/test_supervisor.py
@@ -51,30 +51,30 @@ class TestWatchedSubprocess:
# Ignore anything lower than INFO for this test. Captured_logs resets
things for us afterwards
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.INFO))
- line = lineno()
-
def subprocess_main():
# This is run in the subprocess!
# Ensure we follow the "protocol" and get the startup message
before we do anything
sys.stdin.readline()
- # Flush calls are to ensure ordering of output for predictable
tests
import logging
import warnings
print("I'm a short message")
sys.stdout.write("Message ")
- sys.stdout.write("split across two writes\n")
- sys.stdout.flush()
-
print("stderr message", file=sys.stderr)
- sys.stderr.flush()
+ # We need a short sleep for the main process to process things. I
worry this timing will be
+ # fragile, but I can't think of a better way. This lets the stdout
be read (partial line) and the
+ # stderr full line be read
+ sleep(0.1)
+ sys.stdout.write("split across two writes\n")
logging.getLogger("airflow.foobar").error("An error message")
warnings.warn("Warning should be captured too", stacklevel=1)
+ line = lineno() - 2 # Line the error should be on
+
instant = tz.datetime(2024, 11, 7, 12, 34, 56, 78901)
time_machine.move_to(instant, tick=False)
@@ -103,16 +103,16 @@ class TestWatchedSubprocess:
"timestamp": "2024-11-07T12:34:56.078901Z",
},
{
- "chan": "stdout",
- "event": "Message split across two writes",
- "level": "info",
+ "chan": "stderr",
+ "event": "stderr message",
+ "level": "error",
"logger": "task",
"timestamp": "2024-11-07T12:34:56.078901Z",
},
{
- "chan": "stderr",
- "event": "stderr message",
- "level": "error",
+ "chan": "stdout",
+ "event": "Message split across two writes",
+ "level": "info",
"logger": "task",
"timestamp": "2024-11-07T12:34:56.078901Z",
},
@@ -127,7 +127,7 @@ class TestWatchedSubprocess:
"event": "Warning should be captured too",
"filename": __file__,
"level": "warning",
- "lineno": line + 22,
+ "lineno": line,
"logger": "py.warnings",
"timestamp": instant.replace(tzinfo=None),
},