jason810496 commented on code in PR #55068:
URL: https://github.com/apache/airflow/pull/55068#discussion_r2627617268


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -604,74 +607,96 @@ def update_triggers(self, requested_trigger_ids: 
set[int]):
         adds them to the dequeues so the subprocess can actually mutate the 
running
         trigger set.
         """
-        render_log_fname = log_filename_template_renderer()
+        from airflow.models.dagbag import DBDagBag
 
-        known_trigger_ids = (
-            self.running_triggers.union(x[0] for x in self.events)
-            .union(self.cancelling_triggers)
-            .union(trigger[0] for trigger in self.failed_triggers)
-            .union(trigger.id for trigger in self.creating_triggers)
-        )
-        # Work out the two difference sets
-        new_trigger_ids = requested_trigger_ids - known_trigger_ids
-        cancel_trigger_ids = self.running_triggers - requested_trigger_ids
-        # Bulk-fetch new trigger records
-        new_triggers = Trigger.bulk_fetch(new_trigger_ids)
-        trigger_ids_with_non_task_associations = 
Trigger.fetch_trigger_ids_with_non_task_associations()
-        to_create: list[workloads.RunTrigger] = []
-        # Add in new triggers
-        for new_id in new_trigger_ids:
-            # Check it didn't vanish in the meantime
-            if new_id not in new_triggers:
-                log.warning("Trigger disappeared before we could start it", 
id=new_id)
-                continue
-
-            new_trigger_orm = new_triggers[new_id]
-
-            # If the trigger is not associated to a task, an asset, or a 
callback, this means the TaskInstance
-            # row was updated by either Trigger.submit_event or 
Trigger.submit_failure
-            # and can happen when a single trigger Job is being run on 
multiple TriggerRunners
-            # in a High-Availability setup.
-            if new_trigger_orm.task_instance is None and new_id not in 
trigger_ids_with_non_task_associations:
-                log.info(
-                    (
-                        "TaskInstance Trigger is None. It was likely updated 
by another trigger job. "
-                        "Skipping trigger instantiation."
-                    ),
-                    id=new_id,
-                )
-                continue
+        dag_bag = DBDagBag()
+        render_log_fname = log_filename_template_renderer()
 
-            workload = workloads.RunTrigger(
-                classpath=new_trigger_orm.classpath,
-                id=new_id,
-                encrypted_kwargs=new_trigger_orm.encrypted_kwargs,
-                ti=None,
-            )
-            if new_trigger_orm.task_instance:
-                log_path = render_log_fname(ti=new_trigger_orm.task_instance)
+        @provide_session
+        def create_workload(trigger: Trigger, session: Session = NEW_SESSION) 
-> workloads.RunTrigger | None:
+            if trigger.task_instance:

Review Comment:
   Would an early return for the non-TI-related Trigger workload be better?
   ```suggestion
               if trigger.task_instance is None:
                   return workloads.RunTrigger(
                       id=new_id,
                       classpath=trigger.classpath,
                       encrypted_kwargs=trigger.encrypted_kwargs,
                   )
   ```



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -604,74 +607,96 @@ def update_triggers(self, requested_trigger_ids: 
set[int]):
         adds them to the dequeues so the subprocess can actually mutate the 
running
         trigger set.
         """
-        render_log_fname = log_filename_template_renderer()
+        from airflow.models.dagbag import DBDagBag
 
