This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 91e10295c7d Disable start_from_trigger altogether for now (#54646)
91e10295c7d is described below
commit 91e10295c7d7bb293f7853a518579d619ceae74d
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Aug 20 10:27:07 2025 +0800
Disable start_from_trigger altogether for now (#54646)
---
airflow-core/src/airflow/models/dagrun.py | 29 ++++++++++++++++-----------
airflow-core/tests/unit/models/test_dagrun.py | 3 ++-
2 files changed, 19 insertions(+), 13 deletions(-)
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index f112a1ebba9..ccf01c65e19 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -2004,18 +2004,23 @@ class DagRun(Base, LoggingMixin):
and not task.inlets
):
empty_ti_ids.append(ti.id)
- # check "start_trigger_args" to see whether the operator supports
start execution from triggerer
- # if so, we'll then check "start_from_trigger" to see whether this
feature is turned on and defer
- # this task.
- # if not, we'll add this "ti" into "schedulable_ti_ids" and later
execute it to run in the worker
- elif task.start_trigger_args is not None:
- if
task.expand_start_from_trigger(context=ti.get_template_context()):
- ti.start_date = timezone.utcnow()
- if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
- ti.try_number += 1
- ti.defer_task(exception=None, session=session)
- else:
- schedulable_ti_ids.append(ti.id)
+ # Check "start_trigger_args" to see whether the operator supports
+ # start execution from triggerer. If so, we'll check
"start_from_trigger"
+ # to see whether this feature is turned on and defer this task.
+ # If not, we'll add this "ti" into "schedulable_ti_ids" and later
+ # execute it to run in the worker.
+ # TODO TaskSDK: This is disabled since we haven't figured out how
+ # to render start_from_trigger in the scheduler. If we need to
+ # render the value in a worker, it kind of defeats the purpose of
+ # this feature (which is to save a worker process if possible).
+ # elif task.start_trigger_args is not None:
+ # if
task.expand_start_from_trigger(context=ti.get_template_context()):
+ # ti.start_date = timezone.utcnow()
+ # if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
+ # ti.try_number += 1
+ # ti.defer_task(exception=None, session=session)
+ # else:
+ # schedulable_ti_ids.append(ti.id)
else:
schedulable_ti_ids.append(ti.id)
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index f22d3aa68bd..ed92ed26f63 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -2036,6 +2036,7 @@ def test_schedule_tis_map_index(dag_maker, session):
assert ti2.state == TaskInstanceState.SUCCESS
[email protected](reason="We can't keep this behaviour with remote workers
where scheduler can't reach xcom")
@pytest.mark.need_serialized_dag
def test_schedule_tis_start_trigger(dag_maker, session):
"""
@@ -2092,7 +2093,7 @@ def
test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
assert empty_ti.try_number == 1
[email protected](reason="We can't keep this bevaviour with remote workers
where scheduler can't reach xcom")
[email protected](reason="We can't keep this behaviour with remote workers
where scheduler can't reach xcom")
def test_schedule_tis_start_trigger_through_expand(dag_maker, session):
"""
Test that an operator with start_trigger_args set can be directly deferred
during scheduling.