Lee-W commented on code in PR #55068:
URL: https://github.com/apache/airflow/pull/55068#discussion_r2541293088


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1446,6 +1447,80 @@ def update_heartbeat(self):
                 .values(last_heartbeat_at=timezone.utcnow())
             )
 
+    @property
+    def start_trigger_args(self) -> StartTriggerArgs | None:
+        if self.task:
+            if self.task.is_mapped:
+                context = self.get_template_context()
+                if self.task.expand_start_from_trigger(context=context):

Review Comment:
   If `MappedOperator` does not works well now, should we raise a warning?



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1446,6 +1447,80 @@ def update_heartbeat(self):
                 .values(last_heartbeat_at=timezone.utcnow())
             )
 
+    @property
+    def start_trigger_args(self) -> StartTriggerArgs | None:
+        if self.task:
+            if self.task.is_mapped:
+                context = self.get_template_context()
+                if self.task.expand_start_from_trigger(context=context):
+                    return self.task.expand_start_trigger_args(context=context)
+            elif self.task.start_from_trigger:
+                return self.task.start_trigger_args
+        return None
+
+    # TODO: We have some code duplication here and in the 
_create_ti_state_update_query_and_update_state
+    #       method of the task_instances module in the execution api when a 
TIDeferredStatePayload is being
+    #       processed. This is because of a TaskInstance being updated 
differently using SQLAlchemy.
+    #       If we use the approach from the execution api as common code in 
the DagRun schedule_tis method,
+    #       the side effect is the changes done to the task instance aren't 
picked up by the scheduler and
+    #       thus the task instance isn't processed until the scheduler is 
restarted.
+    @provide_session
+    def defer_task(self, session: Session = NEW_SESSION) -> bool:
+        """
+        Mark the task as deferred and sets up the trigger that is needed to 
resume it when TaskDeferred is raised.
+
+        :meta: private
+        """
+        from airflow.models.trigger import Trigger
+
+        if TYPE_CHECKING:
+            assert isinstance(self.task, Operator)
+
+        if start_trigger_args := self.start_trigger_args:
+            trigger_kwargs = start_trigger_args.trigger_kwargs or {}
+            timeout = start_trigger_args.timeout
+
+            # Calculate timeout too if it was passed
+            if timeout is not None:
+                self.trigger_timeout = timezone.utcnow() + timeout
+            else:
+                self.trigger_timeout = None
+
+            trigger_row = Trigger(
+                classpath=start_trigger_args.trigger_cls,
+                kwargs=trigger_kwargs,
+            )
+
+            # First, make the trigger entry
+            session.add(trigger_row)
+            session.flush()
+
+            # Then, update ourselves so it matches the deferral request
+            # Keep an eye on the logic in 
`check_and_change_state_before_execution()`
+            # depending on self.next_method semantics
+            self.state = TaskInstanceState.DEFERRED
+            self.trigger_id = trigger_row.id
+            self.next_method = start_trigger_args.next_method
+            self.next_kwargs = start_trigger_args.next_kwargs or {}
+
+            # If an execution_timeout is set, set the timeout to the minimum of
+            # it and the trigger timeout
+            execution_timeout = self.task.execution_timeout
+            if execution_timeout:

Review Comment:
   ```suggestion
               if execution_timeout := self.task.execution_timeout:
   ```



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1446,6 +1447,80 @@ def update_heartbeat(self):
                 .values(last_heartbeat_at=timezone.utcnow())
             )
 
+    @property
+    def start_trigger_args(self) -> StartTriggerArgs | None:
+        if self.task:
+            if self.task.is_mapped:
+                context = self.get_template_context()
+                if self.task.expand_start_from_trigger(context=context):
+                    return self.task.expand_start_trigger_args(context=context)
+            elif self.task.start_from_trigger:

Review Comment:
   ```suggestion
               elif self.task.start_from_trigger is True:
   ```
   
   nit: for expliclity



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to