uranusjr commented on code in PR #39912:
URL: https://github.com/apache/airflow/pull/39912#discussion_r1682284981


##########
airflow/models/taskinstance.py:
##########
@@ -1595,13 +1595,20 @@ def _defer_task(
         next_kwargs = exception.kwargs
         timeout = exception.timeout
     elif ti.task is not None and ti.task.start_trigger_args is not None:
+        if isinstance(ti.task, MappedOperator):
+            context = ti.get_template_context()
+            start_trigger_args = ti.task._expand_start_trigger_args(context=context, session=session)
+        else:
+            start_trigger_args = ti.task.start_trigger_args

Review Comment:
   Same as the `isinstance(ti.task, MappedOperator)` check in `dagrun.py`: this pattern is quite annoying.



##########
airflow/models/dagrun.py:
##########
@@ -1543,11 +1543,26 @@ def schedule_tis(
                 and not ti.task.outlets
             ):
                 dummy_ti_ids.append((ti.task_id, ti.map_index))
-            elif ti.task.start_from_trigger is True and ti.task.start_trigger_args is not None:
-                ti.start_date = timezone.utcnow()
-                if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
-                    ti.try_number += 1
-                ti.defer_task(exception=None, session=session)
+            # check "start_trigger_args" to see whether the operator supports start execution from triggerer
+            # if so, we'll then check "start_from_trigger" to see whether this feature is turned on and defer
+            # this task.
+            # if not, we'll add this "ti" into "schedulable_ti_ids" and later execute it to run in the worker
+            elif ti.task.start_trigger_args is not None:
+                from airflow.models.mappedoperator import MappedOperator
+
+                if isinstance(ti.task, MappedOperator):
+                    context = ti.get_template_context()
+                    start_from_trigger = ti.task._expand_start_from_trigger(context=context, session=session)
+                else:
+                    start_from_trigger = ti.task.start_from_trigger

Review Comment:
   This `isinstance` check is really out of place; I hope it can be eliminated somehow.
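
   One possible shape for this, as a sketch only: give both operator kinds the same method so the caller never has to branch on the type. The class names and the `expand_start_from_trigger` method below are hypothetical stand-ins, not existing Airflow APIs, and the `session` argument is left out to keep the example self-contained.

```python
from typing import Any, Mapping


class PlainOperatorSketch:
    """Hypothetical stand-in for a regular (non-mapped) operator."""

    def __init__(self, start_from_trigger: bool = False) -> None:
        self.start_from_trigger = start_from_trigger

    def expand_start_from_trigger(self, *, context: Mapping[str, Any]) -> bool:
        # Nothing to expand for a plain operator: return the attribute as-is.
        return self.start_from_trigger


class MappedOperatorSketch(PlainOperatorSketch):
    """Hypothetical stand-in for a mapped operator."""

    def __init__(
        self,
        partial_kwargs: Mapping[str, Any],
        mapped_kwargs: Mapping[str, Any],
        start_from_trigger: bool = False,
    ) -> None:
        super().__init__(start_from_trigger)
        self.partial_kwargs = dict(partial_kwargs)
        self.mapped_kwargs = dict(mapped_kwargs)

    def expand_start_from_trigger(self, *, context: Mapping[str, Any]) -> bool:
        # Mapped kwargs override partial ones, which override the default.
        fallback = self.partial_kwargs.get("start_from_trigger", self.start_from_trigger)
        return self.mapped_kwargs.get("start_from_trigger", fallback)


def should_defer(task: PlainOperatorSketch, context: Mapping[str, Any]) -> bool:
    # The caller asks the task directly; no isinstance check is needed.
    return task.expand_start_from_trigger(context=context)
```

   With something like this, the scheduler-side code would call one method regardless of whether the task is mapped.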



##########
airflow/models/mappedoperator.py:
##########
@@ -729,6 +731,67 @@ def _get_unmap_kwargs(self, mapped_kwargs: Mapping[str, Any], *, strict: bool) -
             "params": params,
         }
 
+    def _expand_start_from_trigger(self, *, context: Context, session: Session) -> bool:
+        """
+        Get the kwargs to create the unmapped start_from_trigger.
+
+        This method is for allowing mapped operator to start execution from triggerer.
+        """
+        if not self.start_trigger_args:
+            return False
+
+        mapped_kwargs, _ = self._expand_mapped_kwargs(context, session, include_xcom=False)
+        if self._disallow_kwargs_override:
+            prevent_duplicates(
+                self.partial_kwargs,
+                mapped_kwargs,
+                fail_reason="unmappable or already specified",
+            )
+
+        # Ordering is significant; mapped kwargs should override partial ones.
+        return mapped_kwargs.get(
+            "start_from_trigger", self.partial_kwargs.get("start_from_trigger", self.start_from_trigger)
+        )
+
+    def _expand_start_trigger_args(self, *, context: Context, session: Session) -> StartTriggerArgs:
+        """
+        Get the kwargs to create the unmapped start_trigger_args.
+
+        This method is for allowing mapped operator to start execution from triggerer.
+        """
+        if not self.start_trigger_args:
+            raise AirflowException(

Review Comment:
   Can this exception class be more specific than `AirflowException`? Raising
   `AirflowException` is essentially the same as raising `Exception` and carries close to zero semantics.
   
   When can this exception be raised in the first place?
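
   As a rough illustration of "more specific", a dedicated subclass could carry the failing task's ID. The class name `StartTriggerArgsMissingError` and the helper function are hypothetical; in Airflow the class would presumably derive from `AirflowException`, but plain `Exception` is used here so the sketch runs on its own.

```python
from typing import Any, Optional


class StartTriggerArgsMissingError(Exception):
    """Hypothetical error: a task has no start_trigger_args to expand."""

    def __init__(self, task_id: str) -> None:
        super().__init__(f"Task {task_id!r} has no start_trigger_args to expand.")
        self.task_id = task_id


def expand_start_trigger_args(task_id: str, start_trigger_args: Optional[Any]) -> Any:
    # Fail with a named, catchable error instead of a generic exception.
    if not start_trigger_args:
        raise StartTriggerArgsMissingError(task_id)
    return start_trigger_args
```

   Callers can then catch this one condition without swallowing unrelated errors.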


