This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 91e10295c7d Disable start_from_trigger altogether for now (#54646)
91e10295c7d is described below

commit 91e10295c7d7bb293f7853a518579d619ceae74d
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Aug 20 10:27:07 2025 +0800

    Disable start_from_trigger altogether for now (#54646)
---
 airflow-core/src/airflow/models/dagrun.py     | 29 ++++++++++++++++-----------
 airflow-core/tests/unit/models/test_dagrun.py |  3 ++-
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index f112a1ebba9..ccf01c65e19 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -2004,18 +2004,23 @@ class DagRun(Base, LoggingMixin):
                 and not task.inlets
             ):
                 empty_ti_ids.append(ti.id)
-            # check "start_trigger_args" to see whether the operator supports start execution from triggerer
-            # if so, we'll then check "start_from_trigger" to see whether this feature is turned on and defer
-            # this task.
-            # if not, we'll add this "ti" into "schedulable_ti_ids" and later execute it to run in the worker
-            elif task.start_trigger_args is not None:
-                if task.expand_start_from_trigger(context=ti.get_template_context()):
-                    ti.start_date = timezone.utcnow()
-                    if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
-                        ti.try_number += 1
-                    ti.defer_task(exception=None, session=session)
-                else:
-                    schedulable_ti_ids.append(ti.id)
+            # Check "start_trigger_args" to see whether the operator supports
+            # start execution from triggerer. If so, we'll check "start_from_trigger"
+            # to see whether this feature is turned on and defer this task.
+            # If not, we'll add this "ti" into "schedulable_ti_ids" and later
+            # execute it to run in the worker.
+            # TODO TaskSDK: This is disabled since we haven't figured out how
+            # to render start_from_trigger in the scheduler. If we need to
+            # render the value in a worker, it kind of defeats the purpose of
+            # this feature (which is to save a worker process if possible).
+            # elif task.start_trigger_args is not None:
+            #     if task.expand_start_from_trigger(context=ti.get_template_context()):
+            #         ti.start_date = timezone.utcnow()
+            #         if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
+            #             ti.try_number += 1
+            #         ti.defer_task(exception=None, session=session)
+            #     else:
+            #         schedulable_ti_ids.append(ti.id)
             else:
                 schedulable_ti_ids.append(ti.id)
 
diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py
index f22d3aa68bd..ed92ed26f63 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -2036,6 +2036,7 @@ def test_schedule_tis_map_index(dag_maker, session):
     assert ti2.state == TaskInstanceState.SUCCESS
 
 
+@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom")
 @pytest.mark.need_serialized_dag
 def test_schedule_tis_start_trigger(dag_maker, session):
     """
@@ -2092,7 +2093,7 @@ def test_schedule_tis_empty_operator_try_number(dag_maker, session: Session):
     assert empty_ti.try_number == 1
 
 
-@pytest.mark.xfail(reason="We can't keep this bevaviour with remote workers where scheduler can't reach xcom")
+@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom")
 def test_schedule_tis_start_trigger_through_expand(dag_maker, session):
     """
     Test that an operator with start_trigger_args set can be directly deferred during scheduling.

Reply via email to