dabla commented on code in PR #60268:
URL: https://github.com/apache/airflow/pull/60268#discussion_r2678449486


##########
providers/common/compat/src/airflow/providers/common/compat/standard/operators.py:
##########
@@ -17,18 +17,118 @@
 
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from airflow.providers.common.compat._compat_utils import create_module_getattr
+from airflow.providers.common.compat.version_compat import (
+    AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_1_PLUS,
+    AIRFLOW_V_3_2_PLUS,
+)
 
 _IMPORT_MAP: dict[str, str | tuple[str, ...]] = {
     # Re-export from sdk (which handles Airflow 2.x/3.x fallbacks)
     "BaseOperator": "airflow.providers.common.compat.sdk",
+    "BaseAsyncOperator": "airflow.providers.common.compat.sdk",
     "get_current_context": "airflow.providers.common.compat.sdk",
+    "is_async_callable": "airflow.providers.common.compat.sdk",
     # Standard provider items with direct fallbacks
     "PythonOperator": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
     "ShortCircuitOperator": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
     "_SERIALIZERS": ("airflow.providers.standard.operators.python", 
"airflow.operators.python"),
 }
 
+if TYPE_CHECKING:
+    from airflow.sdk.bases.decorator import is_async_callable
+    from airflow.sdk.bases.operator import BaseAsyncOperator
+elif AIRFLOW_V_3_2_PLUS:
+    from airflow.sdk.bases.decorator import is_async_callable
+    from airflow.sdk.bases.operator import BaseAsyncOperator
+else:
+    import inspect
+    from contextlib import suppress
+    from functools import partial
+
+    if AIRFLOW_V_3_0_PLUS:
+        from airflow.sdk import BaseOperator
+        from airflow.sdk.bases.decorator import _TaskDecorator
+        from airflow.sdk.definitions.mappedoperator import OperatorPartial
+    else:
+        from airflow.decorators.base import _TaskDecorator
+        from airflow.models import BaseOperator
+        from airflow.models.mappedoperator import OperatorPartial
+
+    def unwrap_partial(fn):
+        while isinstance(fn, partial):
+            fn = fn.func
+        return fn
+
+    def unwrap_callable(func):
+        # Airflow-specific unwrap
+        if isinstance(func, (_TaskDecorator, OperatorPartial)):
+            func = getattr(func, "function", getattr(func, "_func", func))
+
+        # Unwrap functools.partial
+        func = unwrap_partial(func)
+
+        # Unwrap @functools.wraps chains
+        with suppress(Exception):
+            func = inspect.unwrap(func)
+
+        return func
+
+    def is_async_callable(func):
+        """Detect if a callable (possibly wrapped) is an async function."""
+        func = unwrap_callable(func)
+
+        if not callable(func):
+            return False
+
+        # Direct async function
+        if inspect.iscoroutinefunction(func):
+            return True
+
+        # Callable object with async __call__
+        if not inspect.isfunction(func):
+            call = type(func).__call__  # Bandit-safe
+            with suppress(Exception):
+                call = inspect.unwrap(call)
+            if inspect.iscoroutinefunction(call):
+                return True
+
+        return False
+
+    class BaseAsyncOperator(BaseOperator):
+        """
+        Base class for async-capable operators.
+
+        As opposed to deferred operators which are executed on the triggerer, 
async operators are executed
+        on the worker.
+        """
+
+        @property
+        def is_async(self) -> bool:
+            return True
+
+        if not AIRFLOW_V_3_1_PLUS:
+
+            @property
+            def xcom_push(self) -> bool:
+                return self.do_xcom_push
+
+            @xcom_push.setter
+            def xcom_push(self, value: bool):
+                self.do_xcom_push = value
+
+        async def aexecute(self, context):
+            """Async version of execute(). Subclasses should implement this."""
+            raise NotImplementedError()
+
+        def execute(self, context):
+            """Run `aexecute()` inside an event loop."""
+            raise NotImplementedError("Airflow 3.2+ is required to allow 
executing async operators!")

Review Comment:
   @potiuk: What do you think of this simplification? In the end this is only 
for backward compat, so the check here doesn't need to be as advanced as the 
one for the real async PythonOperator or the DecoratedOperator in Airflow 3.2+. 
I think it is indeed a good idea.
   
   ```
   def is_async_callable(func) -> bool:
           """Detect async callables. """
           import inspect
           from functools import partial
           
           while isinstance(func, partial):
               func = func.func
           return inspect.iscoroutinefunction(func)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to