Arunodoy18 opened a new pull request, #60385:
URL: https://github.com/apache/airflow/pull/60385
This PR implements the `depends_on_previous_task_ids` parameter for tasks,
allowing a task to depend on the successful completion of multiple specific
tasks from the previous DAG run (within the same DAG).
### Problem Statement
Previously, `depends_on_past=True` only allowed a task to depend on the same
task in the previous DAG run. There was no native way for a task to depend on
multiple specific tasks from the previous DAG run.
### Solution
Added a new parameter `depends_on_previous_task_ids` to the `BaseOperator`
that accepts a list of task IDs. When set (along with `depends_on_past=True`),
the task will only run if all specified tasks in the previous DAG run have
succeeded.
## Changes Made
### 1. **task-sdk/src/airflow/sdk/bases/operator.py**
- Added `depends_on_previous_task_ids` parameter to
`BaseOperator.__init__()`
- Added validation to ensure `depends_on_past=True` when using
`depends_on_previous_task_ids`
- Added parameter to `template_fields` for templating support
- Added comprehensive docstring
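The validation described above can be sketched as a standalone function. This is a minimal illustration, not the actual `BaseOperator.__init__()` code; the stand-in `AirflowException` class and the function name are assumptions for the sketch.

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException (illustrative only)."""


def validate_depends_on_previous_task_ids(depends_on_past, depends_on_previous_task_ids):
    """Reject depends_on_previous_task_ids unless depends_on_past is also set.

    Mirrors the constructor validation described in this PR: the new parameter
    is only meaningful together with depends_on_past=True.
    """
    if depends_on_previous_task_ids and not depends_on_past:
        raise AirflowException(
            "depends_on_previous_task_ids requires depends_on_past=True"
        )
```

In the real operator this check would run during `__init__()`, so a misconfigured DAG fails at parse time rather than at scheduling time.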
### 2. **airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py**
- Extended `PrevDagrunDep._get_dep_statuses()` method to check
dependencies on specified tasks from previous DAG run
- Added logic to verify that all tasks in `depends_on_previous_task_ids`
exist and have succeeded in the previous run
- Provides detailed error messages when dependencies are not met
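The dependency check performed by the extended `_get_dep_statuses()` can be sketched as follows, using a plain dict of previous-run task states in place of real `TaskInstance` queries. The function name, signature, and state strings are illustrative assumptions, not the actual Airflow internals.

```python
def check_previous_run_deps(task_ids, prev_run_states):
    """Return (passed, reason) for the depends_on_previous_task_ids check.

    prev_run_states maps task_id -> state (e.g. 'success', 'failed') for the
    previous DAG run; None means there is no previous run (first DAG run).
    """
    if prev_run_states is None:
        # First DAG run: nothing to check, so the task is not blocked.
        return True, "First DAG run: no previous run to check"
    for task_id in task_ids:
        state = prev_run_states.get(task_id)
        if state is None:
            return False, f"Task '{task_id}' not found in previous DAG run"
        if state != "success":
            return False, (
                f"Task '{task_id}' is '{state}' in previous DAG run, "
                "expected 'success'"
            )
    return True, "All specified tasks succeeded in the previous DAG run"
```

The `reason` string corresponds to the detailed error messages mentioned above, which surface in the task instance's dependency status when the task is blocked.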
### 3. **airflow-core/tests/ti_deps/deps/test_prev_dagrun_dep_specific_tasks.py**
- Added comprehensive unit tests for the new functionality
- Tests cover success scenarios, failure scenarios, first-run behavior,
and validation
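The scenarios those tests cover could be exercised along these lines. This is a self-contained sketch, not the contents of the actual test file: `deps_met` is a hypothetical stand-in for the real `PrevDagrunDep` logic.

```python
def deps_met(task_ids, prev_run_states):
    """True when every listed task succeeded in the previous run (or there is none)."""
    if prev_run_states is None:  # first DAG run: nothing to check
        return True
    return all(prev_run_states.get(t) == "success" for t in task_ids)


def test_first_run_is_not_blocked():
    assert deps_met(["task_a", "task_b"], None)


def test_all_dependencies_met():
    assert deps_met(["task_a", "task_b"], {"task_a": "success", "task_b": "success"})


def test_failed_dependency_blocks():
    assert not deps_met(["task_a", "task_b"], {"task_a": "success", "task_b": "failed"})


def test_missing_task_blocks():
    assert not deps_met(["task_a", "task_b"], {"task_a": "success"})
```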
## Usage Example
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG('example_dag', start_date=datetime(2022, 1, 1),
         schedule_interval='@daily') as dag:
    task_a = PythonOperator(task_id='task_a', python_callable=lambda: print('A'))
    task_b = PythonOperator(task_id='task_b', python_callable=lambda: print('B'))

    # task_c will only run if both task_a and task_b succeeded in the previous DAG run
    task_c = PythonOperator(
        task_id='task_c',
        python_callable=lambda: print('C'),
        depends_on_past=True,
        depends_on_previous_task_ids=['task_a', 'task_b'],
    )
```
## Behavior
- **First DAG run**: Not blocked (no previous run to check)
- **Subsequent runs**: Task waits until all specified tasks from the
previous run have succeeded
- **Validation**: Raises `AirflowException` if
`depends_on_previous_task_ids` is set without `depends_on_past=True`
## Testing
- Unit tests added in `test_prev_dagrun_dep_specific_tasks.py`
- Tests validate correct behavior for:
- First run (should not be blocked)
- Dependencies met (all previous tasks succeeded)
- Dependencies not met (one or more previous tasks failed)
- Invalid configuration (missing `depends_on_past=True`)
## Closes
Closes #60328