kalluripradeep opened a new pull request, #61338:
URL: https://github.com/apache/airflow/pull/61338
This PR adds support for the run_after parameter in TriggerDagRunOperator,
addressing issue #60443. This enhancement allows users to schedule triggered
DAGs for future execution and enables parallel DAG runs by decoupling
logical_date from the unique constraint in Airflow 3.
Problem Statement
Currently, TriggerDagRunOperator forces a logical_date (defaulting to
utcnow() if not set) and does not support the run_after parameter. This
limitation prevents:
- Scheduling a triggered DAG to run at a specific future time.
- Triggering multiple parallel runs of the same DAG with different
  configurations in Airflow 3, as the unique constraint on (dag_id, logical_date)
  causes conflicts when logical_date is forced.
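
To make the conflict concrete, here is a toy model of a table with a unique (dag_id, logical_date) key. It is only a sketch: DagRunTable and its insert method are hypothetical names, not Airflow code, and it assumes (as most SQL databases do) that a NULL logical_date never collides with another NULL.

```python
from datetime import datetime, timezone


class DagRunTable:
    """Toy stand-in for a table with a unique (dag_id, logical_date) key."""

    def __init__(self):
        self._keys = set()
        self.runs = []

    def insert(self, dag_id, logical_date, conf):
        # Assumption: like NULL in most SQL unique constraints, a None
        # logical_date is never treated as a duplicate of another None.
        if logical_date is not None:
            key = (dag_id, logical_date)
            if key in self._keys:
                raise ValueError(f"duplicate run for {key}")
            self._keys.add(key)
        self.runs.append((dag_id, logical_date, conf))


table = DagRunTable()
forced = datetime(2025, 1, 1, tzinfo=timezone.utc)

# Two triggers that resolve to the same forced logical_date collide.
table.insert("target_dag", forced, {"param": "a"})
try:
    table.insert("target_dag", forced, {"param": "b"})
    conflict = False
except ValueError:
    conflict = True

# Two triggers without a logical_date can coexist.
table.insert("target_dag", None, {"param": "c"})
table.insert("target_dag", None, {"param": "d"})
```

In this model the second forced insert is rejected, while both runs without a logical_date are stored, which is the behaviour the PR relies on for parallel runs.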
Solution
Added run_after support to TriggerDagRunOperator with the following changes:
- New Parameter: Added run_after to the operator's __init__ and execute
  methods.
- Smart Defaults: If run_after is provided and logical_date is NOTSET,
  logical_date defaults to None. This leverages Airflow 3's ability to handle
  runs without a strict logical date, enabling parallel execution.
- Airflow 3 Compatibility: Updated DagRunTriggerException usage (via
  _trigger_dag_af_3) to pass the run_after parameter, ensuring compatibility with
  the latest Airflow architecture.
- Templating: Added run_after to template_fields for dynamic scheduling.
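
The "Smart Defaults" rule can be sketched as a small standalone function. This is an illustration of the decision described above, not the operator's actual code: resolve_logical_date and the local NOTSET sentinel are hypothetical stand-ins.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical sentinel standing in for the operator's NOTSET marker.
NOTSET = object()


def resolve_logical_date(logical_date=NOTSET, run_after=None, now=None):
    """Return the logical_date to use for the triggered run."""
    if logical_date is not NOTSET:
        # The caller was explicit (including an explicit None): respect it.
        return logical_date
    if run_after is not None:
        # run_after given, logical_date omitted: leave the run without one.
        return None
    # Neither given: keep the previous behaviour of defaulting to "now".
    return now if now is not None else datetime.now(timezone.utc)


now = datetime(2025, 1, 1, tzinfo=timezone.utc)
later = now + timedelta(hours=1)

default_case = resolve_logical_date(now=now)
run_after_only = resolve_logical_date(run_after=later, now=now)
both_given = resolve_logical_date(logical_date=now, run_after=later, now=now)
```

Using a sentinel rather than None as the default is what lets the function tell "not provided" apart from "explicitly None".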
Example Usage
    from datetime import datetime

    from airflow.providers.standard.operators.trigger_dagrun import (
        TriggerDagRunOperator,
    )

    trigger = TriggerDagRunOperator(
        task_id="trigger_future_run",
        trigger_dag_id="target_dag",
        # Schedule the run for 1 hour from now
        run_after="{{ macros.datetime.now() + macros.timedelta(hours=1) }}",
        # logical_date will automatically be None, allowing multiple such triggers
        conf={"param": "value"},
    )
Implementation Details
- Modified TriggerDagRunOperator:
  - Updated __init__ to accept run_after.
  - Updated execute to parse run_after.
  - Modified logic to set parsed_logical_date = None if run_after is present and
    logical_date was not explicitly provided.
  - Updated DagRun.generate_run_id call to include run_after if supported.
- Updated _trigger_dag_af_3 helper to pass run_after to the
  DagRunTriggerException.
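
As a rough illustration of the "parse run_after" step, the sketch below normalizes a rendered template string or a datetime into a timezone-aware datetime. parse_run_after is a hypothetical helper using only the standard library; the PR may parse the value differently, and treating naive values as UTC is an assumption made here for the example.

```python
from datetime import datetime, timezone


def parse_run_after(value):
    """Normalize run_after (None, str, or datetime) to an aware datetime."""
    if value is None:
        return None
    if isinstance(value, str):
        # A rendered template arrives as a string such as
        # "2025-01-01 13:00:00.123456".
        value = datetime.fromisoformat(value)
    if value.tzinfo is None:
        # Assumption for this sketch: naive values are interpreted as UTC.
        value = value.replace(tzinfo=timezone.utc)
    return value


from_template = parse_run_after("2025-01-01 13:00:00.123456")
with_offset = parse_run_after("2025-01-01T13:00:00+02:00")
```

Normalizing once, before the value is handed on to the trigger call, keeps the rest of the code free of string-versus-datetime checks.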
Files Modified:
providers/standard/src/airflow/providers/standard/operators/trigger_dagrun.py
Testing
- Verified that providing run_after correctly sets the field in the triggered
  DAG run.
- Verified that logical_date defaults to None when run_after is set (enabling
  parallelism).
- Verified backwards compatibility for cases where run_after is not provided.
Checklist
[x] Unit tests added/updated.
[x] Documentation updated (code docstrings).
[x] Backward compatible.
Related Issue
Fixes #60443