melicheradam opened a new pull request, #68441:
URL: https://github.com/apache/airflow/pull/68441

   feat(standard): forward log levels from virtualenv subprocess stdout
   
     Parse standard Python logging prefixes (DEBUG/INFO/WARNING/ERROR/CRITICAL)
     from subprocess stdout in `_execute_in_subprocess` and re-emit each line at
     the matching Airflow log level instead of collapsing everything to INFO.
   
     Adds `_log_subprocess_line` helper with a compiled regex, wires it into the
     stdout loop, and documents the recommended formatter setup (including
     traceback forwarding via `sys.excepthook`) for `PythonVirtualenvOperator`
     and `ExternalPythonOperator`.
   
     ## Problem
   
     `PythonVirtualenvOperator` and `ExternalPythonOperator` run user code in a
     subprocess and stream stdout back to the Airflow logger. Previously every
     line was logged at `INFO` regardless of severity, so `WARNING`/`ERROR` log
     lines from user code were invisible at elevated log-level filters and mixed
     into the INFO stream.
   
     ## Solution
   
     `_execute_in_subprocess` now calls `_log_subprocess_line` for each output
     line. The helper matches the standard Python logging prefix format
     (`LEVEL: message`) and calls `log.log(level, ...)` accordingly. Lines
     without a recognised prefix fall back to `INFO` — no behaviour change for
     existing code.
   
     Users opt in by configuring their callable's formatter:
   
     ```python
     import logging


     class _PrefixAllLinesFormatter(logging.Formatter):
         """Prefix every line of a record, including traceback lines, with its level name."""

         def format(self, record):
             msg = super().format(record)
             prefix = record.levelname + ": "
             return "\n".join(prefix + line for line in msg.splitlines())


     # Attach the formatter to a root handler inside the callable.
     handler = logging.StreamHandler()
     handler.setFormatter(_PrefixAllLinesFormatter())
     logging.root.addHandler(handler)
     ```
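The formatter has to prefix every line, not just the first. A record that carries a traceback spans several lines, and each one reaches the parent process separately. The sketch below captures `logger.exception()` output in memory to show that each traceback line keeps the `ERROR: ` prefix. The formatter is repeated from the snippet above so the block runs on its own. The logger name `prefix_demo` is arbitrary.

```python
import io
import logging


class _PrefixAllLinesFormatter(logging.Formatter):
    """Same formatter as in the snippet above, repeated so this block runs alone."""

    def format(self, record):
        msg = super().format(record)
        prefix = record.levelname + ": "
        return "\n".join(prefix + line for line in msg.splitlines())


def capture_exception_output() -> str:
    """Log a caught exception through the formatter and return the text written."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(_PrefixAllLinesFormatter())
    logger = logging.getLogger("prefix_demo")
    logger.propagate = False  # keep the demo output out of the root handlers
    logger.addHandler(handler)
    try:
        denominator = 0
        print(1 / denominator)
    except ZeroDivisionError:
        # Attaches the active traceback, so the record spans several lines.
        logger.exception("division failed")
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


if __name__ == "__main__":
    print(capture_exception_output(), end="")
```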
   
     Traceback lines from unhandled top-level exceptions are forwarded via a
     `sys.excepthook` override, documented in the operator how-to page.
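The sketch below is a rough illustration of the excepthook approach that does not need Airflow. It starts a child interpreter with `subprocess.run` and installs the prefix formatter and an excepthook in the child, mirroring the example DAG. It then lets an exception escape. `logging.StreamHandler()` writes to `sys.stderr` by default, so the sketch merges stderr into stdout (`stderr=subprocess.STDOUT`). This assumes the operator's output loop sees both streams the same way.

```python
import subprocess
import sys
import textwrap

# Source run in the child interpreter. It mirrors the formatter and excepthook
# from the example DAG; the logged messages are illustrative only.
CHILD_CODE = textwrap.dedent(
    """
    import logging
    import sys
    import traceback

    class PrefixAllLinesFormatter(logging.Formatter):
        def format(self, record):
            msg = super().format(record)
            prefix = record.levelname + ": "
            return "\\n".join(prefix + line for line in msg.splitlines())

    handler = logging.StreamHandler()  # writes to sys.stderr by default
    handler.setFormatter(PrefixAllLinesFormatter())
    logging.root.addHandler(handler)

    def _logging_excepthook(exc_type, exc_value, exc_tb):
        # Log the traceback at ERROR instead of printing it raw to stderr.
        text = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
        logging.error(text.rstrip())

    sys.excepthook = _logging_excepthook
    logging.warning("about to fail")
    raise RuntimeError("boom")
    """
)


def run_child() -> list[str]:
    """Run CHILD_CODE and return its combined output lines."""
    # Assumption: stderr is merged into stdout so the parent reads a single stream.
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_CODE],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        check=False,  # the child exits with status 1 because of the unhandled error
    )
    return proc.stdout.splitlines()


if __name__ == "__main__":
    for output_line in run_child():
        print(output_line)
```

Every line the parent receives starts with `WARNING: ` or `ERROR: `. That includes the traceback header and the final `RuntimeError: boom` line, so a level-aware parser can route all of them.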
   
     ## Changes
   
     - `python_virtualenv.py`: add `_LEVEL_PREFIX_RE` and `_log_subprocess_line`,
       and wire them into the subprocess stdout loop
     - `test_python_virtualenv.py`: `TestLogSubprocessLine`, 9 parametrised
       cases covering all five levels, fallback, wrong case, wrong prefix name,
       and empty string
     - `docs/operators/python.rst`: "Log level forwarding" subsection added to
       both the `PythonVirtualenvOperator` and `ExternalPythonOperator` sections,
       with a collapsible snippet showing the formatter and excepthook setup
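The nine test cases listed above can be written as a table of inputs and expected levels. The sample strings and the `level_for` function below are hypothetical stand-ins, not the contents of `TestLogSubprocessLine`. In a pytest suite the rows would feed `@pytest.mark.parametrize`. Here a plain loop keeps the block dependency-free.

```python
import logging
import re

# Hypothetical stand-in for the PR's helper: returns the level a line would be logged at.
_LEVEL_PREFIX_RE = re.compile(r"^(DEBUG|INFO|WARNING|ERROR|CRITICAL): ")


def level_for(line: str) -> int:
    match = _LEVEL_PREFIX_RE.match(line)
    # getattr(logging, "WARNING") is logging.WARNING, and so on for each name.
    return getattr(logging, match.group(1)) if match else logging.INFO


# One row per case named in the PR description; the input strings are illustrative.
CASES = [
    ("DEBUG: detail", logging.DEBUG),
    ("INFO: progress", logging.INFO),
    ("WARNING: careful", logging.WARNING),
    ("ERROR: failed", logging.ERROR),
    ("CRITICAL: fatal", logging.CRITICAL),
    ("plain output", logging.INFO),  # no prefix: fallback
    ("warning: lower case", logging.INFO),  # wrong case: not matched
    ("WARN: short name", logging.INFO),  # wrong prefix name: not matched
    ("", logging.INFO),  # empty string
]

for line, expected in CASES:
    assert level_for(line) == expected, (line, expected)
```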
   
    ### Original behaviour
    <img width="887" height="427" alt="image" src="https://github.com/user-attachments/assets/c80bf12c-b60c-4279-a791-b81681812350" />

    ### Updated behaviour
    <img width="883" height="457" alt="image" src="https://github.com/user-attachments/assets/1183ff79-da1a-4bcc-a0fe-31c4e747d3a9" />
   
    ### Example DAG to test
   ```python
   from datetime import datetime

   from airflow.sdk import DAG, task


   with DAG(
       dag_id="venv_dag",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
   ):

       @task.virtualenv(task_id="venv_task", requirements=[], system_site_packages=False)
       def venv_task():
           # Imports go inside the callable: it is serialised and run in a separate interpreter.
           import logging
           import sys
           import traceback as _tb

           # Prefix every line of a record (including traceback lines) with its level name.
           class PrefixAllLinesFormatter(logging.Formatter):
               def format(self, record):
                   msg = super().format(record)
                   prefix = record.levelname + ": "
                   return "\n".join(prefix + line for line in msg.splitlines())

           handler = logging.StreamHandler()
           handler.setFormatter(PrefixAllLinesFormatter())
           logging.root.addHandler(handler)

           # Route tracebacks of unhandled exceptions through logging at ERROR level.
           def _logging_excepthook(exc_type, exc_value, exc_tb):
               logging.error("".join(_tb.format_exception(exc_type, exc_value, exc_tb)).rstrip())

           sys.excepthook = _logging_excepthook
           logging.warning("This will appear as WARNING in Airflow logs")
           logging.error("This will appear as ERROR in Airflow logs")
           # Outside an except block there is no active exception, so this logs
           # "NoneType: None" instead of a real stack trace.
           logging.exception("This will appear as ERROR with stack trace in Airflow logs")
           try:
               raise Exception("This will appear as ERROR in Airflow logs")
           except Exception as e:
               logging.exception("Caught an exception: %s", e)
               raise

       # Baseline task without the formatter, for comparison.
       @task.virtualenv(task_id="venv_task_old", requirements=[], system_site_packages=False)
       def venv_task_old():
           import logging

           logging.warning("This will appear as WARNING in Airflow logs")
           logging.error("This will appear as ERROR in Airflow logs")
           logging.exception("This will appear as ERROR with stack trace in Airflow logs")
           try:
               raise Exception("This will appear as ERROR in Airflow logs")
           except Exception as e:
               logging.exception("Caught an exception: %s", e)
               raise

       venv_task()
       venv_task_old()
   ```
     ---
     Was generative AI tooling used to co-author this PR?

     - [x] Yes (please specify the tool below)
   
     Co-authored by: Claude (claude.ai/claude-code) following the guidelines
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
