arorasachin9 commented on code in PR #60588:
URL: https://github.com/apache/airflow/pull/60588#discussion_r2696758697


##########
providers/amazon/src/airflow/providers/amazon/aws/triggers/base.py:
##########
@@ -159,3 +161,25 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                 self.status_queries,
             )
             yield TriggerEvent({"status": "success", self.return_key: 
self.return_value})
+
+    async def cancel(self):

Review Comment:
   @vincbeck ,
   This function is the function defined in the baseTrigger class.
   ```
       async def cleanup(self) -> None:
           """
           Cleanup the trigger.
   
           Called when the trigger is no longer needed, and it's being removed
           from the active triggerer process.
   
           This method follows the async/await pattern to allow to run the 
cleanup
           in triggerer main event loop. Exceptions raised by the cleanup method
           are ignored, so if you would like to be able to debug them and be 
notified
           that cleanup method failed, you should wrap your code with 
try/except block
           and handle it appropriately (in async-compatible way).
           """
   ```
   And this cleanup function is called in the finally block of the runner 
function of trigerrer
   ```
   async def run_trigger(self, trigger_id: int, trigger: BaseTrigger, 
timeout_after: datetime | None = None):
           """Run a trigger (they are async generators) and push their events 
into our outbound event deque."""
           if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", 
"").lower() == "true":
               import greenback
   
               await greenback.ensure_portal()
   
           bind_log_contextvars(trigger_id=trigger_id)
   
           name = self.triggers[trigger_id]["name"]
           self.log.info("trigger %s starting", name)
           try:
               async for event in trigger.run():
                   await self.log.ainfo(
                       "Trigger fired event", 
name=self.triggers[trigger_id]["name"], result=event
                   )
                   self.triggers[trigger_id]["events"] += 1
                   self.events.append((trigger_id, event))
           except asyncio.CancelledError:
               # We get cancelled by the scheduler changing the task state. But 
if we do lets give a nice error
               # message about it
               if timeout := timeout_after:
                   timeout = timeout.replace(tzinfo=timezone.utc) if not 
timeout.tzinfo else timeout
                   if timeout < timezone.utcnow():
                       await self.log.aerror("Trigger cancelled due to timeout")
               raise
           finally:
               # CancelledError will get injected when we're stopped - which is
               # fine, the cleanup process will understand that, but we want to
               # allow triggers a chance to cleanup, either in that case or if
               # they exit cleanly. Exception from cleanup methods are ignored.
               with suppress(Exception):
                   await trigger.cleanup()
   
               await self.log.ainfo("trigger completed", name=name)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to