This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 602c9976ff8 Use TaskFormatter for Celery log formatting instead of 
default Formatter (#61701)
602c9976ff8 is described below

commit 602c9976ff8332bbdfe856a41d1aaf13b3630ffd
Author: Aaron Chen <[email protected]>
AuthorDate: Tue Feb 10 08:43:52 2026 -0800

    Use TaskFormatter for Celery log formatting instead of default Formatter 
(#61701)
    
    Co-authored-by: Shahar Epstein <[email protected]>
---
 .../airflow/providers/celery/cli/celery_command.py |  3 +-
 .../tests/unit/celery/cli/test_celery_command.py   | 52 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 1 deletion(-)

diff --git 
a/providers/celery/src/airflow/providers/celery/cli/celery_command.py 
b/providers/celery/src/airflow/providers/celery/cli/celery_command.py
index 195aa4d9329..46713180c8a 100644
--- a/providers/celery/src/airflow/providers/celery/cli/celery_command.py
+++ b/providers/celery/src/airflow/providers/celery/cli/celery_command.py
@@ -29,6 +29,7 @@ import psutil
 import sqlalchemy.exc
 from celery import maybe_patch_concurrency
 from celery.app.defaults import DEFAULT_TASK_LOG_FMT
+from celery.app.log import TaskFormatter
 from celery.signals import after_setup_logger
 from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile
 
@@ -166,7 +167,7 @@ def logger_setup_handler(logger, **kwargs):
     * logs of severity lower than error goes to stdout.
     """
     if conf.getboolean("logging", "celery_stdout_stderr_separation", 
fallback=False):
-        celery_formatter = logging.Formatter(DEFAULT_TASK_LOG_FMT)
+        celery_formatter = TaskFormatter(DEFAULT_TASK_LOG_FMT)
 
         class NoErrorOrAboveFilter(logging.Filter):
             """Allow only logs with level *lower* than ERROR to be reported."""
diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py 
b/providers/celery/tests/unit/celery/cli/test_celery_command.py
index e8bfa698463..c40f5696a59 100644
--- a/providers/celery/tests/unit/celery/cli/test_celery_command.py
+++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import contextlib
 import importlib
 import json
+import logging
 import os
 import sys
 from io import StringIO
@@ -632,3 +633,54 @@ def test_stale_bundle_cleanup(mock_process):
     assert len(calls) == 1
     actual = [x.kwargs["target"] for x in calls]
     assert actual[0].__name__ == "bundle_cleanup_main"
+
+
+class TestLoggerSetupHandler:
+    """Tests for logger_setup_handler that configures celery logging."""
+
+    def 
test_logger_setup_handler_uses_task_formatter_when_separation_enabled(self):
+        """When celery_stdout_stderr_separation is True, handlers should use 
TaskFormatter."""
+        from celery.app.log import TaskFormatter
+
+        logger = MagicMock(spec=logging.Logger)
+        logger.handlers = []
+
+        with conf_vars({("logging", "celery_stdout_stderr_separation"): 
"True"}):
+            celery_command.logger_setup_handler(logger)
+
+        assert len(logger.handlers) == 2
+        stdout_handler, stderr_handler = logger.handlers
+        assert isinstance(stdout_handler.formatter, TaskFormatter)
+        assert isinstance(stderr_handler.formatter, TaskFormatter)
+
+    def test_logger_setup_handler_stdout_stderr_split(self):
+        """Verify stdout handler filters errors out, stderr handler only gets 
errors and above."""
+        logger = MagicMock(spec=logging.Logger)
+        logger.handlers = []
+
+        with conf_vars({("logging", "celery_stdout_stderr_separation"): 
"True"}):
+            celery_command.logger_setup_handler(logger)
+
+        stdout_handler, stderr_handler = logger.handlers
+
+        # stdout handler should have a filter that rejects ERROR and above
+        assert len(stdout_handler.filters) == 1
+        error_record = logging.LogRecord("test", logging.ERROR, "", 0, "msg", 
(), None)
+        warning_record = logging.LogRecord("test", logging.WARNING, "", 0, 
"msg", (), None)
+        assert stdout_handler.filters[0].filter(error_record) is False
+        assert stdout_handler.filters[0].filter(warning_record) is True
+
+        # stderr handler level should be ERROR
+        assert stderr_handler.level == logging.ERROR
+
+    def test_logger_setup_handler_noop_when_separation_disabled(self):
+        """When celery_stdout_stderr_separation is False, logger handlers 
should not be modified."""
+        logger = MagicMock(spec=logging.Logger)
+        original_handlers = [logging.StreamHandler()]
+        logger.handlers = original_handlers.copy()
+
+        with conf_vars({("logging", "celery_stdout_stderr_separation"): 
"False"}):
+            celery_command.logger_setup_handler(logger)
+
+        # Handlers should remain unchanged
+        assert logger.handlers == original_handlers

Reply via email to