kaxil commented on code in PR #59087:
URL: https://github.com/apache/airflow/pull/59087#discussion_r2672681142


##########
task-sdk/src/airflow/sdk/bases/operator.py:
##########
@@ -1670,6 +1694,35 @@ def has_on_skipped_callback(self) -> bool:
         return bool(self.on_skipped_callback)
 
 
+class BaseAsyncOperator(BaseOperator):
+    """
+    Base class for async-capable operators.
+
+    As opposed to deferred operators which are executed on the triggerer, 
async operators are executed
+    on the worker.
+    """
+
+    @property
+    def is_async(self) -> bool:
+        return True
+
+    async def aexecute(self, context):
+        """Async version of execute(). Subclasses should implement this."""
+        raise NotImplementedError()
+
+    def execute(self, context):
+        """Run `aexecute()` inside an event loop."""
+        with event_loop() as loop:
+            if self.execution_timeout:
+                return loop.run_until_complete(
+                    asyncio.wait_for(
+                        self.aexecute(context),
+                        timeout=self.execution_timeout.total_seconds(),
+                    )
+                )
+            return loop.run_until_complete(self.aexecute(context))

Review Comment:
   We should add a test for this case specifically tbh so that a regression 
won't make async tasks run indefinitely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to