Arunodoy18 opened a new pull request, #60385:
URL: https://github.com/apache/airflow/pull/60385

   This PR implements the `depends_on_previous_task_ids` parameter for tasks, 
allowing a task to depend on the successful completion of multiple specific 
tasks from the previous DAG run (within the same DAG).
   
   ### Problem Statement
   
   Previously, `depends_on_past=True` only allowed a task to depend on its own 
instance in the previous DAG run. There was no native way for a task to depend 
on multiple specific tasks from the previous DAG run.
   
   ### Solution
   
   Added a new parameter `depends_on_previous_task_ids` to the `BaseOperator` 
that accepts a list of task IDs. When set (along with `depends_on_past=True`), 
the task will only run if all specified tasks in the previous DAG run have 
succeeded.
   
   ## Changes Made
   
   ### 1. **task-sdk/src/airflow/sdk/bases/operator.py**
      - Added `depends_on_previous_task_ids` parameter to 
`BaseOperator.__init__()`
      - Added validation to ensure `depends_on_past=True` when using 
`depends_on_previous_task_ids`
      - Added parameter to `template_fields` for templating support
      - Added comprehensive docstring
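
The validation described above can be sketched without Airflow installed. This is a minimal illustration, not the code from the PR: `SketchOperator` and `ConfigError` are hypothetical stand-ins for `BaseOperator` and `AirflowException`, and the error text is invented.

```python
from __future__ import annotations


class ConfigError(Exception):
    """Stand-in for AirflowException so the sketch runs without Airflow."""


class SketchOperator:
    """Hypothetical, stripped-down operator; not the real BaseOperator."""

    def __init__(
        self,
        task_id: str,
        depends_on_past: bool = False,
        depends_on_previous_task_ids: list[str] | None = None,
    ) -> None:
        # Reject the new parameter unless depends_on_past is also enabled.
        if depends_on_previous_task_ids and not depends_on_past:
            raise ConfigError(
                f"Task {task_id!r}: depends_on_previous_task_ids "
                "requires depends_on_past=True"
            )
        self.task_id = task_id
        self.depends_on_past = depends_on_past
        # Copy the list so later mutation by the caller has no effect.
        self.depends_on_previous_task_ids = list(depends_on_previous_task_ids or [])
```

Failing at construction time means a misconfigured task is caught when the DAG file is parsed rather than when the scheduler evaluates dependencies.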
   
   ### 2. **airflow-core/src/airflow/ti_deps/deps/prev_dagrun_dep.py**
      - Extended `PrevDagrunDep._get_dep_statuses()` method to check 
dependencies on specified tasks from previous DAG run
      - Added logic to verify that all tasks in `depends_on_previous_task_ids` 
exist and have succeeded in the previous run
      - Added detailed error messages for when dependencies are not met
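
As a rough model of the check described above, the sketch below takes the task states of the previous DAG run as a plain dictionary and returns a pass/fail flag with a reason. The function name, the state strings, and the reason wording are assumptions for illustration; the real logic lives in `PrevDagrunDep._get_dep_statuses()` and works on task instances. The sketch also assumes that only a `success` state satisfies the dependency, since the description does not say how other states such as skipped are treated.

```python
from __future__ import annotations

SUCCESS = "success"  # stand-in for Airflow's task-instance success state


def check_previous_tasks(
    previous_states: dict[str, str] | None,
    required_task_ids: list[str],
) -> tuple[bool, str]:
    """Return (passed, reason) for the previous-run check."""
    # First DAG run: there is no previous run, so nothing can block the task.
    if previous_states is None:
        return True, "No previous DAG run; nothing to check."

    # Every listed task must exist in the previous run.
    missing = [t for t in required_task_ids if t not in previous_states]
    if missing:
        return False, f"Tasks not found in previous DAG run: {missing}"

    # Every listed task must have finished successfully.
    not_succeeded = {
        t: previous_states[t]
        for t in required_task_ids
        if previous_states[t] != SUCCESS
    }
    if not_succeeded:
        return False, f"Tasks did not succeed in previous DAG run: {not_succeeded}"

    return True, "All required tasks succeeded in previous DAG run."
```

Returning a reason alongside the flag mirrors the intent of the detailed messages mentioned in the change list: the caller can report which task blocked the run.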
   
   ### 3. **airflow-core/tests/ti_deps/deps/test_prev_dagrun_dep_specific_tasks.py**
      - Added comprehensive unit tests for the new functionality
      - Tests cover success scenarios, failure scenarios, first-run behavior, 
and validation
   
   ## Usage Example
   
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.python import PythonOperator

   # `schedule` replaces `schedule_interval`, which was removed in Airflow 3.
   with DAG('example_dag', start_date=datetime(2022, 1, 1), schedule='@daily') as dag:
       task_a = PythonOperator(task_id='task_a', python_callable=lambda: print('A'))
       task_b = PythonOperator(task_id='task_b', python_callable=lambda: print('B'))

       # task_c will only run if both task_a and task_b succeeded in the
       # previous DAG run
       task_c = PythonOperator(
           task_id='task_c',
           python_callable=lambda: print('C'),
           depends_on_past=True,
           depends_on_previous_task_ids=['task_a', 'task_b'],
       )
   ```
   
   ## Behavior
   
   - **First DAG run**: Not blocked (no previous run to check)
   - **Subsequent runs**: Task waits until all specified tasks from the 
previous run have succeeded
   - **Validation**: Raises `AirflowException` if 
`depends_on_previous_task_ids` is set without `depends_on_past=True`
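
To make the first-run and subsequent-run rules concrete, here is a small simulation over three consecutive runs of the example DAG. The run outcomes are invented for illustration, and the sketch models only the new check: it ignores the standard `depends_on_past` check on `task_c` itself and does not model waiting or retries.

```python
from __future__ import annotations

REQUIRED = ["task_a", "task_b"]

# Hypothetical outcomes of task_a and task_b in three consecutive DAG runs.
runs = [
    {"task_a": "success", "task_b": "success"},  # run 1
    {"task_a": "success", "task_b": "failed"},   # run 2
    {"task_a": "success", "task_b": "success"},  # run 3
]


def task_c_may_run(run_index: int) -> bool:
    """Decide whether task_c in the given run passes the previous-run check."""
    if run_index == 0:
        return True  # first run: no previous run to check
    previous = runs[run_index - 1]
    return all(previous.get(task_id) == "success" for task_id in REQUIRED)


decisions = [task_c_may_run(i) for i in range(len(runs))]
```

Here `decisions` evaluates to `[True, True, False]`: `task_c` is not blocked in the first run, passes in the second because both tasks succeeded in run 1, and is held back in the third because `task_b` failed in run 2.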
   
   ## Testing
   
   - Unit tests added in `test_prev_dagrun_dep_specific_tasks.py`
   - Tests validate correct behavior for:
     - First run (should not be blocked)
     - Dependencies met (all previous tasks succeeded)
     - Dependencies not met (one or more previous tasks failed)
     - Invalid configuration (missing `depends_on_past=True`)
   
   ## Closes
   
   Closes #60328
   


