This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 35623740d251f969bd9f068dbbd366d158608e80
Author: Tiago Deane <[email protected]>
AuthorDate: Fri May 3 09:42:04 2024 +0100

    Fix TaskHandlerWithCustomFormatter to add the prefix only once (#38502)
    
    When TaskHandlerWithCustomFormatter was used to add a prefix to logs, it
    previously added the prefix multiple times. This happened because the
    handler was being called multiple times from logging_mixin.py, and it was
    made worse by the fact that the handler still added an additional prefix
    even when its formatter was a TimezoneAware formatter (to include the UTC
    offset). Because of this, I felt that any solution outside of the
    TaskHandlerWithCustomFormatter itself would either require a
    restructuring of [...]
    
    Note: also fixed the documentation's example for the handler, as the 
previous one was incorrect and didn't work.
    (cherry picked from commit 61d1c95278ecd0e9df32a210d25a65e9ab201ef2)
---
 airflow/config_templates/config.yml                |  2 +-
 .../log/task_handler_with_custom_formatter.py      |  3 ++-
 .../test_task_handler_with_custom_formatter.py     | 24 ++++++++++++++++++----
 3 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 60a9d3c7dc..62eb79336e 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -848,7 +848,7 @@ logging:
         Specify prefix pattern like mentioned below with stream handler 
TaskHandlerWithCustomFormatter
       version_added: 2.0.0
       type: string
-      example: "{ti.dag_id}-{ti.task_id}-{execution_date}-{try_number}"
+      example: 
"{{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{ti.try_number}}"
       is_template: true
       default: ""
     log_filename_template:
diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py 
b/airflow/utils/log/task_handler_with_custom_formatter.py
index 3fdda914c5..5cb71c4281 100644
--- a/airflow/utils/log/task_handler_with_custom_formatter.py
+++ b/airflow/utils/log/task_handler_with_custom_formatter.py
@@ -45,7 +45,8 @@ class TaskHandlerWithCustomFormatter(logging.StreamHandler):
         :param ti:
         :return:
         """
-        if ti.raw or self.formatter is None:
+        # Return early for raw task instances, when no formatter is set, or when the prefix is already set
+        if ti.raw or self.formatter is None or self.prefix_jinja_template is not None:
             return
         prefix = conf.get("logging", "task_log_prefix_template")
 
diff --git a/tests/utils/test_task_handler_with_custom_formatter.py 
b/tests/utils/test_task_handler_with_custom_formatter.py
index 87d631f0c7..5aad3ca14f 100644
--- a/tests/utils/test_task_handler_with_custom_formatter.py
+++ b/tests/utils/test_task_handler_with_custom_formatter.py
@@ -69,20 +69,36 @@ def task_instance():
     clear_db_runs()
 
 
-def assert_prefix(task_instance: TaskInstance, prefix: str) -> None:
+def assert_prefix_once(task_instance: TaskInstance, prefix: str) -> None:
     handler = next((h for h in task_instance.log.handlers if h.name == 
TASK_HANDLER), None)
     assert handler is not None, "custom task log handler not set up correctly"
     assert handler.formatter is not None, "custom task log formatter not set 
up correctly"
+    previous_formatter = handler.formatter
     expected_format = f"{prefix}:{handler.formatter._fmt}"
     set_context(task_instance.log, task_instance)
     assert expected_format == handler.formatter._fmt
+    handler.setFormatter(previous_formatter)
+
+
+def assert_prefix_multiple(task_instance: TaskInstance, prefix: str) -> None:
+    handler = next((h for h in task_instance.log.handlers if h.name == 
TASK_HANDLER), None)
+    assert handler is not None, "custom task log handler not set up correctly"
+    assert handler.formatter is not None, "custom task log formatter not set 
up correctly"
+    previous_formatter = handler.formatter
+    expected_format = f"{prefix}:{handler.formatter._fmt}"
+    set_context(task_instance.log, task_instance)
+    set_context(task_instance.log, task_instance)
+    set_context(task_instance.log, task_instance)
+    assert expected_format == handler.formatter._fmt
+    handler.setFormatter(previous_formatter)
 
 
 def test_custom_formatter_default_format(task_instance):
     """The default format provides no prefix."""
-    assert_prefix(task_instance, "")
+    assert_prefix_once(task_instance, "")
 
 
-@conf_vars({("logging", "task_log_prefix_template"): "{{ti.dag_id }}-{{ 
ti.task_id }}"})
+@conf_vars({("logging", "task_log_prefix_template"): "{{ ti.dag_id }}-{{ 
ti.task_id }}"})
 def test_custom_formatter_custom_format_not_affected_by_config(task_instance):
-    assert_prefix(task_instance, f"{DAG_ID}-{TASK_ID}")
+    """Certifies that the prefix is only added once, even after repeated 
calls"""
+    assert_prefix_multiple(task_instance, f"{DAG_ID}-{TASK_ID}")

Reply via email to