This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 120b49aa3c9 Fix exceptions of positional session use in airflow-core ti-deps (#67776)
120b49aa3c9 is described below
commit 120b49aa3c9a7470a0a23129bc466703d54b3eba
Author: Jens Scheffler <[email protected]>
AuthorDate: Sat May 30 20:26:53 2026 +0200
Fix exceptions of positional session use in airflow-core ti-deps (#67776)
---
airflow-core/src/airflow/models/taskinstance.py | 2 +-
.../src/airflow/ti_deps/deps/base_ti_dep.py | 14 +++++++------
.../ti_deps/deps/dag_ti_slots_available_dep.py | 2 +-
.../src/airflow/ti_deps/deps/dag_unpaused_dep.py | 2 +-
.../src/airflow/ti_deps/deps/dagrun_exists_dep.py | 2 +-
.../ti_deps/deps/exec_date_after_start_date_dep.py | 2 +-
.../airflow/ti_deps/deps/mapped_task_expanded.py | 2 +-
.../ti_deps/deps/mapped_task_upstream_dep.py | 3 ++-
.../ti_deps/deps/not_in_retry_period_dep.py | 2 +-
.../ti_deps/deps/not_previously_skipped_dep.py | 2 +-
.../ti_deps/deps/pool_slots_available_dep.py | 2 +-
.../src/airflow/ti_deps/deps/prev_dagrun_dep.py | 2 +-
.../airflow/ti_deps/deps/ready_to_reschedule.py | 3 ++-
.../airflow/ti_deps/deps/runnable_exec_date_dep.py | 2 +-
.../airflow/ti_deps/deps/task_concurrency_dep.py | 2 +-
.../airflow/ti_deps/deps/task_not_running_dep.py | 2 +-
.../src/airflow/ti_deps/deps/trigger_rule_dep.py | 3 ++-
.../src/airflow/ti_deps/deps/valid_state_dep.py | 2 +-
.../deps/test_not_previously_skipped_dep.py | 24 +++++++++++-----------
.../ci/prek/known_provide_session_positional.txt | 13 ------------
20 files changed, 40 insertions(+), 48 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 9cc8d6a6271..cd254e53a83 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1135,7 +1135,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
assert self.task is not None
dep_context = dep_context or DepContext()
for dep in dep_context.deps | self.task.deps:
- for dep_status in dep.get_dep_statuses(self, session, dep_context):
+ for dep_status in dep.get_dep_statuses(self, dep_context, session=session):
self.log.debug(
"%s dependency '%s' PASSED: %s, %s",
self,
diff --git a/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py b/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
index 61c47539d38..92be620e853 100644
--- a/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
@@ -71,8 +71,9 @@ class BaseTIDep:
def _get_dep_statuses(
self,
ti: TaskInstance,
- session: Session,
dep_context: DepContext,
+ *,
+ session: Session,
) -> Iterator[TIDepStatus]:
"""
Abstract method that returns an iterable of TIDepStatus objects.
@@ -92,8 +93,9 @@ class BaseTIDep:
def get_dep_statuses(
self,
ti: TaskInstance,
- session: Session,
dep_context: DepContext | None = None,
+ *,
+ session: Session,
) -> Iterator[TIDepStatus]:
"""
Wrap around the private _get_dep_statuses method.
@@ -101,8 +103,8 @@ class BaseTIDep:
Contains some global checks for all dependencies.
:param ti: the task instance to get the dependency status for
- :param session: database session
:param dep_context: the context for which this dependency should be evaluated for
+ :param session: database session
"""
cxt = DepContext() if dep_context is None else dep_context
@@ -114,10 +116,10 @@ class BaseTIDep:
yield self._passing_status(reason="Context specified all task dependencies should be ignored.")
return
- yield from self._get_dep_statuses(ti, session, cxt)
+ yield from self._get_dep_statuses(ti, cxt, session=session)
@provide_session
- def is_met(self, ti: TaskInstance, session: Session, dep_context: DepContext | None = None) -> bool:
+ def is_met(self, ti: TaskInstance, dep_context: DepContext | None = None, *, session: Session) -> bool:
"""
Return whether a dependency is met for a given task instance.
@@ -131,7 +133,7 @@ class BaseTIDep:
:meta private:
"""
- return all(status.passed for status in self.get_dep_statuses(ti, session, dep_context))
+ return all(status.passed for status in self.get_dep_statuses(ti, dep_context, session=session))
def _failing_status(self, reason: str = "") -> TIDepStatus:
return TIDepStatus(self.name, False, reason)
diff --git a/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py b/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
index 550695afeb0..6fb370eabd2 100644
--- a/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
@@ -28,7 +28,7 @@ class DagTISlotsAvailableDep(BaseTIDep):
IGNORABLE = True
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
if ti.task.dag.get_concurrency_reached(session):
yield self._failing_status(
reason=(
diff --git a/airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py b/airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py
index 5dec655d4e0..a42ec73150f 100644
--- a/airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py
@@ -37,6 +37,6 @@ class DagUnpausedDep(BaseTIDep):
return session.scalar(select(DagModel.is_paused).where(DagModel.dag_id == dag_id))
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
if self._is_dag_paused(ti.dag_id, session):
yield self._failing_status(reason=f"Task's DAG '{ti.dag_id}' is paused.")
diff --git a/airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py b/airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py
index 0a364628c7f..a97b2714eaa 100644
--- a/airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py
@@ -29,7 +29,7 @@ class DagrunRunningDep(BaseTIDep):
IGNORABLE = True
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
dr = ti.get_dagrun(session)
if dr.state != DagRunState.RUNNING:
yield self._failing_status(
diff --git a/airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py b/airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
index c96615247cc..1e7d8076206 100644
--- a/airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py
@@ -28,7 +28,7 @@ class ExecDateAfterStartDateDep(BaseTIDep):
IGNORABLE = True
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
if ti.task.start_date and ti.logical_date and ti.logical_date < ti.task.start_date:
yield self._failing_status(
reason=(
diff --git a/airflow-core/src/airflow/ti_deps/deps/mapped_task_expanded.py b/airflow-core/src/airflow/ti_deps/deps/mapped_task_expanded.py
index 8138de9f9e4..f3437376162 100644
--- a/airflow-core/src/airflow/ti_deps/deps/mapped_task_expanded.py
+++ b/airflow-core/src/airflow/ti_deps/deps/mapped_task_expanded.py
@@ -27,7 +27,7 @@ class MappedTaskIsExpanded(BaseTIDep):
IGNORABLE = False
IS_TASK_DEP = False
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
if dep_context.ignore_unmapped_tasks:
return
if ti.map_index == -1:
diff --git a/airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep.py b/airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep.py
index 966985bd782..062be5f872a 100644
--- a/airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/mapped_task_upstream_dep.py
@@ -49,8 +49,9 @@ class MappedTaskUpstreamDep(BaseTIDep):
def _get_dep_statuses(
self,
ti: TaskInstance,
- session: Session,
dep_context: DepContext,
+ *,
+ session: Session,
) -> Iterator[TIDepStatus]:
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.definitions.mappedoperator import is_mapped
diff --git a/airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py b/airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py
index 1013fecdfb8..ce20fddf40c 100644
--- a/airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py
@@ -31,7 +31,7 @@ class NotInRetryPeriodDep(BaseTIDep):
IS_TASK_DEP = True
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
if dep_context.ignore_in_retry_period:
yield self._passing_status(
reason="The context specified that being in a retry period was permitted."
diff --git a/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py b/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
index 907d9f02713..e00ca976d53 100644
--- a/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/not_previously_skipped_dep.py
@@ -43,7 +43,7 @@ class NotPreviouslySkippedDep(BaseTIDep):
IGNORABLE = True
IS_TASK_DEP = True
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
from airflow.utils.state import TaskInstanceState
upstream = ti.task.get_direct_relatives(upstream=True)
diff --git a/airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py b/airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py
index 564557acdec..b70443cf2c1 100644
--- a/airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py
@@ -33,7 +33,7 @@ class PoolSlotsAvailableDep(BaseTIDep):
IGNORABLE = True
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context=None):
+ def _get_dep_statuses(self, ti, dep_context=None, *, session):
"""
Determine if the pool task instance is in has available slots.
diff --git a/airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py
index 6cb2390a5dc..397746d13a1 100644
--- a/airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -126,7 +126,7 @@ class PrevDagrunDep(BaseTIDep):
)
@provide_session
- def _get_dep_statuses(self, ti: TI, session: Session, dep_context):
+ def _get_dep_statuses(self, ti: TI, dep_context, *, session: Session):
if TYPE_CHECKING:
assert ti.task
if dep_context.ignore_depends_on_past:
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index 6f611c43d61..384ee0bff48 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -47,8 +47,9 @@ class ReadyToRescheduleDep(BaseTIDep):
def _get_dep_statuses(
self,
ti: TaskInstance,
- session: Session,
dep_context: DepContext,
+ *,
+ session: Session,
) -> Iterator[TIDepStatus]:
"""
Determine whether a task is ready to be rescheduled.
diff --git a/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py
index 396f8180d1b..7ed248fca09 100644
--- a/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py
@@ -29,7 +29,7 @@ class RunnableExecDateDep(BaseTIDep):
IGNORABLE = True
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
logical_date = ti.get_dagrun(session).logical_date
if logical_date is None:
return
diff --git a/airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py b/airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py
index 9b2ecaba888..0ab5fca6355 100644
--- a/airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py
@@ -29,7 +29,7 @@ class TaskConcurrencyDep(BaseTIDep):
IS_TASK_DEP = True
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
if ti.task.max_active_tis_per_dag is None and ti.task.max_active_tis_per_dagrun is None:
yield self._passing_status(reason="Task concurrency is not set.")
return
diff --git a/airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py b/airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py
index 865fab0ab8d..0a78d7d791f 100644
--- a/airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py
@@ -39,7 +39,7 @@ class TaskNotRunningDep(BaseTIDep):
return hash(type(self))
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context=None):
+ def _get_dep_statuses(self, ti, dep_context=None, *, session):
if ti.state != TaskInstanceState.RUNNING:
yield self._passing_status(reason="Task is not in running state.")
return
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index eab9792ba32..5ddba214dd3 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -101,8 +101,9 @@ class TriggerRuleDep(BaseTIDep):
def _get_dep_statuses(
self,
ti: TaskInstance,
- session: Session,
dep_context: DepContext,
+ *,
+ session: Session,
) -> Iterator[TIDepStatus]:
if TYPE_CHECKING:
assert ti.task
diff --git a/airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py b/airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py
index 7e3b58cb29b..59fcb19a66a 100644
--- a/airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py
@@ -52,7 +52,7 @@ class ValidStateDep(BaseTIDep):
return hash((type(self), tuple(self._valid_states)))
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(self, ti, dep_context, *, session):
if dep_context.ignore_ti_state:
yield self._passing_status(reason="Context specified that state should be ignored.")
return
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
index da74cd21c4a..58ccce35dd8 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_not_previously_skipped_dep.py
@@ -62,8 +62,8 @@ def test_no_parent(session, dag_maker):
(ti1,) = dag_maker.create_dagrun(logical_date=start_date).task_instances
dep = NotPreviouslySkippedDep()
- assert len(list(dep.get_dep_statuses(ti1, session, DepContext()))) == 0
- assert dep.is_met(ti1, session)
+ assert len(list(dep.get_dep_statuses(ti1, DepContext(), session=session))) == 0
+ assert dep.is_met(ti1, session=session)
assert ti1.state != State.SKIPPED
@@ -85,8 +85,8 @@ def test_no_skipmixin_parent(session, dag_maker):
_, ti2 = dag_maker.create_dagrun().task_instances
dep = NotPreviouslySkippedDep()
- assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
- assert dep.is_met(ti2, session)
+ assert len(list(dep.get_dep_statuses(ti2, DepContext(), session=session))) == 0
+ assert dep.is_met(ti2, session=session)
assert ti2.state != State.SKIPPED
@@ -110,8 +110,8 @@ def test_parent_follow_branch(session, dag_maker):
run_task_instance(ti, op1)
dep = NotPreviouslySkippedDep()
- assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
- assert dep.is_met(ti2, session)
+ assert len(list(dep.get_dep_statuses(ti2, DepContext(), session=session))) == 0
+ assert dep.is_met(ti2, session=session)
assert ti2.state != State.SKIPPED
@@ -138,8 +138,8 @@ def test_parent_skip_branch(session, dag_maker):
run_task_instance(tis["op1"], op1)
dep = NotPreviouslySkippedDep()
- assert len(list(dep.get_dep_statuses(tis["op2"], session, DepContext()))) == 1
- assert not dep.is_met(tis["op2"], session)
+ assert len(list(dep.get_dep_statuses(tis["op2"], DepContext(), session=session))) == 1
+ assert not dep.is_met(tis["op2"], session=session)
assert tis["op2"].state == State.SKIPPED
@@ -163,8 +163,8 @@ def test_parent_not_executed(session, dag_maker):
_, ti2, _ = dag_maker.create_dagrun().task_instances
dep = NotPreviouslySkippedDep()
- assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
- assert dep.is_met(ti2, session)
+ assert len(list(dep.get_dep_statuses(ti2, DepContext(), session=session))) == 0
+ assert dep.is_met(ti2, session=session)
assert ti2.state == State.NONE
@@ -211,6 +211,6 @@ def test_unmapped_parent_skip_mapped_downstream(session, dag_maker):
session.flush()
dep = NotPreviouslySkippedDep()
- assert len(list(dep.get_dep_statuses(tis["op2"], session, DepContext()))) == 1
- assert not dep.is_met(tis["op2"], session)
+ assert len(list(dep.get_dep_statuses(tis["op2"], DepContext(), session=session))) == 1
+ assert not dep.is_met(tis["op2"], session=session)
assert tis["op2"].state == State.SKIPPED
diff --git a/scripts/ci/prek/known_provide_session_positional.txt b/scripts/ci/prek/known_provide_session_positional.txt
index 73b1e8ddc79..63ae75089a3 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -20,19 +20,6 @@ airflow-core/src/airflow/models/trigger.py::7
airflow-core/src/airflow/models/variable.py::2
airflow-core/src/airflow/secrets/metastore.py::2
airflow-core/src/airflow/serialization/definitions/dag.py::2
-airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py::2
-airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/dag_unpaused_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/dagrun_exists_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/exec_date_after_start_date_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/not_in_retry_period_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/pool_slots_available_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py::1
-airflow-core/src/airflow/ti_deps/deps/runnable_exec_date_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/task_concurrency_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/task_not_running_dep.py::1
-airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py::1
airflow-core/src/airflow/utils/cli_action_loggers.py::1
airflow-core/src/airflow/utils/db.py::7
airflow-core/src/airflow/utils/db_cleanup.py::2