dstandish commented on code in PR #27344:
URL: https://github.com/apache/airflow/pull/27344#discussion_r1008211688


##########
airflow/models/trigger.py:
##########
@@ -122,17 +122,19 @@ def submit_event(cls, trigger_id, event, session=None):
         Takes an event from an instance of itself, and triggers all dependent
         tasks to resume.
         """
-        for task_instance in session.query(TaskInstance).filter(
-            TaskInstance.trigger_id == trigger_id, TaskInstance.state == 
State.DEFERRED
-        ):
-            # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
-            next_kwargs["event"] = event.payload
-            task_instance.next_kwargs = next_kwargs
-            # Remove ourselves as its trigger
-            task_instance.trigger_id = None
-            # Finally, mark it as scheduled so it gets re-queued
-            task_instance.state = State.SCHEDULED
+        for attempt in run_with_db_retries():
+            with attempt:
+                for task_instance in session.query(TaskInstance).filter(
+                    TaskInstance.trigger_id == trigger_id, TaskInstance.state 
== State.DEFERRED
+                ):
+                    # Add the event's payload into the kwargs for the task
+                    next_kwargs = task_instance.next_kwargs or {}
+                    next_kwargs["event"] = event.payload
+                    task_instance.next_kwargs = next_kwargs
+                    # Remove ourselves as its trigger
+                    task_instance.trigger_id = None
+                    # Finally, mark it as scheduled so it gets re-queued
+                    task_instance.state = State.SCHEDULED

Review Comment:
   ```suggestion
           for attempt in run_with_db_retries():
               with attempt:
                   tis = session.query(TaskInstance).filter(
                       TaskInstance.trigger_id == trigger_id, 
TaskInstance.state == State.DEFERRED
                   )
   
           for task_instance in tis:
               # Add the event's payload into the kwargs for the task
               next_kwargs = task_instance.next_kwargs or {}
               next_kwargs["event"] = event.payload
               task_instance.next_kwargs = next_kwargs
               # Remove ourselves as its trigger
               task_instance.trigger_id = None
               # Finally, mark it as scheduled so it gets re-queued
               task_instance.state = State.SCHEDULED
   ```
   
   i think separating the retry logic from the actual work makes things a 
little more digestable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to