This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b4c6b8b291b Fix mypy static errors in 
`airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py` (#58651)
b4c6b8b291b is described below

commit b4c6b8b291b59909dc962a96cc448f2d1ba494b2
Author: Vincent <[email protected]>
AuthorDate: Mon Nov 24 18:42:43 2025 -0500

    Fix mypy static errors in 
`airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py` (#58651)
---
 airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index 7bac5adb00a..3c0fc3be5c0 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 import collections.abc
 import functools
 from collections import Counter
-from collections.abc import Iterator, KeysView, Mapping
+from collections.abc import Iterator, KeysView, Mapping, Sequence
 from typing import TYPE_CHECKING, NamedTuple
 
 from sqlalchemy import and_, func, or_, select
@@ -33,7 +33,7 @@ from airflow.utils.state import TaskInstanceState
 if TYPE_CHECKING:
     from sqlalchemy.engine import Row
     from sqlalchemy.orm import Session
-    from sqlalchemy.sql.expression import ColumnOperators
+    from sqlalchemy.sql import ColumnElement
 
     from airflow.models.mappedoperator import MappedOperator
     from airflow.models.taskinstance import TaskInstance
@@ -74,6 +74,7 @@ class _UpstreamTIStates(NamedTuple):
         for ti in finished_upstreams:
             if TYPE_CHECKING:
                 assert ti.task
+                assert ti.state
             curr_state = {ti.state: 1}
             counter.update(curr_state)
             if ti.task.is_setup:
@@ -223,7 +224,7 @@ class TriggerRuleDep(BaseTIDep):
                 return True
             return False
 
-        def _iter_upstream_conditions(relevant_tasks: dict) -> 
Iterator[ColumnOperators]:
+        def _iter_upstream_conditions(relevant_tasks: dict) -> 
Iterator[ColumnElement]:
             # Optimization: If the current task is not in a mapped task group,
             # it depends on all upstream task instances.
             from airflow.models.taskinstance import TaskInstance
@@ -371,7 +372,7 @@ class TriggerRuleDep(BaseTIDep):
                 upstream = len(upstream_tasks)
                 upstream_setup = sum(1 for x in upstream_tasks.values() if 
x.is_setup)
             else:
-                task_id_counts: list[Row[tuple[str, int]]] = session.execute(
+                task_id_counts: Sequence[Row[tuple[str, int]]] = 
session.execute(
                     select(TaskInstance.task_id, 
func.count(TaskInstance.task_id))
                     .where(TaskInstance.dag_id == ti.dag_id, 
TaskInstance.run_id == ti.run_id)
                     
.where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))

Reply via email to