Copilot commented on code in PR #64653:
URL: https://github.com/apache/airflow/pull/64653#discussion_r3066493039
##########
task-sdk/src/airflow/sdk/bases/operator.py:
##########
@@ -1030,6 +1041,7 @@ def __init__(
start_date: datetime | None = None,
end_date: datetime | None = None,
depends_on_past: bool = False,
+ depends_on_previous_tasks: Collection[str] | None = None,
Review Comment:
`depends_on_previous_tasks` is annotated as `Collection[str] | None`, but
the runtime argument type validation only accepts `list`/`tuple` (see
`BASEOPERATOR_ARGS_EXPECTED_TYPES`). This mismatch is misleading for users and
type checkers (e.g. `set[str]` satisfies `Collection[str]` but will raise
`TypeError`). Consider narrowing the annotation to `Sequence[str] | None` (or
`list[str] | tuple[str, ...] | None`) to match the actual accepted types.
##########
airflow-core/docs/core-concepts/dags.rst:
##########
@@ -422,6 +422,41 @@ You can also say a task can only run if the *previous* run
of the task in the pr
Note that if you are running the Dag at the very start of its
life---specifically, its first ever *automated* run---then the Task will still
run, as there is no previous run to depend on.
+.. _concepts:depends-on-previous-tasks:
+
+Depends on Previous Tasks
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In stateful or partially retryable pipelines, you may need a task to wait for
+*multiple specific tasks* from the previous Dag Run to complete successfully
+before it can run. Use the ``depends_on_previous_tasks`` parameter:
+
+.. code-block:: python
+
+ C = PythonOperator(
+ task_id="C",
+ depends_on_previous_tasks=["C", "D", "E"],
+ )
+
+In this example, task ``C`` in ``dag_run(N)`` will only be scheduled if tasks
+``C``, ``D``, and ``E`` in ``dag_run(N-1)`` all finished with ``success`` or
+``skipped``. The first Dag Run is never blocked.
+
+``depends_on_previous_tasks`` **cannot** be combined with
+``depends_on_past`` or ``wait_for_downstream``. It is a superset of both:
+
+- To replicate ``depends_on_past=True`` for a task with ``task_id="C"``,
+ include ``"C"`` in the list. This ensures task C in ``dag_run(N)`` waits
+ for task C in ``dag_run(N-1)`` to succeed --- the same behavior as
+ ``depends_on_past=True``.
+- To replicate ``wait_for_downstream=True`` for task C with downstream
+ task D, use ``depends_on_previous_tasks=["C", "D"]``.
+
+.. note::
+
+ All task IDs in the list must exist in the same Dag. Invalid task IDs
+ produce a validation error at parse time.
Review Comment:
The note claims invalid task IDs in `depends_on_previous_tasks` "produce a
validation error at parse time", but there is no corresponding validation in
the implementation (the only check is at runtime in `PrevDagrunDep` by looking
for task instances in the previous DagRun). Please either implement parse-time
validation (e.g., during DAG validation once all tasks are known) or reword
this note to reflect the actual behavior (runtime dependency failure /
never-met dependency).
##########
airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py:
##########
@@ -180,6 +182,26 @@ def _get_dep_statuses(self, ti: TI, session: Session,
dep_context):
yield self._passing_status(reason="This task instance was the
first task instance for its task.")
return
+ # depends_on_previous_tasks (mutually exclusive with depends_on_past)
+ if depends_on_previous_tasks:
+ for prev_task_id in depends_on_previous_tasks:
+ if not self._has_tis(last_dagrun, prev_task_id,
session=session):
+ yield self._failing_status(
+ reason=f"depends_on_previous_tasks requires task
'{prev_task_id}' "
+ f"but it was not found in the previous dagrun."
Review Comment:
This loop performs up to two DB queries per listed task (`_has_tis` +
`_count_unsuccessful_tis`), which can become expensive for larger
`depends_on_previous_tasks` lists. Consider replacing this with a single query
that fetches presence + unsuccessful counts for all task_ids at once (e.g.,
`WHERE task_id IN (...) GROUP BY task_id`), then generate per-task failure
reasons from that result.
##########
airflow-core/tests/unit/ti_deps/deps/test_prev_dagrun_dep.py:
##########
@@ -333,3 +333,232 @@ def test_dagrun_dep(mock_get_previous_dagrun,
mock_get_previous_scheduled_dagrun
ti.xcom_push.assert_called_with(key="past_depends_met", value=True)
else:
ti.xcom_push.assert_not_called()
+
+
+class TestDependsOnPreviousTasks:
+ """Tests for the depends_on_previous_tasks feature."""
+
+ @patch("airflow.models.dagrun.DagRun.get_previous_scheduled_dagrun")
+ @patch("airflow.models.dagrun.DagRun.get_previous_dagrun")
+ def test_not_set_passes(self, mock_get_previous_dagrun,
mock_get_previous_scheduled_dagrun):
+ """When depends_on_previous_tasks is not set, the dep should pass."""
+ task = Mock(
+ spec=BaseOperator,
+ task_id="test_task",
+ depends_on_past=False,
+ depends_on_previous_tasks=None,
+ dag=Mock(catchup=False),
+ start_date=None,
+ )
+ dagrun = Mock(backfill_id=None, logical_date=datetime(2016, 1, 3),
dag_id="test_dag")
+ ti = Mock(task=task, task_id="test_task",
**{"get_dagrun.return_value": dagrun})
+
+ dep_context = DepContext(ignore_depends_on_past=False)
+ dep = PrevDagrunDep()
+ assert dep.is_met(ti=ti, dep_context=dep_context)
+
+ @patch("airflow.models.dagrun.DagRun.get_previous_scheduled_dagrun")
+ @patch("airflow.models.dagrun.DagRun.get_previous_dagrun")
+ def test_all_succeeded_passes(self, mock_get_previous_dagrun,
mock_get_previous_scheduled_dagrun):
+ """When all listed tasks succeeded in prev run, the dep should pass."""
+ task = Mock(
+ spec=BaseOperator,
+ task_id="C",
+ depends_on_past=False,
+ depends_on_previous_tasks=["D", "E"],
+ dag=Mock(catchup=False),
+ start_date=None,
+ )
+ prev_dagrun = Mock(logical_date=datetime(2016, 1, 2))
+ mock_get_previous_dagrun.return_value = prev_dagrun
+ dagrun = Mock(backfill_id=None, logical_date=datetime(2016, 1, 3),
dag_id="test_dag")
+ ti = Mock(
+ task=task,
+ task_id="C",
+ **{"get_dagrun.return_value": dagrun, "xcom_push.return_value":
None},
+ )
+
+ dep_context = DepContext(ignore_depends_on_past=False)
+ dep = PrevDagrunDep()
+ with patch.multiple(
+ dep,
+ _has_tis=Mock(return_value=True),
+ _count_unsuccessful_tis=Mock(return_value=0),
+ ):
+ assert dep.is_met(ti=ti, dep_context=dep_context)
Review Comment:
New `depends_on_previous_tasks` behavior isn’t covered for the
`wait_for_past_depends_before_skipping` path: when the dependency is satisfied,
`PrevDagrunDep` should push the `past_depends_met` XCom (same as the
`depends_on_past` path). Please add a regression test that sets
`DepContext(wait_for_past_depends_before_skipping=True)` and asserts
`ti.xcom_push(key=PAST_DEPENDS_MET, value=True)` is called for a passing
`depends_on_previous_tasks` check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]