This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 823b86e006d Fix remote_task_handler_kwargs passing handler params to RemoteLogIO (#65957)
823b86e006d is described below

commit 823b86e006d30f118093e0ae76d25aecabd92ecb
Author: Sadha Chilukoori <[email protected]>
AuthorDate: Tue May 5 12:07:11 2026 -0700

    Fix remote_task_handler_kwargs passing handler params to RemoteLogIO (#65957)
    
    * Fix remote_task_handler_kwargs passing handler params to RemoteLogIO
    
    Split remote_task_handler_kwargs into handler-level params (max_bytes,
    backup_count, delay) and IO-level params before the provider if/elif
    chain. Handler params go to DEFAULT_LOGGING_CONFIG["handlers"]["task"],
    IO params go to the RemoteLogIO constructor.
    
    Previously all kwargs were passed to RemoteLogIO (causing TypeError
    for attrs-based classes) and then reset to {}, so handler params
    never reached FileTaskHandler.
    
    closes: #58770
    
    * Derive FileTaskHandler params dynamically via inspect instead of hardcoding
    
    Address review feedback: use inspect.signature(FileTaskHandler.__init__)
    to determine which remote_task_handler_kwargs belong to the file handler
    vs the RemoteLogIO constructor. Remove dead _io_kwargs = {} reset lines.
    
    * Move inspect and FileTaskHandler imports to top level
    
    ---------
    
    Co-authored-by: Sadha Chilukoori <[email protected]>
---
 .../config_templates/airflow_local_settings.py     |  35 ++++---
 .../test_airflow_local_settings.py                 | 107 ++++++++++++++++-----
 2 files changed, 103 insertions(+), 39 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 04ca2a32f97..7072d2dd6ea 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -19,12 +19,14 @@
 
 from __future__ import annotations
 
+import inspect
 import os
 from typing import TYPE_CHECKING, Any, cast
 from urllib.parse import urlsplit
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
+from airflow.utils.log.file_task_handler import FileTaskHandler
 
 if TYPE_CHECKING:
     from airflow.logging.remote import RemoteLogIO, RemoteLogStreamIO
@@ -159,7 +161,13 @@ if REMOTE_LOGGING:
             "logging/remote_task_handler_kwargs must be a JSON object (a python dict), we got "
             f"{type(remote_task_handler_kwargs)}"
         )
-    _handler_kwargs = cast("dict[str, Any]", remote_task_handler_kwargs)
+    _all_kwargs = cast("dict[str, Any]", remote_task_handler_kwargs)
+    _fth_params = frozenset(inspect.signature(FileTaskHandler.__init__).parameters) - {
+        "self",
+        "base_log_folder",
+    }
+    _file_handler_kwargs = {k: v for k, v in _all_kwargs.items() if k in _fth_params}
+    _io_kwargs = {k: v for k, v in _all_kwargs.items() if k not in _fth_params}
     delete_local_copy = conf.getboolean("logging", "delete_local_logs")
 
     if remote_base_log_folder.startswith("s3://"):
@@ -174,10 +182,9 @@ if REMOTE_LOGGING:
                     "remote_base": remote_base_log_folder,
                     "delete_local_copy": delete_local_copy,
                 }
-                | _handler_kwargs,
+                | _io_kwargs,
             )
         )
-        _handler_kwargs = {}
 
     elif remote_base_log_folder.startswith("cloudwatch://"):
         from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO
@@ -193,10 +200,10 @@ if REMOTE_LOGGING:
                     "delete_local_copy": delete_local_copy,
                     "log_group_arn": url_parts.netloc + url_parts.path,
                 }
-                | _handler_kwargs,
+                | _io_kwargs,
             )
         )
-        _handler_kwargs = {}
+
     elif remote_base_log_folder.startswith("gs://"):
         from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO
 
@@ -212,10 +219,10 @@ if REMOTE_LOGGING:
                     "delete_local_copy": delete_local_copy,
                     "gcp_key_path": key_path,
                 }
-                | _handler_kwargs,
+                | _io_kwargs,
             )
         )
-        _handler_kwargs = {}
+
     elif remote_base_log_folder.startswith("wasb"):
         from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO
 
@@ -236,10 +243,10 @@ if REMOTE_LOGGING:
                     "delete_local_copy": delete_local_copy,
                     "wasb_container": wasb_log_container,
                 }
-                | _handler_kwargs,
+                | _io_kwargs,
             )
         )
