This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b4c6b8b291b Fix mypy static errors in `airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py` (#58651)
b4c6b8b291b is described below
commit b4c6b8b291b59909dc962a96cc448f2d1ba494b2
Author: Vincent <[email protected]>
AuthorDate: Mon Nov 24 18:42:43 2025 -0500
Fix mypy static errors in `airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py` (#58651)
---
airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index 7bac5adb00a..3c0fc3be5c0 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -20,7 +20,7 @@ from __future__ import annotations
import collections.abc
import functools
from collections import Counter
-from collections.abc import Iterator, KeysView, Mapping
+from collections.abc import Iterator, KeysView, Mapping, Sequence
from typing import TYPE_CHECKING, NamedTuple
from sqlalchemy import and_, func, or_, select
@@ -33,7 +33,7 @@ from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
from sqlalchemy.engine import Row
from sqlalchemy.orm import Session
- from sqlalchemy.sql.expression import ColumnOperators
+ from sqlalchemy.sql import ColumnElement
from airflow.models.mappedoperator import MappedOperator
from airflow.models.taskinstance import TaskInstance
@@ -74,6 +74,7 @@ class _UpstreamTIStates(NamedTuple):
for ti in finished_upstreams:
if TYPE_CHECKING:
assert ti.task
+ assert ti.state
curr_state = {ti.state: 1}
counter.update(curr_state)
if ti.task.is_setup:
@@ -223,7 +224,7 @@ class TriggerRuleDep(BaseTIDep):
return True
return False
- def _iter_upstream_conditions(relevant_tasks: dict) -> Iterator[ColumnOperators]:
+ def _iter_upstream_conditions(relevant_tasks: dict) -> Iterator[ColumnElement]:
# Optimization: If the current task is not in a mapped task group,
# it depends on all upstream task instances.
from airflow.models.taskinstance import TaskInstance
@@ -371,7 +372,7 @@ class TriggerRuleDep(BaseTIDep):
upstream = len(upstream_tasks)
upstream_setup = sum(1 for x in upstream_tasks.values() if x.is_setup)
else:
- task_id_counts: list[Row[tuple[str, int]]] = session.execute(
+ task_id_counts: Sequence[Row[tuple[str, int]]] = session.execute(
select(TaskInstance.task_id, func.count(TaskInstance.task_id))
.where(TaskInstance.dag_id == ti.dag_id,
TaskInstance.run_id == ti.run_id)
.where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))