melicheradam opened a new pull request, #68441:
URL: https://github.com/apache/airflow/pull/68441
feat(standard): forward log levels from virtualenv subprocess stdout
Parse standard Python logging prefixes (DEBUG/INFO/WARNING/ERROR/CRITICAL)
from subprocess stdout in `_execute_in_subprocess` and re-emit each line at
the matching Airflow log level instead of collapsing everything to INFO.
Adds `_log_subprocess_line` helper with a compiled regex, wires it into the
stdout loop, and documents the recommended formatter setup (including
traceback forwarding via `sys.excepthook`) for `PythonVirtualenvOperator`
and `ExternalPythonOperator`.
## Problem
`PythonVirtualenvOperator` and `ExternalPythonOperator` run user code in a
subprocess and stream stdout back to the Airflow logger. Previously every
line was logged at `INFO` regardless of severity, so `WARNING`/`ERROR` lines
from user code were mixed into the `INFO` stream and disappeared whenever
logs were filtered at a higher level.
## Solution
`_execute_in_subprocess` now calls `_log_subprocess_line` for each output
line. The helper matches the standard Python logging prefix format
(`LEVEL: message`) and calls `log.log(level, ...)` accordingly. Lines
without a recognised prefix fall back to `INFO`, so existing code sees no
behaviour change.
Users opt in by configuring their callable's formatter:
```python
import logging


class _PrefixAllLinesFormatter(logging.Formatter):
    def format(self, record):
        msg = super().format(record)
        # Repeat the level prefix on every line so multi-line messages
        # (for example tracebacks) keep their level when parsed line by line.
        prefix = record.levelname + ": "
        return "\n".join(prefix + line for line in msg.splitlines())


handler = logging.StreamHandler()
handler.setFormatter(_PrefixAllLinesFormatter())
logging.root.addHandler(handler)
```
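To see why the prefix is repeated on every line, the small standalone check below captures the formatter's output in memory. The logger name `prefix_demo` and the `StringIO` buffer are illustrative assumptions only. A real callable would write to the standard streams as shown above.

```python
import io
import logging


class PrefixAllLinesFormatter(logging.Formatter):
    def format(self, record):
        msg = super().format(record)
        prefix = record.levelname + ": "
        return "\n".join(prefix + line for line in msg.splitlines())


# Capture output in memory instead of a real stream, for demonstration only.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(PrefixAllLinesFormatter())

demo_log = logging.getLogger("prefix_demo")  # hypothetical logger name
demo_log.setLevel(logging.DEBUG)
demo_log.propagate = False  # keep the demo isolated from the root logger
demo_log.addHandler(handler)

# A two-line message is emitted as two separately prefixed lines.
demo_log.error("first line\nsecond line")
emitted_lines = buffer.getvalue().splitlines()
print(emitted_lines)
```

Each emitted line starts with `ERROR: `, so a line-by-line parser on the Airflow side can assign the right level to every line of a multi-line record.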
Traceback lines from unhandled top-level exceptions are forwarded via a
`sys.excepthook` override documented in the operator how-to page.
## Changes
- `python_virtualenv.py`: add `_LEVEL_PREFIX_RE` and `_log_subprocess_line`,
  and wire them into the subprocess stdout loop
- `test_python_virtualenv.py`: `TestLogSubprocessLine`, 9 parametrised
  cases covering all five levels, fallback, wrong case, wrong prefix name,
  and empty string
- `docs/operators/python.rst`: "Log level forwarding" subsection added to
  both the `PythonVirtualenvOperator` and `ExternalPythonOperator` sections,
  with a collapsible snippet showing the formatter and excepthook setup
### Original behaviour
<img width="887" height="427" alt="image"
src="https://github.com/user-attachments/assets/c80bf12c-b60c-4279-a791-b81681812350"
/>
### Updated behaviour
<img width="883" height="457" alt="image"
src="https://github.com/user-attachments/assets/1183ff79-da1a-4bcc-a0fe-31c4e747d3a9"
/>
### Example DAG to test
```python
from datetime import datetime
import logging

from airflow.sdk import DAG, task

with DAG(
    dag_id="venv_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):

    @task.virtualenv(task_id="venv_task", requirements=[], system_site_packages=False)
    def venv_task():
        import logging
        import sys
        import traceback as _tb

        class PrefixAllLinesFormatter(logging.Formatter):
            def format(self, record):
                msg = super().format(record)
                prefix = record.levelname + ": "
                return "\n".join(prefix + line for line in msg.splitlines())

        handler = logging.StreamHandler()
        handler.setFormatter(PrefixAllLinesFormatter())
        logging.root.addHandler(handler)

        # Route unhandled exceptions through logging so traceback lines get the prefix.
        def _logging_excepthook(exc_type, exc_value, exc_tb):
            logging.error("".join(_tb.format_exception(exc_type, exc_value, exc_tb)).rstrip())

        sys.excepthook = _logging_excepthook

        logging.basicConfig(format="%(levelname)s: %(message)s")
        logging.warning("This will appear as WARNING in Airflow logs")
        logging.error("This will appear as ERROR in Airflow logs")
        logging.exception("This will appear as ERROR with stack trace in Airflow logs")
        try:
            raise Exception("This will appear as ERROR in Airflow logs")
        except Exception as e:
            logging.exception("Caught an exception: %s", e)
            raise e

    # Same log calls without the formatter, for comparison with the old behaviour.
    @task.virtualenv(task_id="venv_task_old", requirements=[], system_site_packages=False)
    def venv_task_old():
        import logging

        logging.warning("This will appear as WARNING in Airflow logs")
        logging.error("This will appear as ERROR in Airflow logs")
        logging.exception("This will appear as ERROR with stack trace in Airflow logs")
        try:
            raise Exception("This will appear as ERROR in Airflow logs")
        except Exception as e:
            logging.exception("Caught an exception: %s", e)
            raise e

    venv_task()
    venv_task_old()
```
---
- [x] Yes (please specify the tool below)
Co-authored by: Claude (claude.ai/claude-code) following the guidelines