Lee-W commented on code in PR #39912:
URL: https://github.com/apache/airflow/pull/39912#discussion_r1684620020


##########
airflow/models/dagrun.py:
##########
@@ -1543,11 +1543,26 @@ def schedule_tis(
                 and not ti.task.outlets
             ):
                 dummy_ti_ids.append((ti.task_id, ti.map_index))
-            elif ti.task.start_from_trigger is True and ti.task.start_trigger_args is not None:
-                ti.start_date = timezone.utcnow()
-                if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
-                    ti.try_number += 1
-                ti.defer_task(exception=None, session=session)
+            # check "start_trigger_args" to see whether the operator supports start execution from triggerer
+            # if so, we'll then check "start_from_trigger" to see whether this feature is turned on and defer
+            # this task.
+            # if not, we'll add this "ti" into "schedulable_ti_ids" and later execute it to run in the worker
+            elif ti.task.start_trigger_args is not None:
+                from airflow.models.mappedoperator import MappedOperator
+
+                if isinstance(ti.task, MappedOperator):
+                    context = ti.get_template_context()
+                    start_from_trigger = ti.task._expand_start_from_trigger(context=context, session=session)
+                else:
+                    start_from_trigger = ti.task.start_from_trigger

Review Comment:
   Sounds great! I just refactored it in this way and pushed it.



##########
airflow/models/taskinstance.py:
##########
@@ -1595,13 +1595,20 @@ def _defer_task(
         next_kwargs = exception.kwargs
         timeout = exception.timeout
     elif ti.task is not None and ti.task.start_trigger_args is not None:
+        if isinstance(ti.task, MappedOperator):
+            context = ti.get_template_context()
+            start_trigger_args = ti.task._expand_start_trigger_args(context=context, session=session)
+        else:
+            start_trigger_args = ti.task.start_trigger_args

Review Comment:
   Sounds great! I just refactored it in this way and pushed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
