This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6302dcdfc08 Cleanup mypy ignores in Task SDK where possible (#53208)
6302dcdfc08 is described below
commit 6302dcdfc083f5d98d5830cc63fbccbb2d61d3fd
Author: GPK <[email protected]>
AuthorDate: Fri Jul 11 23:25:12 2025 +0100
Cleanup mypy ignores in Task SDK where possible (#53208)
* cleanup mypy ignores in task-sdk
* cleanup mypy ignores in task-sdk
---
task-sdk/src/airflow/sdk/api/client.py | 6 ++++--
task-sdk/src/airflow/sdk/bases/decorator.py | 2 +-
task-sdk/src/airflow/sdk/bases/operator.py | 2 +-
task-sdk/src/airflow/sdk/bases/xcom.py | 2 +-
task-sdk/src/airflow/sdk/definitions/connection.py | 4 ++--
task-sdk/src/airflow/sdk/definitions/dag.py | 8 ++++----
.../sdk/definitions/decorators/setup_teardown.py | 7 +++----
task-sdk/src/airflow/sdk/definitions/mappedoperator.py | 18 +++++++++---------
task-sdk/src/airflow/sdk/execution_time/context.py | 3 +--
.../src/airflow/sdk/execution_time/secrets_masker.py | 2 +-
task-sdk/src/airflow/sdk/execution_time/supervisor.py | 2 +-
11 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
index 992cc2e4311..718df4690c7 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -233,6 +233,7 @@ class TaskInstanceOperations:
states: list[str] | None = None,
) -> TICount:
"""Get count of task instances matching the given criteria."""
+ params: dict[str, Any]
params = {
"dag_id": dag_id,
"task_ids": task_ids,
@@ -246,7 +247,7 @@ class TaskInstanceOperations:
params = {k: v for k, v in params.items() if v is not None}
if map_index is not None and map_index >= 0:
- params.update({"map_index": map_index}) # type: ignore[dict-item]
+ params.update({"map_index": map_index})
resp = self.client.get("task-instances/count", params=params)
return TICount(count=resp.json())
@@ -261,6 +262,7 @@ class TaskInstanceOperations:
run_ids: list[str] | None = None,
) -> TaskStatesResponse:
"""Get task states given criteria."""
+ params: dict[str, Any]
params = {
"dag_id": dag_id,
"task_ids": task_ids,
@@ -273,7 +275,7 @@ class TaskInstanceOperations:
params = {k: v for k, v in params.items() if v is not None}
if map_index is not None and map_index >= 0:
- params.update({"map_index": map_index}) # type: ignore[dict-item]
+ params.update({"map_index": map_index})
resp = self.client.get("task-instances/states", params=params)
return TaskStatesResponse.model_validate_json(resp.read())
diff --git a/task-sdk/src/airflow/sdk/bases/decorator.py b/task-sdk/src/airflow/sdk/bases/decorator.py
index 4130f8f4f78..42bc66ee5e4 100644
--- a/task-sdk/src/airflow/sdk/bases/decorator.py
+++ b/task-sdk/src/airflow/sdk/bases/decorator.py
@@ -483,7 +483,7 @@ class _TaskDecorator(ExpandableFactory, Generic[FParams, FReturn, OperatorSubcla
("resources", coerce_resources),
):
if (v := partial_kwargs.get(fld, NOTSET)) is not NOTSET:
- partial_kwargs[fld] = convert(v) # type: ignore[operator]
+ partial_kwargs[fld] = convert(v)
partial_kwargs.setdefault("executor_config", {})
partial_kwargs.setdefault("op_args", [])
diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py
index 8cab2861e92..165c6d2b174 100644
--- a/task-sdk/src/airflow/sdk/bases/operator.py
+++ b/task-sdk/src/airflow/sdk/bases/operator.py
@@ -1291,7 +1291,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
def get_dag(self) -> DAG | None:
return self._dag
- @property # type: ignore[override]
+ @property
def dag(self) -> DAG:
"""Returns the Operator's DAG if set, otherwise raises an error."""
if dag := self._dag:
diff --git a/task-sdk/src/airflow/sdk/bases/xcom.py b/task-sdk/src/airflow/sdk/bases/xcom.py
index 9556c2df8d5..ce38a8679a3 100644
--- a/task-sdk/src/airflow/sdk/bases/xcom.py
+++ b/task-sdk/src/airflow/sdk/bases/xcom.py
@@ -361,7 +361,7 @@ class BaseXCom:
run_id=run_id,
map_index=map_index,
)
- cls.purge(xcom_result) # type: ignore[call-arg]
+ cls.purge(xcom_result)
SUPERVISOR_COMMS.send(
DeleteXCom(
key=key,
diff --git a/task-sdk/src/airflow/sdk/definitions/connection.py b/task-sdk/src/airflow/sdk/definitions/connection.py
index e7918b2f070..1ed851e32e0 100644
--- a/task-sdk/src/airflow/sdk/definitions/connection.py
+++ b/task-sdk/src/airflow/sdk/definitions/connection.py
@@ -72,7 +72,7 @@ class Connection:
uri = f"{self.conn_type.lower().replace('_', '-')}://"
else:
uri = "//"
-
+ host_to_use: str | None
if self.host and "://" in self.host:
protocol, host = self.host.split("://", 1)
# If the protocol in host matches the connection type, don't add it again
@@ -84,7 +84,7 @@ class Connection:
host_to_use = host
protocol_to_add = protocol
else:
- host_to_use = self.host # type: ignore[assignment]
+ host_to_use = self.host
protocol_to_add = None
if protocol_to_add:
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index 6de58784654..3ca3e66571a 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -776,7 +776,7 @@ class DAG:
# deep-copying self.task_dict and self.task_group takes a long time, and we don't want all
# the tasks anyway, so we copy the tasks manually later
memo = {id(self.task_dict): None, id(self.task_group): None}
- dag = copy.deepcopy(self, memo) # type: ignore
+ dag = copy.deepcopy(self, memo)
if isinstance(task_ids, str):
matched_tasks = [t for t in self.tasks if task_ids in t.task_id]
@@ -935,8 +935,8 @@ class DAG:
) or task_id in self.task_group.used_group_ids:
raise DuplicateTaskIdFound(f"Task id '{task_id}' has already been added to the DAG")
self.task_dict[task_id] = task
- # TODO: Task-SDK: this type ignore shouldn't be needed!
- task.dag = self # type: ignore[assignment]
+
+ task.dag = self
# Add task_id to used_group_ids to prevent group_id and task_id collisions.
self.task_group.used_group_ids.add(task_id)
@@ -1089,7 +1089,7 @@ class DAG:
dags=[self],
start_date=logical_date,
end_date=logical_date,
- dag_run_state=False, # type: ignore
+ dag_run_state=False,
)
log.debug("Getting dagrun for dag %s", self.dag_id)
diff --git a/task-sdk/src/airflow/sdk/definitions/decorators/setup_teardown.py b/task-sdk/src/airflow/sdk/definitions/decorators/setup_teardown.py
index 48cb7732494..24c5483e155 100644
--- a/task-sdk/src/airflow/sdk/definitions/decorators/setup_teardown.py
+++ b/task-sdk/src/airflow/sdk/definitions/decorators/setup_teardown.py
@@ -54,7 +54,7 @@ def setup_task(func: Callable) -> Callable:
if isinstance(func, _TaskGroupFactory):
raise AirflowException("Task groups cannot be marked as setup or teardown.")
func = cast("_TaskDecorator", func)
- func.is_setup = True # type: ignore[attr-defined] # TODO: Remove this once mypy is bump to 1.16.1
+ func.is_setup = True
return func
@@ -80,9 +80,8 @@ def teardown_task(_func=None, *, on_failure_fail_dagrun: bool = False) -> Callab
raise AirflowException("Task groups cannot be marked as setup or teardown.")
func = cast("_TaskDecorator", func)
- # TODO: Remove below attr-defined once mypy is bump to 1.16.1
- func.is_teardown = True # type: ignore[attr-defined]
- func.on_failure_fail_dagrun = on_failure_fail_dagrun # type: ignore[attr-defined]
+ func.is_teardown = True
+ func.on_failure_fail_dagrun = on_failure_fail_dagrun
return func
if _func is None:
diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
index c6ac5154d2f..1ad118ea51b 100644
--- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
+++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py
@@ -393,7 +393,7 @@ class MappedOperator(AbstractOperator):
return self.partial_kwargs.get("task_display_name") or self.task_id
@property
- def owner(self) -> str: # type: ignore[override]
+ def owner(self) -> str:
return self.partial_kwargs.get("owner", DEFAULT_OWNER)
@owner.setter
@@ -537,7 +537,7 @@ class MappedOperator(AbstractOperator):
self.partial_kwargs["retry_exponential_backoff"] = value
@property
- def priority_weight(self) -> int: # type: ignore[override]
+ def priority_weight(self) -> int:
return self.partial_kwargs.get("priority_weight", DEFAULT_PRIORITY_WEIGHT)
@priority_weight.setter
@@ -545,7 +545,7 @@ class MappedOperator(AbstractOperator):
self.partial_kwargs["priority_weight"] = value
@property
- def weight_rule(self) -> PriorityWeightStrategy: # type: ignore[override]
+ def weight_rule(self) -> PriorityWeightStrategy:
return validate_and_load_priority_weight_strategy(
self.partial_kwargs.get("weight_rule", DEFAULT_WEIGHT_RULE)
)
@@ -626,20 +626,20 @@ class MappedOperator(AbstractOperator):
def executor_config(self) -> dict:
return self.partial_kwargs.get("executor_config", {})
- @property # type: ignore[override]
- def inlets(self) -> list[Any]: # type: ignore[override]
+ @property
+ def inlets(self) -> list[Any]:
return self.partial_kwargs.get("inlets", [])
@inlets.setter
- def inlets(self, value: list[Any]) -> None: # type: ignore[override]
+ def inlets(self, value: list[Any]) -> None:
self.partial_kwargs["inlets"] = value
- @property # type: ignore[override]
- def outlets(self) -> list[Any]: # type: ignore[override]
+ @property
+ def outlets(self) -> list[Any]:
return self.partial_kwargs.get("outlets", [])
@outlets.setter
- def outlets(self, value: list[Any]) -> None: # type: ignore[override]
+ def outlets(self, value: list[Any]) -> None:
self.partial_kwargs["outlets"] = value
@property
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py
index 251d6861d63..29680582918 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -167,12 +167,11 @@ def _get_variable(key: str, deserialize_json: bool) -> Any:
# enabled only if SecretCache.init() has been called first
from airflow.sdk.execution_time.supervisor import ensure_secrets_backend_loaded
- var_val = None
backends = ensure_secrets_backend_loaded()
# iterate over backends if not in cache (or expired)
for secrets_backend in backends:
try:
- var_val = secrets_backend.get_variable(key=key) # type: ignore[assignment]
+ var_val = secrets_backend.get_variable(key=key)
if var_val is not None:
if deserialize_json:
import json
diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py b/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
index d800f16e615..e0d3d89fc85 100644
--- a/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
+++ b/task-sdk/src/airflow/sdk/execution_time/secrets_masker.py
@@ -255,7 +255,7 @@ class SecretsMasker(logging.Filter):
if isinstance(item, Enum):
return self._redact(item=item.value, name=name, depth=depth, max_depth=max_depth)
if _is_v1_env_var(item) and hasattr(item, "to_dict"):
- tmp: dict = item.to_dict() # type: ignore[attr-defined] # V1EnvVar has a to_dict method
+ tmp: dict = item.to_dict()
if should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
tmp["value"] = "***"
else:
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 5af102a786c..f893a8792c8 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -1414,7 +1414,7 @@ class InProcessTestSupervisor(ActivitySubprocess):
client = Client(base_url=None, token="", dry_run=True, transport=api.transport)
# Mypy is wrong -- the setter accepts a string on the property setter! `URLType = URL | str`
- client.base_url = "http://in-process.invalid./" # type: ignore[assignment]
+ client.base_url = "http://in-process.invalid./"
return client
def send_msg(