-        known_trigger_ids = (
-            self.running_triggers.union(x[0] for x in self.events)
-            .union(self.cancelling_triggers)
-            .union(trigger[0] for trigger in self.failed_triggers)
-            .union(trigger.id for trigger in self.creating_triggers)
-        )
-        # Work out the two difference sets
-        new_trigger_ids = requested_trigger_ids - known_trigger_ids
-        cancel_trigger_ids = self.running_triggers - requested_trigger_ids
-        # Bulk-fetch new trigger records
-        new_triggers = Trigger.bulk_fetch(new_trigger_ids)
-        trigger_ids_with_non_task_associations = 
Trigger.fetch_trigger_ids_with_non_task_associations()
-        to_create: list[workloads.RunTrigger] = []
-        # Add in new triggers
-        for new_id in new_trigger_ids:
-            # Check it didn't vanish in the meantime
-            if new_id not in new_triggers:
-                log.warning("Trigger disappeared before we could start it", 
id=new_id)
-                continue
-
-            new_trigger_orm = new_triggers[new_id]
-
-            # If the trigger is not associated to a task, an asset, or a 
callback, this means the TaskInstance
-            # row was updated by either Trigger.submit_event or 
Trigger.submit_failure
-            # and can happen when a single trigger Job is being run on 
multiple TriggerRunners
-            # in a High-Availability setup.
-            if new_trigger_orm.task_instance is None and new_id not in 
trigger_ids_with_non_task_associations:
-                log.info(
-                    (
-                        "TaskInstance Trigger is None. It was likely updated 
by another trigger job. "
-                        "Skipping trigger instantiation."
-                    ),
-                    id=new_id,
-                )
-                continue
+        dag_bag = DBDagBag()
+        render_log_fname = log_filename_template_renderer()
 
-            workload = workloads.RunTrigger(
-                classpath=new_trigger_orm.classpath,
-                id=new_id,
-                encrypted_kwargs=new_trigger_orm.encrypted_kwargs,
-                ti=None,
-            )
-            if new_trigger_orm.task_instance:
-                log_path = render_log_fname(ti=new_trigger_orm.task_instance)
+        @provide_session
+        def create_workload(trigger: Trigger, session: Session = NEW_SESSION) 
-> workloads.RunTrigger | None:

Review Comment:
   We pass `new_trigger_orm` as the `trigger` parameter at line 696. So it seems we 
could make `create_workload` a `staticmethod` instead of a closure; then 
`new_trigger_orm` at lines 618 and 622 could be replaced by the `trigger` parameter.
   
   This works because `new_trigger_orm` and the `trigger` parameter refer to the 
same Trigger instance.



##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1474,6 +1475,79 @@ def update_heartbeat(self):
                 .values(last_heartbeat_at=timezone.utcnow())
             )
 
+    @property
+    def start_trigger_args(self) -> StartTriggerArgs | None:
+        if self.task:
+            if self.task.is_mapped:
+                context = self.get_template_context()
+                if self.task.expand_start_from_trigger(context=context):
+                    return self.task.expand_start_trigger_args(context=context)
+            elif self.task.start_from_trigger is True:
+                return self.task.start_trigger_args
+        return None
+
+    # TODO: We have some code duplication here and in the 
_create_ti_state_update_query_and_update_state
+    #       method of the task_instances module in the execution api when a 
TIDeferredStatePayload is being
+    #       processed. This is because of a TaskInstance being updated 
differently using SQLAlchemy.
+    #       If we use the approach from the execution api as common code in 
the DagRun schedule_tis method,
+    #       the side effect is the changes done to the task instance aren't 
picked up by the scheduler and
+    #       thus the task instance isn't processed until the scheduler is 
restarted.
+    @provide_session
+    def defer_task(self, session: Session = NEW_SESSION) -> bool:
+        """
+        Mark the task as deferred and sets up the trigger that is needed to 
resume it when TaskDeferred is raised.
+
+        :meta: private
+        """
+        from airflow.models.trigger import Trigger
+
+        if TYPE_CHECKING:
+            assert isinstance(self.task, Operator)
+
+        if start_trigger_args := self.start_trigger_args:
+            trigger_kwargs = start_trigger_args.trigger_kwargs or {}
+            timeout = start_trigger_args.timeout
+
+            # Calculate timeout too if it was passed
+            if timeout is not None:
+                self.trigger_timeout = timezone.utcnow() + timeout
+            else:
+                self.trigger_timeout = None
+
+            trigger_row = Trigger(
+                classpath=start_trigger_args.trigger_cls,
+                kwargs=trigger_kwargs,
+            )
+

Review Comment:
   Do we need to set the `encrypted_kwargs` attribute here as well?
   
   
https://github.com/apache/airflow/blob/dcaad6008e24964ec605e1fd8e09f4ea198a6422/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py#L506



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to