This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 0d465d48d2b [v3-2-test] fix mypy error due to 1.20.0 upgrade (#64832)
0d465d48d2b is described below
commit 0d465d48d2b644095362f22c3ecc143689cfb60e
Author: Wei Lee <[email protected]>
AuthorDate: Tue Apr 7 19:30:05 2026 +0800
[v3-2-test] fix mypy error due to 1.20.0 upgrade (#64832)
---
airflow-core/src/airflow/assets/manager.py | 2 +
.../config_templates/airflow_local_settings.py | 45 +++++++++++++---------
.../core_api/routes/public/test_task_instances.py | 4 +-
3 files changed, 30 insertions(+), 21 deletions(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index dca3db9b181..b5ead262d0c 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -575,6 +575,8 @@ def resolve_asset_manager() -> AssetManager:
key="asset_manager_kwargs",
fallback={},
)
+ if TYPE_CHECKING:
+ assert isinstance(_asset_manager_kwargs, dict)
return _asset_manager_class(**_asset_manager_kwargs)
diff --git a/airflow-core/src/airflow/config_templates/airflow_local_settings.py b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index 17c7cd47d54..48f14b0f9a9 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -20,7 +20,7 @@
from __future__ import annotations
import os
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, cast
from urllib.parse import urlsplit
from airflow.configuration import conf
@@ -159,6 +159,7 @@ if REMOTE_LOGGING:
"logging/remote_task_handler_kwargs must be a JSON object (a python dict), we got "
f"{type(remote_task_handler_kwargs)}"
)
+ _handler_kwargs = cast("dict[str, Any]", remote_task_handler_kwargs)
delete_local_copy = conf.getboolean("logging", "delete_local_logs")
if remote_base_log_folder.startswith("s3://"):
@@ -166,16 +167,17 @@ if REMOTE_LOGGING:
_default_conn_name_from("airflow.providers.amazon.aws.hooks.s3", "S3Hook")
REMOTE_TASK_LOG = S3RemoteLogIO(
- **(
+ **cast(
+ "dict[str, Any]",
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
}
- | remote_task_handler_kwargs
+ | _handler_kwargs,
)
)
- remote_task_handler_kwargs = {}
+ _handler_kwargs = {}
elif remote_base_log_folder.startswith("cloudwatch://"):
from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO
@@ -183,17 +185,18 @@ if REMOTE_LOGGING:
_default_conn_name_from("airflow.providers.amazon.aws.hooks.logs", "AwsLogsHook")
url_parts = urlsplit(remote_base_log_folder)
REMOTE_TASK_LOG = CloudWatchRemoteLogIO(
- **(
+ **cast(
+ "dict[str, Any]",
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
"log_group_arn": url_parts.netloc + url_parts.path,
}
- | remote_task_handler_kwargs
+ | _handler_kwargs,
)
)
- remote_task_handler_kwargs = {}
+ _handler_kwargs = {}
elif remote_base_log_folder.startswith("gs://"):
from airflow.providers.google.cloud.log.gcs_task_handler import GCSRemoteLogIO
@@ -201,17 +204,18 @@ if REMOTE_LOGGING:
key_path = conf.get_mandatory_value("logging", "google_key_path", fallback=None)
REMOTE_TASK_LOG = GCSRemoteLogIO(
- **(
+ **cast(
+ "dict[str, Any]",
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
"gcp_key_path": key_path,
}
- | remote_task_handler_kwargs
+ | _handler_kwargs,
)
)
- remote_task_handler_kwargs = {}
+ _handler_kwargs = {}
elif remote_base_log_folder.startswith("wasb"):
from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbRemoteLogIO
@@ -224,17 +228,18 @@ if REMOTE_LOGGING:
wasb_remote_base = remote_base_log_folder.removeprefix("wasb://")
REMOTE_TASK_LOG = WasbRemoteLogIO(
- **(
+ **cast(
+ "dict[str, Any]",
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": wasb_remote_base,
"delete_local_copy": delete_local_copy,
"wasb_container": wasb_log_container,
}
- | remote_task_handler_kwargs
+ | _handler_kwargs,
)
)
- remote_task_handler_kwargs = {}
+ _handler_kwargs = {}
elif remote_base_log_folder.startswith("stackdriver://"):
key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
# stackdriver:///airflow-tasks => airflow-tasks
@@ -255,32 +260,34 @@ if REMOTE_LOGGING:
_default_conn_name_from("airflow.providers.alibaba.cloud.hooks.oss", "OSSHook")
REMOTE_TASK_LOG = OSSRemoteLogIO(
- **(
+ **cast(
+ "dict[str, Any]",
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": remote_base_log_folder,
"delete_local_copy": delete_local_copy,
}
- | remote_task_handler_kwargs
+ | _handler_kwargs,
)
)
- remote_task_handler_kwargs = {}
+ _handler_kwargs = {}
elif remote_base_log_folder.startswith("hdfs://"):
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsRemoteLogIO
_default_conn_name_from("airflow.providers.apache.hdfs.hooks.webhdfs", "WebHDFSHook")
REMOTE_TASK_LOG = HdfsRemoteLogIO(
- **(
+ **cast(
+ "dict[str, Any]",
{
"base_log_folder": BASE_LOG_FOLDER,
"remote_base": urlsplit(remote_base_log_folder).path,
"delete_local_copy": delete_local_copy,
}
- | remote_task_handler_kwargs
+ | _handler_kwargs,
)
)
- remote_task_handler_kwargs = {}
+ _handler_kwargs = {}
elif ELASTICSEARCH_HOST:
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchRemoteLogIO
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index ba8f06a7424..76767a96094 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -21,7 +21,7 @@ import datetime as dt
import itertools
import os
from datetime import timedelta
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from unittest import mock
import pendulum
@@ -146,7 +146,7 @@ class TestTaskInstanceEndpoint:
assert dag_version
for mi in map_indexes:
- kwargs = self.ti_init | {"map_index": mi}
+ kwargs: dict[str, Any] = self.ti_init | {"map_index": mi}
ti = TaskInstance(task=tasks[i], **kwargs, dag_version_id=dag_version.id)
session.add(ti)
ti.dag_run = dr