This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 120b49aa3c9 Fix exceptions of positional session use in airflow-core ti-deps (#67776)
120b49aa3c9 is described below

commit 120b49aa3c9a7470a0a23129bc466703d54b3eba
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat May 30 20:26:53 2026 +0200

    Fix exceptions of positional session use in airflow-core ti-deps (#67776)
---
 airflow-core/src/airflow/models/taskinstance.py    |  2 +-
 .../src/airflow/ti_deps/deps/base_ti_dep.py        | 14 +++++++------
 .../ti_deps/deps/dag_ti_slots_available_dep.py     |  2 +-
 .../src/airflow/ti_deps/deps/dag_unpaused_dep.py   |  2 +-
 .../src/airflow/ti_deps/deps/dagrun_exists_dep.py  |  2 +-
 .../ti_deps/deps/exec_date_after_start_date_dep.py |  2 +-
 .../airflow/ti_deps/deps/mapped_task_expanded.py   |  2 +-
 .../ti_deps/deps/mapped_task_upstream_dep.py       |  3 ++-
 .../ti_deps/deps/not_in_retry_period_dep.py        |  2 +-
 .../ti_deps/deps/not_previously_skipped_dep.py     |  2 +-
 .../ti_deps/deps/pool_slots_available_dep.py       |  2 +-
 .../src/airflow/ti_deps/deps/prev_dagrun_dep.py    |  2 +-
 .../airflow/ti_deps/deps/ready_to_reschedule.py    |  3 ++-
 .../airflow/ti_deps/deps/runnable_exec_date_dep.py |  2 +-
 .../airflow/ti_deps/deps/task_concurrency_dep.py   |  2 +-
 .../airflow/ti_deps/deps/task_not_running_dep.py   |  2 +-
 .../src/airflow/ti_deps/deps/trigger_rule_dep.py   |  3 ++-
 .../src/airflow/ti_deps/deps/valid_state_dep.py    |  2 +-
 .../deps/test_not_previously_skipped_dep.py        | 24 +++++++++++-----------
 .../ci/prek/known_provide_session_positional.txt   | 13 ------------
 20 files changed, 40 insertions(+), 48 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 9cc8d6a6271..cd254e53a83 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1135,7 +1135,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
             assert self.task is not None
         dep_context = dep_context or DepContext()
         for dep in dep_context.deps | self.task.deps:
-            for dep_status in dep.get_dep_statuses(self, session, dep_context):
+            for dep_status in dep.get_dep_statuses(self, dep_context, 
session=session):
                 self.log.debug(
                     "%s dependency '%s' PASSED: %s, %s",
                     self,
diff --git a/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
index 61c47539d38..92be620e853 100644
--- a/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
@@ -71,8 +71,9 @@ class BaseTIDep:
     def _get_dep_statuses(
         self,
         ti: TaskInstance,
-        session: Session,
         dep_context: DepContext,
+        *,
+        session: Session,
     ) -> Iterator[TIDepStatus]:
         """
         Abstract method that returns an iterable of TIDepStatus objects.
@@ -92,8 +93,9 @@ class BaseTIDep:
     def get_dep_statuses(
         self,
         ti: TaskInstance,
-        session: Session,
         dep_context: DepContext | None = None,
+        *,
+        session: Session,
     ) -> Iterator[TIDepStatus]:
         """
         Wrap around the private _get_dep_statuses method.
@@ -101,8 +103,8 @@ class BaseTIDep:
         Contains some global checks for all dependencies.
 
         :param ti: the task instance to get the dependency status for
-        :param session: database session
         :param dep_context: the context for which this dependency should be 
evaluated for
+        :param session: database session
         """
         cxt = DepContext() if dep_context is None else dep_context
 
@@ -114,10 +116,10 @@ class BaseTIDep:
             yield self._passing_status(reason="Context specified all task 
dependencies should be ignored.")
             return
 
-        yield from self._get_dep_statuses(ti, session, cxt)
+        yield from self._get_dep_statuses(ti, cxt, session=session)
 
     @provide_session
-    def is_met(self, ti: TaskInstance, session: Session, dep_context: 
DepContext | None = None) -> bool:
+    def is_met(self, ti: TaskInstance, dep_context: DepContext | None = None, 
*, session: Session) -> bool:
         """
         Return whether a dependency is met for a given task instance.
 
@@ -131,7 +133,7 @@ class BaseTIDep:
 
         :meta private:
         """
-        return all(status.passed for status in self.get_dep_statuses(ti, 
session, dep_context))
+        return all(status.passed for status in self.get_dep_statuses(ti, 
dep_context, session=session))
 
     def _failing_status(self, reason: str = "") -> TIDepStatus:
         return TIDepStatus(self.name, False, reason)
diff --git 
a/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
index 550695afeb0..6fb370eabd2 100644
--- a/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
@@ -28,7 +28,7 @@ class DagTISlotsAvailableDep(BaseTIDep):
     IGNORABLE = True
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         if ti.task.dag.get_concurrency_reached(session):
             yield self._failing_status(
                 reason=(
diff --git a/airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py
index 5dec655d4e0..a42ec73150f 100644
--- a/airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py
@@ -37,6 +37,6 @@ class DagUnpausedDep(BaseTIDep):
         return session.scalar(select(DagModel.is_paused).where(DagModel.dag_id 
== dag_id))
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         if self._is_dag_paused(ti.dag_id, session):
             yield self._failing_status(reason=f"Task's DAG '{ti.dag_id}' is 
paused.")
diff --git a/airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py
index 0a364628c7f..a97b2714eaa 100644
--- a/airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py
@@ -29,7 +29,7 @@ class DagrunRunningDep(BaseTIDep):
     IGNORABLE = True
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         dr = ti.get_dagrun(session)
         if dr.state != DagRunState.RUNNING:
             yield self._failing_status(
diff --git 
a/airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
index c96615247cc..1e7d8076206 100644
--- a/airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
@@ -28,7 +28,7 @@ class ExecDateAfterStartDateDep(BaseTIDep):
     IGNORABLE = True
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         if ti.task.start_date and ti.logical_date and ti.logical_date < 
ti.task.start_date:
             yield self._failing_status(
                 reason=(
diff --git a/airflow-core/src/airflow/ti_deps/deps/mapped_task_expanded.py 
b/airflow-core/src/airflow/ti_deps/deps/mapped_task_expanded.py
index 8138de9f9e4..f3437376162 100644
--- a/airflow-core/src/airflow/ti_deps/deps/mapped_task_expanded.py
+++ b/airflow-core/src/airflow/ti_deps/deps/mapped_task_expanded.py
@@ -27,7 +27,7 @@ class MappedTaskIsExpanded(BaseTIDep):
     IGNORABLE = False
     IS_TASK_DEP = False
 
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         if dep_context.ignore_unmapped_tasks:
             return
         if ti.map_index == -1:
diff --git a/airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep.py
index 966985bd782..062be5f872a 100644
--- a/airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep.py
@@ -49,8 +49,9 @@ class MappedTaskUpstreamDep(BaseTIDep):
     def _get_dep_statuses(
         self,
         ti: TaskInstance,
-        session: Session,
         dep_context: DepContext,
+        *,
+        session: Session,
     ) -> Iterator[TIDepStatus]:
         from airflow.models.taskinstance import TaskInstance
         from airflow.serialization.definitions.mappedoperator import is_mapped
diff --git a/airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py
index 1013fecdfb8..ce20fddf40c 100644
--- a/airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py
@@ -31,7 +31,7 @@ class NotInRetryPeriodDep(BaseTIDep):
     IS_TASK_DEP = True
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         if dep_context.ignore_in_retry_period:
             yield self._passing_status(
                 reason="The context specified that being in a retry period was 
permitted."
diff --git 
a/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
index 907d9f02713..e00ca976d53 100644
--- a/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
@@ -43,7 +43,7 @@ class NotPreviouslySkippedDep(BaseTIDep):
     IGNORABLE = True
     IS_TASK_DEP = True
 
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         from airflow.utils.state import TaskInstanceState
 
         upstream = ti.task.get_direct_relatives(upstream=True)
diff --git a/airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py
index 564557acdec..b70443cf2c1 100644
--- a/airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py
@@ -33,7 +33,7 @@ class PoolSlotsAvailableDep(BaseTIDep):
     IGNORABLE = True
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context=None):
+    def _get_dep_statuses(self, ti, dep_context=None, *, session):
         """
         Determine if the pool task instance is in has available slots.
 
diff --git a/airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py
index 6cb2390a5dc..397746d13a1 100644
--- a/airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -126,7 +126,7 @@ class PrevDagrunDep(BaseTIDep):
         )
 
     @provide_session
-    def _get_dep_statuses(self, ti: TI, session: Session, dep_context):
+    def _get_dep_statuses(self, ti: TI, dep_context, *, session: Session):
         if TYPE_CHECKING:
             assert ti.task
         if dep_context.ignore_depends_on_past:
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py 
b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index 6f611c43d61..384ee0bff48 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -47,8 +47,9 @@ class ReadyToRescheduleDep(BaseTIDep):
     def _get_dep_statuses(
         self,
         ti: TaskInstance,
-        session: Session,
         dep_context: DepContext,
+        *,
+        session: Session,
     ) -> Iterator[TIDepStatus]:
         """
         Determine whether a task is ready to be rescheduled.
diff --git a/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py
index 396f8180d1b..7ed248fca09 100644
--- a/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py
@@ -29,7 +29,7 @@ class RunnableExecDateDep(BaseTIDep):
     IGNORABLE = True
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         logical_date = ti.get_dagrun(session).logical_date
         if logical_date is None:
             return
diff --git a/airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py
index 9b2ecaba888..0ab5fca6355 100644
--- a/airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py
@@ -29,7 +29,7 @@ class TaskConcurrencyDep(BaseTIDep):
     IS_TASK_DEP = True
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         if ti.task.max_active_tis_per_dag is None and 
ti.task.max_active_tis_per_dagrun is None:
             yield self._passing_status(reason="Task concurrency is not set.")
             return
diff --git a/airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py
index 865fab0ab8d..0a78d7d791f 100644
--- a/airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py
@@ -39,7 +39,7 @@ class TaskNotRunningDep(BaseTIDep):
         return hash(type(self))
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context=None):
+    def _get_dep_statuses(self, ti, dep_context=None, *, session):
         if ti.state != TaskInstanceState.RUNNING:
             yield self._passing_status(reason="Task is not in running state.")
             return
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index eab9792ba32..5ddba214dd3 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -101,8 +101,9 @@ class TriggerRuleDep(BaseTIDep):
     def _get_dep_statuses(
         self,
         ti: TaskInstance,
-        session: Session,
         dep_context: DepContext,
+        *,
+        session: Session,
     ) -> Iterator[TIDepStatus]:
         if TYPE_CHECKING:
             assert ti.task
diff --git a/airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py
index 7e3b58cb29b..59fcb19a66a 100644
--- a/airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py
@@ -52,7 +52,7 @@ class ValidStateDep(BaseTIDep):
         return hash((type(self), tuple(self._valid_states)))
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, dep_context, *, session):
         if dep_context.ignore_ti_state:
             yield self._passing_status(reason="Context specified that state 
should be ignored.")
             return
diff --git 
a/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py 
b/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
index da74cd21c4a..58ccce35dd8 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
@@ -62,8 +62,8 @@ def test_no_parent(session, dag_maker):
     (ti1,) = dag_maker.create_dagrun(logical_date=start_date).task_instances
 
     dep = NotPreviouslySkippedDep()
-    assert len(list(dep.get_dep_statuses(ti1, session, DepContext()))) == 0
-    assert dep.is_met(ti1, session)
+    assert len(list(dep.get_dep_statuses(ti1, DepContext(), session=session))) 
== 0
+    assert dep.is_met(ti1, session=session)
     assert ti1.state != State.SKIPPED
 
 
@@ -85,8 +85,8 @@ def test_no_skipmixin_parent(session, dag_maker):
     _, ti2 = dag_maker.create_dagrun().task_instances
 
     dep = NotPreviouslySkippedDep()
-    assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
-    assert dep.is_met(ti2, session)
+    assert len(list(dep.get_dep_statuses(ti2, DepContext(), session=session))) 
== 0
+    assert dep.is_met(ti2, session=session)
     assert ti2.state != State.SKIPPED
 
 
@@ -110,8 +110,8 @@ def test_parent_follow_branch(session, dag_maker):
     run_task_instance(ti, op1)
 
     dep = NotPreviouslySkippedDep()
-    assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
-    assert dep.is_met(ti2, session)
+    assert len(list(dep.get_dep_statuses(ti2, DepContext(), session=session))) 
== 0
+    assert dep.is_met(ti2, session=session)
     assert ti2.state != State.SKIPPED
 
 
@@ -138,8 +138,8 @@ def test_parent_skip_branch(session, dag_maker):
     run_task_instance(tis["op1"], op1)
 
     dep = NotPreviouslySkippedDep()
-    assert len(list(dep.get_dep_statuses(tis["op2"], session, DepContext()))) 
== 1
-    assert not dep.is_met(tis["op2"], session)
+    assert len(list(dep.get_dep_statuses(tis["op2"], DepContext(), 
session=session))) == 1
+    assert not dep.is_met(tis["op2"], session=session)
     assert tis["op2"].state == State.SKIPPED
 
 
@@ -163,8 +163,8 @@ def test_parent_not_executed(session, dag_maker):
     _, ti2, _ = dag_maker.create_dagrun().task_instances
 
     dep = NotPreviouslySkippedDep()
-    assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
-    assert dep.is_met(ti2, session)
+    assert len(list(dep.get_dep_statuses(ti2, DepContext(), session=session))) 
== 0
+    assert dep.is_met(ti2, session=session)
     assert ti2.state == State.NONE
 
 
@@ -211,6 +211,6 @@ def test_unmapped_parent_skip_mapped_downstream(session, 
dag_maker):
     session.flush()
 
     dep = NotPreviouslySkippedDep()
-    assert len(list(dep.get_dep_statuses(tis["op2"], session, DepContext()))) 
== 1
-    assert not dep.is_met(tis["op2"], session)
+    assert len(list(dep.get_dep_statuses(tis["op2"], DepContext(), 
session=session))) == 1
+    assert not dep.is_met(tis["op2"], session=session)
     assert tis["op2"].state == State.SKIPPED
diff --git a/scripts/ci/prek/known_provide_session_positional.txt 
b/scripts/ci/prek/known_provide_session_positional.txt
index 73b1e8ddc79..63ae75089a3 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -20,19 +20,6 @@ airflow-core/src/airflow/models/trigger.py::7
 airflow-core/src/airflow/models/variable.py::2
 airflow-core/src/airflow/secrets/metastore.py::2
 airflow-core/src/airflow/serialization/definitions/dag.py::2
-airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py::2
-airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py::1
-airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py::1
 airflow-core/src/airflow/utils/cli_action_loggers.py::1
 airflow-core/src/airflow/utils/db.py::7
 airflow-core/src/airflow/utils/db_cleanup.py::2

Reply via email to