This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 602c9976ff8 Use TaskFormatter for Celery log formatting instead of
default Formatter (#61701)
602c9976ff8 is described below
commit 602c9976ff8332bbdfe856a41d1aaf13b3630ffd
Author: Aaron Chen <[email protected]>
AuthorDate: Tue Feb 10 08:43:52 2026 -0800
Use TaskFormatter for Celery log formatting instead of default Formatter
(#61701)
Co-authored-by: Shahar Epstein <[email protected]>
---
.../airflow/providers/celery/cli/celery_command.py | 3 +-
.../tests/unit/celery/cli/test_celery_command.py | 52 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 1 deletion(-)
diff --git
a/providers/celery/src/airflow/providers/celery/cli/celery_command.py
b/providers/celery/src/airflow/providers/celery/cli/celery_command.py
index 195aa4d9329..46713180c8a 100644
--- a/providers/celery/src/airflow/providers/celery/cli/celery_command.py
+++ b/providers/celery/src/airflow/providers/celery/cli/celery_command.py
@@ -29,6 +29,7 @@ import psutil
import sqlalchemy.exc
from celery import maybe_patch_concurrency
from celery.app.defaults import DEFAULT_TASK_LOG_FMT
+from celery.app.log import TaskFormatter
from celery.signals import after_setup_logger
from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile
@@ -166,7 +167,7 @@ def logger_setup_handler(logger, **kwargs):
* logs of severity lower than error goes to stdout.
"""
if conf.getboolean("logging", "celery_stdout_stderr_separation",
fallback=False):
- celery_formatter = logging.Formatter(DEFAULT_TASK_LOG_FMT)
+ celery_formatter = TaskFormatter(DEFAULT_TASK_LOG_FMT)
class NoErrorOrAboveFilter(logging.Filter):
"""Allow only logs with level *lower* than ERROR to be reported."""
diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py
b/providers/celery/tests/unit/celery/cli/test_celery_command.py
index e8bfa698463..c40f5696a59 100644
--- a/providers/celery/tests/unit/celery/cli/test_celery_command.py
+++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import contextlib
import importlib
import json
+import logging
import os
import sys
from io import StringIO
@@ -632,3 +633,54 @@ def test_stale_bundle_cleanup(mock_process):
assert len(calls) == 1
actual = [x.kwargs["target"] for x in calls]
assert actual[0].__name__ == "bundle_cleanup_main"
+
+
+class TestLoggerSetupHandler:
+ """Tests for logger_setup_handler that configures celery logging."""
+
+ def
test_logger_setup_handler_uses_task_formatter_when_separation_enabled(self):
+ """When celery_stdout_stderr_separation is True, handlers should use
TaskFormatter."""
+ from celery.app.log import TaskFormatter
+
+ logger = MagicMock(spec=logging.Logger)
+ logger.handlers = []
+
+ with conf_vars({("logging", "celery_stdout_stderr_separation"):
"True"}):
+ celery_command.logger_setup_handler(logger)
+
+ assert len(logger.handlers) == 2
+ stdout_handler, stderr_handler = logger.handlers
+ assert isinstance(stdout_handler.formatter, TaskFormatter)
+ assert isinstance(stderr_handler.formatter, TaskFormatter)
+
+ def test_logger_setup_handler_stdout_stderr_split(self):
+ """Verify stdout handler filters errors out, stderr handler only gets
errors and above."""
+ logger = MagicMock(spec=logging.Logger)
+ logger.handlers = []
+
+ with conf_vars({("logging", "celery_stdout_stderr_separation"):
"True"}):
+ celery_command.logger_setup_handler(logger)
+
+ stdout_handler, stderr_handler = logger.handlers
+
+ # stdout handler should have a filter that rejects ERROR and above
+ assert len(stdout_handler.filters) == 1
+ error_record = logging.LogRecord("test", logging.ERROR, "", 0, "msg",
(), None)
+ warning_record = logging.LogRecord("test", logging.WARNING, "", 0,
"msg", (), None)
+ assert stdout_handler.filters[0].filter(error_record) is False
+ assert stdout_handler.filters[0].filter(warning_record) is True
+
+ # stderr handler level should be ERROR
+ assert stderr_handler.level == logging.ERROR
+
+ def test_logger_setup_handler_noop_when_separation_disabled(self):
+ """When celery_stdout_stderr_separation is False, logger handlers
should not be modified."""
+ logger = MagicMock(spec=logging.Logger)
+ original_handlers = [logging.StreamHandler()]
+ logger.handlers = original_handlers.copy()
+
+ with conf_vars({("logging", "celery_stdout_stderr_separation"):
"False"}):
+ celery_command.logger_setup_handler(logger)
+
+ # Handlers should remain unchanged
+ assert logger.handlers == original_handlers