This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 68558f343a2 Fix mypy errors in models (#58728)
68558f343a2 is described below

commit 68558f343a25ef731b9052d6cb1805ddc3befd84
Author: Vincent <[email protected]>
AuthorDate: Mon Dec 1 09:06:51 2025 -0500

    Fix mypy errors in models (#58728)
---
 .../airflow/api_fastapi/execution_api/datamodels/taskinstance.py   | 2 +-
 airflow-core/src/airflow/models/connection.py                      | 3 ++-
 airflow-core/src/airflow/models/dag.py                             | 7 ++++---
 airflow-core/src/airflow/models/serialized_dag.py                  | 2 +-
 airflow-core/src/airflow/models/taskinstance.py                    | 4 +---
 task-sdk/src/airflow/sdk/api/datamodels/_generated.py              | 2 +-
 6 files changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index e576ac13740..56656bc7222 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -140,7 +140,7 @@ class TIDeferredStatePayload(StrictBaseModel):
     trigger_timeout: timedelta | None = None
     next_method: str
     """The name of the method on the operator to call in the worker after the 
trigger has fired."""
-    next_kwargs: Annotated[dict[str, Any] | str, Field(default_factory=dict)]
+    next_kwargs: Annotated[dict[str, Any], Field(default_factory=dict)]
     """
     Kwargs to pass to the above method, either a plain dict or an encrypted 
string.
 
diff --git a/airflow-core/src/airflow/models/connection.py 
b/airflow-core/src/airflow/models/connection.py
index f64c38b5efe..5c57f53922e 100644
--- a/airflow-core/src/airflow/models/connection.py
+++ b/airflow-core/src/airflow/models/connection.py
@@ -363,8 +363,9 @@ class Connection(Base, LoggingMixin):
         """Password. The value is decrypted/encrypted when reading/setting the 
value."""
         return synonym("_password", descriptor=property(cls.get_password, 
cls.set_password))
 
-    def get_extra(self) -> str:
+    def get_extra(self) -> str | None:
         """Return encrypted extra-data."""
+        extra_val: str | None
         if self._extra and self.is_extra_encrypted:
             fernet = get_fernet()
             if not fernet.is_encrypted:
diff --git a/airflow-core/src/airflow/models/dag.py 
b/airflow-core/src/airflow/models/dag.py
index e43be246c15..16ce63aab9e 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -141,6 +141,8 @@ def get_run_data_interval(timetable: Timetable, run: 
DagRun) -> DataInterval:
 
     # Compatibility: runs created before AIP-39 implementation don't have an
     # explicit data interval. Try to infer from the logical date.
+    if TYPE_CHECKING:
+        assert run.logical_date is not None
     return infer_automated_data_interval(timetable, run.logical_date)
 
 
@@ -521,14 +523,13 @@ class DagModel(Base):
         :param session: ORM Session
         :return: Paused Dag_ids
         """
-        paused_dag_ids = session.execute(
+        paused_dag_ids = session.scalars(
             select(DagModel.dag_id)
             .where(DagModel.is_paused == expression.true())
             .where(DagModel.dag_id.in_(dag_ids))
         )
 
-        paused_dag_ids = {paused_dag_id for (paused_dag_id,) in paused_dag_ids}
-        return paused_dag_ids
+        return set(paused_dag_ids)
 
     @property
     def safe_dag_id(self):
diff --git a/airflow-core/src/airflow/models/serialized_dag.py 
b/airflow-core/src/airflow/models/serialized_dag.py
index 191d36c6737..a5cf8a21a07 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -335,7 +335,7 @@ class SerializedDagModel(Base):
 
         # serve as cache so no need to decompress and load, when accessing 
data field
         # when COMPRESS_SERIALIZED_DAGS is True
-        self.__data_cache = dag_data
+        self.__data_cache: dict[Any, Any] | None = dag_data
 
     def __repr__(self) -> str:
         return f"<SerializedDag: {self.dag_id}>"
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 5e873096712..6d11d1472af 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -438,9 +438,7 @@ class TaskInstance(Base, LoggingMixin):
     # The method to call next, and any extra arguments to pass to it.
     # Usually used when resuming from DEFERRED.
     next_method: Mapped[str | None] = mapped_column(String(1000), 
nullable=True)
-    next_kwargs: Mapped[dict | str | None] = mapped_column(
-        MutableDict.as_mutable(ExtendedJSON), nullable=True
-    )
+    next_kwargs: Mapped[dict | None] = 
mapped_column(MutableDict.as_mutable(ExtendedJSON), nullable=True)
 
     _task_display_property_value: Mapped[str | None] = mapped_column(
         "task_display_name", String(2000), nullable=True
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 3b263416127..4b38ff8bb50 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -184,7 +184,7 @@ class TIDeferredStatePayload(BaseModel):
     trigger_kwargs: Annotated[dict[str, Any] | str | None, 
Field(title="Trigger Kwargs")] = None
     trigger_timeout: Annotated[timedelta | None, Field(title="Trigger 
Timeout")] = None
     next_method: Annotated[str, Field(title="Next Method")]
-    next_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Next 
Kwargs")] = None
+    next_kwargs: Annotated[dict[str, Any] | None, Field(title="Next Kwargs")] 
= None
     rendered_map_index: Annotated[str | None, Field(title="Rendered Map 
Index")] = None
 
 

Reply via email to