GitHub user Urus1201 added a comment to the discussion: Large DAG with
conditional subset execution: Is DAG Versioning + Dynamic DAG Generation the
right approach?
DAG Versioning is **not** the right tool here: it is designed to track
schema/structural changes over time, not to select tasks at runtime.
Dynamically generating a new DAG per run doesn't help either; every generated
variant still has to be parsed and scheduled, which adds parse overhead and
leaves a confusing audit history of one-off DAGs.
For a 150-task ML pipeline where each run only executes a subset, there are
three patterns worth considering:
---
## Pattern 1: ShortCircuitOperator at group boundaries (fastest fix)
If the 10-minute wait comes from evaluating a skip condition on every task
individually, move the decision **upstream** to a single `ShortCircuitOperator`
that gates an entire `TaskGroup`: one skip decision per group instead of one
per task.
```python
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.utils.task_group import TaskGroup


def should_run_preprocessing(**context) -> bool:
    run_conf = context["dag_run"].conf or {}
    return "preprocessing" in run_conf.get("stages", ["preprocessing", "training"])


with TaskGroup("preprocessing") as preprocessing_group:
    gate = ShortCircuitOperator(
        task_id="gate_preprocessing",
        python_callable=should_run_preprocessing,
        # Without this, a False result skips *everything* downstream,
        # including the training group (available since Airflow 2.3).
        ignore_downstream_trigger_rules=False,
    )
    clean = PythonOperator(task_id="clean_data", ...)
    features = PythonOperator(task_id="engineer_features", ...)
    gate >> clean >> features

with TaskGroup("training") as training_group:
    gate2 = ShortCircuitOperator(
        task_id="gate_training",
        python_callable=lambda **ctx: "training"
        in (ctx["dag_run"].conf or {}).get("stages", ["preprocessing", "training"]),
        # Run even when the upstream preprocessing group was skipped.
        trigger_rule="none_failed",
    )
    train = PythonOperator(task_id="train_model", ...)
    gate2 >> train

preprocessing_group >> training_group
```
When `gate_preprocessing` returns `False`, the rest of the group is marked
skipped immediately — no 10-minute wait. One caveat: by default a
`ShortCircuitOperator` skips *all* downstream tasks, including later groups;
setting `ignore_downstream_trigger_rules=False` on the gate and a permissive
`trigger_rule` (e.g. `"none_failed"`) on the next group's gate lets later
stages still run.
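The gate predicate itself is plain Python, so it can be unit-tested without a scheduler. A minimal sketch (`stages_requested` and `should_run` are hypothetical helpers mirroring the conf shape used above):

```python
def stages_requested(run_conf, default=("preprocessing", "training")):
    """Return the stage names requested in dag_run.conf, falling back to the default."""
    return list((run_conf or {}).get("stages", list(default)))


def should_run(stage, run_conf):
    """Gate predicate: True if this stage was requested for the current run."""
    return stage in stages_requested(run_conf)


print(should_run("preprocessing", None))  # no conf provided: defaults run everything
print(should_run("training", {"stages": ["preprocessing"]}))
```

To exercise the gates end to end, you can pass the conf from the CLI with `airflow dags trigger <dag_id> --conf '{"stages": ["training"]}'`, which populates `dag_run.conf` for that run.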
---
## Pattern 2: BranchPythonOperator for mutually exclusive paths
If the stages are mutually exclusive (e.g. "only preprocessing" vs "only
training" vs "full pipeline"), use branching:
```python
from airflow.operators.python import BranchPythonOperator


def choose_pipeline(**context):
    run_conf = context["dag_run"].conf or {}
    stages = run_conf.get("stages", ["preprocessing", "training"])
    if stages == ["preprocessing"]:
        return "preprocessing.gate_preprocessing"
    if stages == ["training"]:
        return "training.gate_training"
    # Full pipeline: branch into both groups.
    return ["preprocessing.gate_preprocessing", "training.gate_training"]


branch = BranchPythonOperator(
    task_id="route_pipeline",
    python_callable=choose_pipeline,
)
```
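Because `choose_pipeline` only inspects the conf dict, the routing logic can be pulled into a pure function and unit-tested without a running Airflow. A sketch (`route` is a hypothetical name; task ids mirror the example above, and the guard against a `None` conf is an addition):

```python
def route(run_conf):
    """Pure routing logic mirroring choose_pipeline above."""
    stages = (run_conf or {}).get("stages", ["preprocessing", "training"])
    if stages == ["preprocessing"]:
        return "preprocessing.gate_preprocessing"
    if stages == ["training"]:
        return "training.gate_training"
    # Full pipeline: branch into both groups.
    return ["preprocessing.gate_preprocessing", "training.gate_training"]
```

One branching caveat: a join task downstream of both paths is skipped along with the untaken branch unless you relax its trigger rule, e.g. `trigger_rule="none_failed_min_one_success"`.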
---
## Pattern 3: Dynamic Task Mapping (Airflow 2.3+ / 3.x, cleanest for ML)
For your specific ML use case, **dynamic task mapping** lets Airflow expand
only the task instances that are actually needed. The set of mapped tasks is
decided at runtime, when the upstream task returns, not at parse time:
```python
from airflow.decorators import task


@task
def get_stages_to_run(**context) -> list[str]:
    run_conf = context["dag_run"].conf or {}
    return run_conf.get("stages", ["preprocessing", "training"])


@task
def run_stage(stage_name: str):
    # run_preprocessing / run_training are your existing stage callables.
    stage_map = {
        "preprocessing": run_preprocessing,
        "training": run_training,
    }
    stage_map[stage_name]()


stages = get_stages_to_run()
run_stage.expand(stage_name=stages)
```
With this, Airflow creates **only** the mapped task instances for the stages
that were requested — no skip logic at all.
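Whichever pattern you pick, it is worth validating the conf payload up front so a typo'd stage name fails the run loudly instead of silently falling back to the defaults. A minimal sketch (`KNOWN_STAGES` and `parse_stages` are hypothetical names):

```python
KNOWN_STAGES = {"preprocessing", "training"}  # hypothetical set of valid stage names


def parse_stages(run_conf, default=("preprocessing", "training")):
    """Validate and return the stages requested in dag_run.conf."""
    stages = list((run_conf or {}).get("stages", list(default)))
    unknown = set(stages) - KNOWN_STAGES
    if unknown:
        raise ValueError(f"unknown stages in dag_run.conf: {sorted(unknown)}")
    return stages
```

Calling this once at the top of the DAG (in the first gate, the branch callable, or the upstream mapped task) turns a bad trigger payload into an immediate, visible failure.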
---
## Recommendation for your case
Given that you have a deep learning pipeline with independent stages (data
cleaning → feature engineering → model training):
1. **Immediately**: Add `ShortCircuitOperator` gates inside `TaskGroup`s
(Pattern 1). This drops the 10-minute wait to seconds without restructuring
your DAG.
2. **Long term**: Refactor each major stage (preprocessing, training, etc.)
into its own DAG, and use `TriggerDagRunOperator` from a "coordinator" DAG to
launch only the needed stages. This gives clean separation, independent retry
granularity, and a much smaller parse footprint per run.
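For the long-term split, the coordinator DAG's only real logic is deciding which stage DAGs to trigger, and that too is pure Python. A sketch (the DAG ids in `STAGE_DAGS` are assumptions; each selected id would get its own `TriggerDagRunOperator` in the coordinator):

```python
STAGE_DAGS = {
    "preprocessing": "ml_preprocessing",  # hypothetical per-stage DAG ids
    "training": "ml_training",
}


def dags_to_trigger(run_conf):
    """Map the requested stages onto the per-stage DAG ids the coordinator should launch."""
    stages = (run_conf or {}).get("stages", list(STAGE_DAGS))
    return [STAGE_DAGS[s] for s in stages if s in STAGE_DAGS]
```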
DAG Versioning is worth enabling for schema management, but it should not be
used as a mechanism for runtime task selection.
GitHub link:
https://github.com/apache/airflow/discussions/64344#discussioncomment-16503495
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]