-        _handler_kwargs = {}
+
     elif remote_base_log_folder.startswith("stackdriver://"):
         key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
         # stackdriver:///airflow-tasks => airflow-tasks
@@ -267,10 +274,10 @@ if REMOTE_LOGGING:
                     "remote_base": remote_base_log_folder,
                     "delete_local_copy": delete_local_copy,
                 }
-                | _handler_kwargs,
+                | _io_kwargs,
             )
         )
-        _handler_kwargs = {}
+
     elif remote_base_log_folder.startswith("hdfs://"):
         from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO
 
@@ -284,10 +291,10 @@ if REMOTE_LOGGING:
                     "remote_base": urlsplit(remote_base_log_folder).path,
                     "delete_local_copy": delete_local_copy,
                 }
-                | _handler_kwargs,
+                | _io_kwargs,
             )
         )
-        _handler_kwargs = {}
+
     elif ELASTICSEARCH_HOST:
         from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO
 
@@ -370,4 +377,4 @@ if REMOTE_LOGGING:
             "section 'elasticsearch' if you are using Elasticsearch. In the other case, "
             "'remote_base_log_folder' option in the 'logging' section."
         )
-    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(_handler_kwargs)
+    DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(_file_handler_kwargs)
diff --git 
a/airflow-core/tests/unit/config_templates/test_airflow_local_settings.py 
b/airflow-core/tests/unit/config_templates/test_airflow_local_settings.py
index 025dbdf571f..d131efa0026 100644
--- a/airflow-core/tests/unit/config_templates/test_airflow_local_settings.py
+++ b/airflow-core/tests/unit/config_templates/test_airflow_local_settings.py
@@ -19,15 +19,30 @@
 from __future__ import annotations
 
 import importlib
+import inspect
 import json
 from unittest import mock
 
 import pytest
 
 from airflow.config_templates import airflow_local_settings
+from airflow.utils.log.file_task_handler import FileTaskHandler
 
 from tests_common.test_utils.config import conf_vars
 
+REMOTE_IO_PROVIDERS = [
+    ("s3://bucket/path", 
"airflow.providers.amazon.aws.log.s3_task_handler.S3RemoteLogIO"),
+    ("wasb-logs", 
"airflow.providers.microsoft.azure.log.wasb_task_handler.WasbRemoteLogIO"),
+    ("gs://bucket/path", 
"airflow.providers.google.cloud.log.gcs_task_handler.GCSRemoteLogIO"),
+    (
+        "cloudwatch://arn:aws:logs:us-east-1:0:log-group:foo",
+        
"airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudWatchRemoteLogIO",
+    ),
+    ("oss://bucket/path", 
"airflow.providers.alibaba.cloud.log.oss_task_handler.OSSRemoteLogIO"),
+    ("hdfs://host/path", 
"airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsRemoteLogIO"),
+]
+REMOTE_IO_IDS = ["s3", "wasb", "gcs", "cloudwatch", "oss", "hdfs"]
+
 
 @pytest.fixture
 def restore_local_settings():
@@ -35,42 +50,84 @@ def restore_local_settings():
     importlib.reload(airflow_local_settings)
 
 
-@pytest.mark.parametrize(
-    ("remote_base", "remote_io_path"),
-    [
-        ("s3://bucket/path", "airflow.providers.amazon.aws.log.s3_task_handler.S3RemoteLogIO"),
-        ("wasb-logs", "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbRemoteLogIO"),
-        ("gs://bucket/path", "airflow.providers.google.cloud.log.gcs_task_handler.GCSRemoteLogIO"),
-        (
-            "cloudwatch://arn:aws:logs:us-east-1:0:log-group:foo",
-            "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudWatchRemoteLogIO",
-        ),
-        ("oss://bucket/path", "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSRemoteLogIO"),
-        ("hdfs://host/path", "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsRemoteLogIO"),
-    ],
-    ids=["s3", "wasb", "gcs", "cloudwatch", "oss", "hdfs"],
-)
-def test_remote_task_handler_kwargs_not_leaked_to_local_task_handler(
-    remote_base, remote_io_path, restore_local_settings
-):
-    """Verify remote_task_handler_kwargs are passed to RemoteLogIO and not leaked to FileTaskHandler."""
+@pytest.mark.parametrize(("remote_base", "remote_io_path"), REMOTE_IO_PROVIDERS, ids=REMOTE_IO_IDS)
+def test_io_kwargs_forwarded_to_remote_log_io(remote_base, remote_io_path, restore_local_settings):
+    """IO-level kwargs reach the RemoteLogIO constructor and don't leak into the handler config."""
     pytest.importorskip(remote_io_path.rsplit(".", 1)[0])
