kalluripradeep opened a new pull request, #61338:
URL: https://github.com/apache/airflow/pull/61338

   This PR adds support for the run_after parameter in TriggerDagRunOperator, 
addressing issue #60443. Users can now schedule a triggered DAG run for a 
future time, and in Airflow 3 they can trigger parallel runs of the same DAG: 
when only run_after is given, logical_date is left as None, so the unique 
constraint on (dag_id, logical_date) no longer blocks the additional runs.
   
   Problem Statement
   Currently, TriggerDagRunOperator forces a logical_date (defaulting to 
utcnow() if not set) and does not support the run_after parameter. This 
limitation prevents:
   
   Scheduling a triggered DAG to run at a specific future time.
   
   Triggering multiple parallel runs of the same DAG with different 
configurations in Airflow 3, as the unique constraint on (dag_id, logical_date) 
causes conflicts when logical_date is forced.
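
   The conflict described above can be reproduced in miniature. The sketch 
below uses an in-memory SQLite table as a simplified stand-in for the DAG run 
table; the table layout, column names, and the insert_run helper are 
assumptions made for illustration, not Airflow's actual schema or code.

```python
import sqlite3

# Simplified stand-in table (assumption): only the columns needed to show
# the unique constraint on (dag_id, logical_date).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    " run_id TEXT PRIMARY KEY,"
    " dag_id TEXT NOT NULL,"
    " logical_date TEXT,"
    " UNIQUE (dag_id, logical_date))"
)


def insert_run(run_id, dag_id, logical_date):
    """Hypothetical helper: return True if the row was inserted, False on conflict."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO dag_run (run_id, dag_id, logical_date) VALUES (?, ?, ?)",
                (run_id, dag_id, logical_date),
            )
        return True
    except sqlite3.IntegrityError:
        return False


same_date = "2025-01-01T00:00:00+00:00"
first = insert_run("run_a", "target_dag", same_date)   # accepted
second = insert_run("run_b", "target_dag", same_date)  # rejected: duplicate pair
third = insert_run("run_c", "target_dag", None)        # accepted: NULL logical_date
fourth = insert_run("run_d", "target_dag", None)       # accepted: NULLs do not collide
```

   SQLite treats NULL values as distinct inside a UNIQUE constraint, which is 
why the last two inserts both succeed. Other database backends may need to be 
checked separately.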
   
   Solution
   Added run_after support to TriggerDagRunOperator with the following changes:
   
   New Parameter: Added run_after to the operator's __init__ and execute 
methods.
   
   Smart Defaults: If run_after is provided and logical_date is NOTSET, 
logical_date defaults to None. This leverages Airflow 3's ability to handle 
runs without a strict logical date, enabling parallel execution.
   
   Airflow 3 Compatibility: Updated DagRunTriggerException usage (via 
_trigger_dag_af_3) to pass the run_after parameter, ensuring compatibility with 
the latest Airflow architecture.
   
   Templating: Added run_after to template_fields for dynamic scheduling.
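
   The Smart Defaults rule above can be sketched as a small standalone 
function. This is a minimal illustration under stated assumptions, not the 
operator's code: the NOTSET sentinel here is a local stand-in, and 
resolve_logical_date is a hypothetical helper name.

```python
from __future__ import annotations

from datetime import datetime, timezone


class _NotSet:
    """Local stand-in sentinel (assumption); not Airflow's own NOTSET object."""

    def __repr__(self) -> str:
        return "NOTSET"


NOTSET = _NotSet()


def resolve_logical_date(logical_date, run_after):
    """Hypothetical helper mirroring the defaulting rule described in the PR.

    - logical_date given explicitly (including None): use it unchanged.
    - logical_date not set, run_after given: default to None.
    - neither given: fall back to the current UTC time (previous behaviour).
    """
    if logical_date is not NOTSET:
        return logical_date
    if run_after is not None:
        return None
    return datetime.now(timezone.utc)
```

   For example, resolve_logical_date(NOTSET, some_future_datetime) returns 
None, while resolve_logical_date(NOTSET, None) still returns a timestamp, 
which keeps the behaviour unchanged for callers that never pass run_after.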
   
   Example Usage
   from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
   
   trigger = TriggerDagRunOperator(
       task_id="trigger_future_run",
       trigger_dag_id="target_dag",
       # Templated field: schedule the run for 1 hour from now
       run_after="{{ macros.datetime.now() + macros.timedelta(hours=1) }}",
       # logical_date is left unset, so it defaults to None and several
       # such triggers can coexist
       conf={"param": "value"},
   )
   
   Implementation Details
   Modified TriggerDagRunOperator:
   
   Updated __init__ to accept run_after.
   
   Updated execute to parse run_after.
   
   Modified logic to set parsed_logical_date = None if run_after is present and 
logical_date was not explicitly provided.
   
   Updated DagRun.generate_run_id call to include run_after if supported.
   
   Updated _trigger_dag_af_3 helper to pass run_after to the 
DagRunTriggerException.
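
   Because run_after is a templated field, it may reach execute as a rendered 
string rather than a datetime. The sketch below shows one way such a value 
could be normalised. It is an assumption for illustration only: 
parse_run_after is a hypothetical helper, it relies on the standard library 
instead of Airflow's own date utilities, and treating naive values as UTC is a 
choice made here, not something stated in the PR.

```python
from __future__ import annotations

from datetime import datetime, timezone


def parse_run_after(value: str | datetime | None) -> datetime | None:
    """Hypothetical helper: normalise run_after to a timezone-aware datetime."""
    if value is None:
        return None
    # Rendered templates arrive as ISO-like strings such as
    # "2025-01-01 12:00:00" or "2025-01-01T12:00:00+00:00".
    parsed = datetime.fromisoformat(value) if isinstance(value, str) else value
    if parsed.tzinfo is None:
        # Assumption: interpret naive timestamps as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
```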
   
   Files Modified:
   
   providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
   
   Testing
   Verified that providing run_after correctly sets the field in the triggered 
DAG run.
   
   Verified that logical_date defaults to None when run_after is set (enabling 
parallelism).
   
   Verified backwards compatibility for cases where run_after is not provided.
   
   Checklist
   [x] Unit tests added/updated.
   
   [x] Documentation updated (code docstrings).
   
   [x] Backward compatible.
   
   Related Issue
   Fixes #60443

