This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 68558f343a2 Fix mypy errors in models (#58728)
68558f343a2 is described below
commit 68558f343a25ef731b9052d6cb1805ddc3befd84
Author: Vincent <[email protected]>
AuthorDate: Mon Dec 1 09:06:51 2025 -0500
Fix mypy errors in models (#58728)
---
.../airflow/api_fastapi/execution_api/datamodels/taskinstance.py | 2 +-
airflow-core/src/airflow/models/connection.py | 3 ++-
airflow-core/src/airflow/models/dag.py | 7 ++++---
airflow-core/src/airflow/models/serialized_dag.py | 2 +-
airflow-core/src/airflow/models/taskinstance.py | 4 +---
task-sdk/src/airflow/sdk/api/datamodels/_generated.py | 2 +-
6 files changed, 10 insertions(+), 10 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index e576ac13740..56656bc7222 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -140,7 +140,7 @@ class TIDeferredStatePayload(StrictBaseModel):
trigger_timeout: timedelta | None = None
next_method: str
"""The name of the method on the operator to call in the worker after the
trigger has fired."""
- next_kwargs: Annotated[dict[str, Any] | str, Field(default_factory=dict)]
+ next_kwargs: Annotated[dict[str, Any], Field(default_factory=dict)]
"""
Kwargs to pass to the above method, either a plain dict or an encrypted
string.
diff --git a/airflow-core/src/airflow/models/connection.py
b/airflow-core/src/airflow/models/connection.py
index f64c38b5efe..5c57f53922e 100644
--- a/airflow-core/src/airflow/models/connection.py
+++ b/airflow-core/src/airflow/models/connection.py
@@ -363,8 +363,9 @@ class Connection(Base, LoggingMixin):
"""Password. The value is decrypted/encrypted when reading/setting the
value."""
return synonym("_password", descriptor=property(cls.get_password,
cls.set_password))
- def get_extra(self) -> str:
+ def get_extra(self) -> str | None:
"""Return encrypted extra-data."""
+ extra_val: str | None
if self._extra and self.is_extra_encrypted:
fernet = get_fernet()
if not fernet.is_encrypted:
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index e43be246c15..16ce63aab9e 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -141,6 +141,8 @@ def get_run_data_interval(timetable: Timetable, run:
DagRun) -> DataInterval:
# Compatibility: runs created before AIP-39 implementation don't have an
# explicit data interval. Try to infer from the logical date.
+ if TYPE_CHECKING:
+ assert run.logical_date is not None
return infer_automated_data_interval(timetable, run.logical_date)
@@ -521,14 +523,13 @@ class DagModel(Base):
:param session: ORM Session
:return: Paused Dag_ids
"""
- paused_dag_ids = session.execute(
+ paused_dag_ids = session.scalars(
select(DagModel.dag_id)
.where(DagModel.is_paused == expression.true())
.where(DagModel.dag_id.in_(dag_ids))
)
- paused_dag_ids = {paused_dag_id for (paused_dag_id,) in paused_dag_ids}
- return paused_dag_ids
+ return set(paused_dag_ids)
@property
def safe_dag_id(self):
diff --git a/airflow-core/src/airflow/models/serialized_dag.py
b/airflow-core/src/airflow/models/serialized_dag.py
index 191d36c6737..a5cf8a21a07 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -335,7 +335,7 @@ class SerializedDagModel(Base):
# serve as cache so no need to decompress and load, when accessing
data field
# when COMPRESS_SERIALIZED_DAGS is True
- self.__data_cache = dag_data
+ self.__data_cache: dict[Any, Any] | None = dag_data
def __repr__(self) -> str:
return f"<SerializedDag: {self.dag_id}>"
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 5e873096712..6d11d1472af 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -438,9 +438,7 @@ class TaskInstance(Base, LoggingMixin):
# The method to call next, and any extra arguments to pass to it.
# Usually used when resuming from DEFERRED.
next_method: Mapped[str | None] = mapped_column(String(1000),
nullable=True)
- next_kwargs: Mapped[dict | str | None] = mapped_column(
- MutableDict.as_mutable(ExtendedJSON), nullable=True
- )
+ next_kwargs: Mapped[dict | None] =
mapped_column(MutableDict.as_mutable(ExtendedJSON), nullable=True)
_task_display_property_value: Mapped[str | None] = mapped_column(
"task_display_name", String(2000), nullable=True
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 3b263416127..4b38ff8bb50 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -184,7 +184,7 @@ class TIDeferredStatePayload(BaseModel):
trigger_kwargs: Annotated[dict[str, Any] | str | None,
Field(title="Trigger Kwargs")] = None
trigger_timeout: Annotated[timedelta | None, Field(title="Trigger
Timeout")] = None
next_method: Annotated[str, Field(title="Next Method")]
- next_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Next
Kwargs")] = None
+ next_kwargs: Annotated[dict[str, Any] | None, Field(title="Next Kwargs")]
= None
rendered_map_index: Annotated[str | None, Field(title="Rendered Map
Index")] = None