-    user_kwargs = {"remote_base": "ignored", "custom_key": "v"}
+    io_kwargs = {"remote_base": "ignored", "custom_key": "v"}
     with (
         mock.patch(remote_io_path) as mock_remote_io,
         conf_vars(
             {
                 ("logging", "remote_logging"): "True",
                 ("logging", "remote_base_log_folder"): remote_base,
-                ("logging", "remote_task_handler_kwargs"): json.dumps(user_kwargs),
+                ("logging", "remote_task_handler_kwargs"): json.dumps(io_kwargs),
             }
         ),
     ):
         importlib.reload(airflow_local_settings)
         task_cfg = airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]
-        for k in user_kwargs:
-            assert k not in task_cfg, f"{k!r} leaked into task handler for {remote_base}"
+        for k in io_kwargs:
+            assert k not in task_cfg, f"IO kwarg {k!r} leaked into task handler config"
 
-        # Verify kwargs were passed to REMOTE_TASK_LOG
-        for k, v in user_kwargs.items():
+        for k, v in io_kwargs.items():
             assert mock_remote_io.call_args.kwargs[k] == v
+
+
+@pytest.mark.parametrize(("remote_base", "remote_io_path"), REMOTE_IO_PROVIDERS, ids=REMOTE_IO_IDS)
+def test_handler_kwargs_reach_file_task_handler(remote_base, remote_io_path, restore_local_settings):
+    """Handler-level kwargs (max_bytes, backup_count, delay) reach the FileTaskHandler config."""
+    pytest.importorskip(remote_io_path.rsplit(".", 1)[0])
+    handler_kwargs = {"max_bytes": 5_000_000, "backup_count": 5}
+    with (
+        mock.patch(remote_io_path) as mock_remote_io,
+        conf_vars(
+            {
+                ("logging", "remote_logging"): "True",
+                ("logging", "remote_base_log_folder"): remote_base,
+                ("logging", "remote_task_handler_kwargs"): json.dumps(handler_kwargs),
+            }
+        ),
+    ):
+        importlib.reload(airflow_local_settings)
+        task_cfg = airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]
+        for k, v in handler_kwargs.items():
+            assert task_cfg[k] == v, f"Handler kwarg {k!r} not found in task handler config"
+
+        for k in handler_kwargs:
+            assert k not in mock_remote_io.call_args.kwargs, (
+                f"Handler kwarg {k!r} leaked into RemoteLogIO constructor"
+            )
+
+
+@pytest.mark.parametrize(("remote_base", "remote_io_path"), REMOTE_IO_PROVIDERS, ids=REMOTE_IO_IDS)
+def test_mixed_kwargs_split_correctly(remote_base, remote_io_path, restore_local_settings):
+    """When both handler and IO kwargs are present, each goes to the right place."""
+    pytest.importorskip(remote_io_path.rsplit(".", 1)[0])
+    mixed_kwargs = {"max_bytes": 5_000_000, "backup_count": 5, "custom_io_key": "val"}
+    with (
+        mock.patch(remote_io_path) as mock_remote_io,
+        conf_vars(
+            {
+                ("logging", "remote_logging"): "True",
+                ("logging", "remote_base_log_folder"): remote_base,
+                ("logging", "remote_task_handler_kwargs"): json.dumps(mixed_kwargs),
+            }
+        ),
+    ):
+        importlib.reload(airflow_local_settings)
+        task_cfg = airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]
+
+        assert task_cfg["max_bytes"] == 5_000_000
+        assert task_cfg["backup_count"] == 5
+        assert "custom_io_key" not in task_cfg
+
+        assert mock_remote_io.call_args.kwargs["custom_io_key"] == "val"
+        assert "max_bytes" not in mock_remote_io.call_args.kwargs
+        assert "backup_count" not in mock_remote_io.call_args.kwargs
+
+
+def test_file_handler_params_introspected_correctly():
+    """The introspected FileTaskHandler params include the expected kwargs."""
+    init_params = set(inspect.signature(FileTaskHandler.__init__).parameters) - {"self", "base_log_folder"}
+    assert {"max_bytes", "backup_count", "delay"} <= init_params

Reply via email to