This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 0d465d48d2b [v3-2-test] fix mypy error due to 1.20.0 upgrade (#64832)
0d465d48d2b is described below

commit 0d465d48d2b644095362f22c3ecc143689cfb60e
Author: Wei Lee <[email protected]>
AuthorDate: Tue Apr 7 19:30:05 2026 +0800

    [v3-2-test] fix mypy error due to 1.20.0 upgrade (#64832)
---
 airflow-core/src/airflow/assets/manager.py         |  2 +
 .../config_templates/airflow_local_settings.py     | 45 +++++++++++++---------
 .../core_api/routes/public/test_task_instances.py  |  4 +-
 3 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index dca3db9b181..b5ead262d0c 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -575,6 +575,8 @@ def resolve_asset_manager() -> AssetManager:
         key="asset_manager_kwargs",
         fallback={},
     )
+    if TYPE_CHECKING:
+        assert isinstance(_asset_manager_kwargs, dict)
     return _asset_manager_class(**_asset_manager_kwargs)
 
 
diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 17c7cd47d54..48f14b0f9a9 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -20,7 +20,7 @@
 from __future__ import annotations
 
 import os
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
 from urllib.parse import urlsplit
 
 from airflow.configuration import conf
@@ -159,6 +159,7 @@ if REMOTE_LOGGING:
             "logging/remote_task_handler_kwargs must be a JSON object (a 
python dict), we got "
             f"{type(remote_task_handler_kwargs)}"
         )
+    _handler_kwargs = cast("dict[str, Any]", remote_task_handler_kwargs)
     delete_local_copy = conf.getboolean("logging", "delete_local_logs")
 
     if remote_base_log_folder.startswith("s3://"):
@@ -166,16 +167,17 @@ if REMOTE_LOGGING:
 
         _default_conn_name_from("airflow.providers.amazon.aws.hooks.s3", 
"S3Hook")
         REMOTE_TASK_LOG = S3RemoteLogIO(
-            **(
+            **cast(
+                "dict[str, Any]",
                 {
                     "base_log_folder": BASE_LOG_FOLDER,
                     "remote_base": remote_base_log_folder,
                     "delete_local_copy": delete_local_copy,
                 }
-                | remote_task_handler_kwargs
+                | _handler_kwargs,
             )
         )
-        remote_task_handler_kwargs = {}
+        _handler_kwargs = {}
 
     elif remote_base_log_folder.startswith("cloudwatch://"):
         from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO
@@ -183,17 +185,18 @@ if REMOTE_LOGGING:
         _default_conn_name_from("airflow.providers.amazon.aws.hooks.logs", 
"AwsLogsHook")
         url_parts = urlsplit(remote_base_log_folder)
         REMOTE_TASK_LOG = CloudWatchRemoteLogIO(
-            **(
+            **cast(
+                "dict[str, Any]",
                 {
                     "base_log_folder": BASE_LOG_FOLDER,
                     "remote_base": remote_base_log_folder,
                     "delete_local_copy": delete_local_copy,
                     "log_group_arn": url_parts.netloc + url_parts.path,
                 }
-                | remote_task_handler_kwargs
+                | _handler_kwargs,
             )
         )
-        remote_task_handler_kwargs = {}
+        _handler_kwargs = {}
     elif remote_base_log_folder.startswith("gs://"):
         from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO
 
@@ -201,17 +204,18 @@ if REMOTE_LOGGING:
         key_path = conf.get_mandatory_value("logging", "google_key_path", 
fallback=None)
 
         REMOTE_TASK_LOG = GCSRemoteLogIO(
-            **(
+            **cast(
+                "dict[str, Any]",
                 {
                     "base_log_folder": BASE_LOG_FOLDER,
                     "remote_base": remote_base_log_folder,
                     "delete_local_copy": delete_local_copy,
                     "gcp_key_path": key_path,
                 }
-                | remote_task_handler_kwargs
+                | _handler_kwargs,
             )
         )
-        remote_task_handler_kwargs = {}
+        _handler_kwargs = {}
     elif remote_base_log_folder.startswith("wasb"):
         from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO
 
@@ -224,17 +228,18 @@ if REMOTE_LOGGING:
         wasb_remote_base = remote_base_log_folder.removeprefix("wasb://")
 
         REMOTE_TASK_LOG = WasbRemoteLogIO(
-            **(
+            **cast(
+                "dict[str, Any]",
                 {
                     "base_log_folder": BASE_LOG_FOLDER,
                     "remote_base": wasb_remote_base,
                     "delete_local_copy": delete_local_copy,
                     "wasb_container": wasb_log_container,
                 }
-                | remote_task_handler_kwargs
+                | _handler_kwargs,
             )
         )
-        remote_task_handler_kwargs = {}
+        _handler_kwargs = {}
     elif remote_base_log_folder.startswith("stackdriver://"):
         key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", 
fallback=None)
         # stackdriver:///airflow-tasks => airflow-tasks
@@ -255,32 +260,34 @@ if REMOTE_LOGGING:
         _default_conn_name_from("airflow.providers.alibaba.cloud.hooks.oss", 
"OSSHook")
 
         REMOTE_TASK_LOG = OSSRemoteLogIO(
-            **(
+            **cast(
+                "dict[str, Any]",
                 {
                     "base_log_folder": BASE_LOG_FOLDER,
                     "remote_base": remote_base_log_folder,
                     "delete_local_copy": delete_local_copy,
                 }
-                | remote_task_handler_kwargs
+                | _handler_kwargs,
             )
         )
-        remote_task_handler_kwargs = {}
+        _handler_kwargs = {}
     elif remote_base_log_folder.startswith("hdfs://"):
         from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO
 
         _default_conn_name_from("airflow.providers.apache.hdfs.hooks.webhdfs", 
"WebHDFSHook")
 
         REMOTE_TASK_LOG = HdfsRemoteLogIO(
-            **(
+            **cast(
+                "dict[str, Any]",
                 {
                     "base_log_folder": BASE_LOG_FOLDER,
                     "remote_base": urlsplit(remote_base_log_folder).path,
                     "delete_local_copy": delete_local_copy,
                 }
-                | remote_task_handler_kwargs
+                | _handler_kwargs,
             )
         )
-        remote_task_handler_kwargs = {}
+        _handler_kwargs = {}
     elif ELASTICSEARCH_HOST:
         from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO
 
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index ba8f06a7424..76767a96094 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -21,7 +21,7 @@ import datetime as dt
 import itertools
 import os
 from datetime import timedelta
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 from unittest import mock
 
 import pendulum
@@ -146,7 +146,7 @@ class TestTaskInstanceEndpoint:
                 assert dag_version
 
             for mi in map_indexes:
-                kwargs = self.ti_init | {"map_index": mi}
+                kwargs: dict[str, Any] = self.ti_init | {"map_index": mi}
                 ti = TaskInstance(task=tasks[i], **kwargs, dag_version_id=dag_version.id)
                 session.add(ti)
                 ti.dag_run = dr

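For readers outside the Airflow codebase: the patch relies on two standard mypy narrowing idioms, a TYPE_CHECKING-guarded assert isinstance (assets/manager.py) and typing.cast to "dict[str, Any]" (airflow_local_settings.py and the test). Below is a minimal, self-contained sketch of both idioms; load_kwargs() and build_handler() are hypothetical stand-ins for conf.getjson() and the *RemoteLogIO constructors, not Airflow APIs — only the typing patterns mirror the patch.

# Minimal sketch of the two mypy-narrowing patterns used above. The helpers
# load_kwargs() and build_handler() are hypothetical stand-ins, not Airflow APIs.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, cast


def load_kwargs() -> object:
    # Stand-in for a config lookup whose declared return type is wider than dict.
    return {"delete_local_copy": True}


def build_handler(**kwargs: Any) -> dict[str, Any]:
    # Stand-in for a handler constructor that takes keyword arguments.
    return kwargs


raw_kwargs = load_kwargs()

# Pattern 1 (assets/manager.py): narrow only for the type checker. The assert
# sits under TYPE_CHECKING, so it never executes at runtime.
if TYPE_CHECKING:
    assert isinstance(raw_kwargs, dict)

# Pattern 2 (airflow_local_settings.py): cast the value once, then cast the
# merged mapping so mypy accepts the ** unpacking into the constructor call.
_handler_kwargs = cast("dict[str, Any]", raw_kwargs)
handler = build_handler(
    **cast(
        "dict[str, Any]",
        {"base_log_folder": "/tmp/logs"} | _handler_kwargs,
    )
)
print(handler)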