Taragolis commented on code in PR #29038:
URL: https://github.com/apache/airflow/pull/29038#discussion_r1106290063


##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
             return True, "Connection successfully tested"
         except Exception as e:
             return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+    """
+    Interact with HTTP servers using Python Async.
+
+    :param method: the API method to be called
+    :param http_conn_id: http connection id that has the base
+        API url i.e https://www.google.com/ and optional authentication credentials. Default
+        headers can also be specified in the Extra field in json format.
+    :param auth_type: The auth type for the service
+    """
+
+    conn_name_attr = "http_conn_id"
+    default_conn_name = "http_default"
+    conn_type = "http"
+    hook_name = "HTTP"
+
+    def __init__(
+        self,
+        method: str = "POST",
+        http_conn_id: str = default_conn_name,
+        auth_type: Any = aiohttp.BasicAuth,
+        retry_limit: int = 3,
+        retry_delay: float = 1.0,
+    ) -> None:
+        self.http_conn_id = http_conn_id
+        self.method = method.upper()
+        self.base_url: str = ""
+        self._retry_obj: Callable[..., Any]
+        self.auth_type: Any = auth_type
+        if retry_limit < 1:
+            raise ValueError("Retry limit must be greater than equal to 1")
+        self.retry_limit = retry_limit
+        self.retry_delay = retry_delay
+
+    async def run(
+        self,
+        endpoint: str | None = None,
+        data: dict[str, Any] | str | None = None,
+        headers: dict[str, Any] | None = None,
+        extra_options: dict[str, Any] | None = None,
+    ) -> "ClientResponse":
+        r"""
+        Performs an asynchronous HTTP request call
+
+        :param endpoint: the endpoint to be called i.e. resource/v1/query?
+        :param data: payload to be uploaded or request parameters
+        :param headers: additional headers to be passed through as a dictionary
+        :param extra_options: Additional kwargs to pass when creating a request.
+            For example, ``run(json=obj)`` is passed as ``aiohttp.ClientSession().get(json=obj)``
+        """
+        extra_options = extra_options or {}
+
+        # headers may be passed through directly or in the "extra" field in the connection
+        # definition
+        _headers = {}
+        auth = None
+
+        if self.http_conn_id:

Review Comment:
   The main point is that `sync_to_async`-decorated functions run in some kind
of sequential queue (otherwise they would not be thread safe). As a result, this
may lead to some issues related to thread safety.
   
   Code that runs inside `sync_to_async` does not block native async code, but
it does block the execution of other `sync_to_async`-decorated code, as
demonstrated in my example. The same happens in the Airflow triggerer service:
a single hung `sync_to_async`-decorated function can leave one of the triggerers
with a huge queue for that code, and almost every Trigger calls `sync_to_async`.
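
A minimal sketch of the queueing effect described above, using only the standard library. It does not call `sync_to_async` itself: a single-worker `ThreadPoolExecutor` stands in for the one shared thread, which is an assumption made for illustration. The names `blocking_call`, `wrapped` and `native_ticker` are hypothetical.

```python
from __future__ import annotations

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# Assumption: one shared worker thread models the sequential queue that
# wrapped synchronous code is said to run in.
_shared_executor = ThreadPoolExecutor(max_workers=1)


def blocking_call(name: str, seconds: float, log: list[str]) -> None:
    """Synchronous work, for example a slow lookup."""
    log.append(f"{name} start")
    time.sleep(seconds)
    log.append(f"{name} end")


async def wrapped(name: str, seconds: float, log: list[str]) -> None:
    """Run the blocking call on the single shared worker thread."""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(_shared_executor, blocking_call, name, seconds, log)


async def native_ticker(log: list[str], ticks: int, interval: float) -> None:
    """Native async code that never touches the shared thread."""
    for i in range(ticks):
        await asyncio.sleep(interval)
        log.append(f"tick {i}")


async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(
        wrapped("slow", 0.3, log),   # occupies the shared thread first
        wrapped("fast", 0.01, log),  # queued behind "slow"
        native_ticker(log, 2, 0.05),  # keeps running on the event loop
    )
    return log


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```

With these timings, both ticks should be logged while `slow` is still sleeping, and `fast start` should only appear after `slow end`: the event loop stays responsive, but the short call waits behind the long one.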
   
   So, unfortunately, "we have already used sync_to_async in various airflow
provider's code base currently as well like" does not make things better;
rather, it creates a kind of time bomb.
   
   I don't mean that you are doing a bad job. All these
`sync_to_async`-decorated functions were added for a good purpose, and in
general it is nice to have more deferrable tasks out of the box. However, the
main problem is that Airflow core is not ready to operate with deferrable tasks
that need access to core components such as Connections, Variables, XCom, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
