Taragolis commented on code in PR #29038:
URL: https://github.com/apache/airflow/pull/29038#discussion_r1081712597
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
+
+ def __init__(
+ self,
+ method: str = "POST",
+ http_conn_id: str = default_conn_name,
+ auth_type: Any = aiohttp.BasicAuth,
+ retry_limit: int = 3,
+ retry_delay: float = 1.0,
+ ) -> None:
+ self.http_conn_id = http_conn_id
+ self.method = method.upper()
+ self.base_url: str = ""
+ self._retry_obj: Callable[..., Any]
+ self.auth_type: Any = auth_type
+ if retry_limit < 1:
+ raise ValueError("Retry limit must be greater than equal to 1")
+ self.retry_limit = retry_limit
+ self.retry_delay = retry_delay
+
+ async def run(
+ self,
+ endpoint: str | None = None,
+ data: dict[str, Any] | str | None = None,
+ headers: dict[str, Any] | None = None,
+ extra_options: dict[str, Any] | None = None,
+ ) -> "ClientResponse":
+ r"""
+ Performs an asynchronous HTTP request call
+
+ :param endpoint: the endpoint to be called i.e. resource/v1/query?
+ :param data: payload to be uploaded or request parameters
+ :param headers: additional headers to be passed through as a dictionary
+ :param extra_options: Additional kwargs to pass when creating a
request.
+ For example, ``run(json=obj)`` is passed as
``aiohttp.ClientSession().get(json=obj)``
+ """
+ extra_options = extra_options or {}
+
+ # headers may be passed through directly or in the "extra" field in
the connection
+ # definition
+ _headers = {}
+ auth = None
+
+ if self.http_conn_id:
+ conn = await sync_to_async(self.get_connection)(self.http_conn_id)
+
+ if conn.host and "://" in conn.host:
+ self.base_url = conn.host
+ else:
+ # schema defaults to HTTP
+ schema = conn.schema if conn.schema else "http"
+ host = conn.host if conn.host else ""
+ self.base_url = schema + "://" + host
+
+ if conn.port:
+ self.base_url = self.base_url + ":" + str(conn.port)
+ if conn.login:
+ auth = self.auth_type(conn.login, conn.password)
+ if conn.extra:
+ try:
+ _headers.update(conn.extra_dejson)
+ except TypeError:
+ self.log.warning("Connection to %s has invalid extra
field.", conn.host)
+ if headers:
+ _headers.update(headers)
+
+ if self.base_url and not self.base_url.endswith("/") and endpoint and
not endpoint.startswith("/"):
+ url = self.base_url + "/" + endpoint
+ else:
+ url = (self.base_url or "") + (endpoint or "")
+
+ async with aiohttp.ClientSession() as session:
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ else:
+ raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
+
+ attempt_num = 1
+ while True:
+ response = await request_func(
+ url,
+ json=data if self.method in ("POST", "PATCH") else None,
+ params=data if self.method == "GET" else None,
+ headers=headers,
+ auth=auth,
+ **extra_options,
+ )
+ try:
+ response.raise_for_status()
+ return response
+ except ClientResponseError as e:
+ self.log.warning(
+ "[Try %d of %d] Request to %s failed.",
+ attempt_num,
+ self.retry_limit,
+ url,
+ )
+ if not self._retryable_error_async(e) or attempt_num ==
self.retry_limit:
+ self.log.exception("HTTP error with status: %s",
e.status)
+ # In this case, the user probably made a mistake.
+ # Don't retry.
+ raise AirflowException(str(e.status) + ":" + e.message)
+
+ attempt_num += 1
+ await asyncio.sleep(self.retry_delay)
+
+ def _retryable_error_async(self, exception: ClientResponseError) -> bool:
+ """
+ Determines whether or not an exception that was thrown might be
successful
+ on a subsequent attempt.
+
+ It considers the following to be retryable:
+ - requests_exceptions.ConnectionError
+ - requests_exceptions.Timeout
+ - anything with a status code >= 500
+
+ Most retryable errors are covered by status code >= 500.
+ """
+ return exception.status >= 500
Review Comment:
[429](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429) and
[413](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/413) (if it
provides a specific header, such as `Retry-After`) could also be retryable.
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
+
+ def __init__(
+ self,
+ method: str = "POST",
+ http_conn_id: str = default_conn_name,
+ auth_type: Any = aiohttp.BasicAuth,
+ retry_limit: int = 3,
+ retry_delay: float = 1.0,
+ ) -> None:
+ self.http_conn_id = http_conn_id
+ self.method = method.upper()
+ self.base_url: str = ""
+ self._retry_obj: Callable[..., Any]
+ self.auth_type: Any = auth_type
+ if retry_limit < 1:
+ raise ValueError("Retry limit must be greater than equal to 1")
+ self.retry_limit = retry_limit
+ self.retry_delay = retry_delay
+
+ async def run(
+ self,
+ endpoint: str | None = None,
+ data: dict[str, Any] | str | None = None,
+ headers: dict[str, Any] | None = None,
+ extra_options: dict[str, Any] | None = None,
+ ) -> "ClientResponse":
+ r"""
+ Performs an asynchronous HTTP request call
+
+ :param endpoint: the endpoint to be called i.e. resource/v1/query?
+ :param data: payload to be uploaded or request parameters
+ :param headers: additional headers to be passed through as a dictionary
+ :param extra_options: Additional kwargs to pass when creating a
request.
+ For example, ``run(json=obj)`` is passed as
``aiohttp.ClientSession().get(json=obj)``
+ """
+ extra_options = extra_options or {}
+
+ # headers may be passed through directly or in the "extra" field in
the connection
+ # definition
+ _headers = {}
+ auth = None
+
+ if self.http_conn_id:
+ conn = await sync_to_async(self.get_connection)(self.http_conn_id)
+
+ if conn.host and "://" in conn.host:
+ self.base_url = conn.host
+ else:
+ # schema defaults to HTTP
+ schema = conn.schema if conn.schema else "http"
+ host = conn.host if conn.host else ""
+ self.base_url = schema + "://" + host
+
+ if conn.port:
+ self.base_url = self.base_url + ":" + str(conn.port)
+ if conn.login:
+ auth = self.auth_type(conn.login, conn.password)
+ if conn.extra:
+ try:
+ _headers.update(conn.extra_dejson)
+ except TypeError:
+ self.log.warning("Connection to %s has invalid extra
field.", conn.host)
+ if headers:
+ _headers.update(headers)
+
+ if self.base_url and not self.base_url.endswith("/") and endpoint and
not endpoint.startswith("/"):
+ url = self.base_url + "/" + endpoint
+ else:
+ url = (self.base_url or "") + (endpoint or "")
+
+ async with aiohttp.ClientSession() as session:
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ else:
+ raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
+
+ attempt_num = 1
+ while True:
+ response = await request_func(
+ url,
+ json=data if self.method in ("POST", "PATCH") else None,
+ params=data if self.method == "GET" else None,
+ headers=headers,
+ auth=auth,
+ **extra_options,
+ )
+ try:
+ response.raise_for_status()
+ return response
+ except ClientResponseError as e:
+ self.log.warning(
+ "[Try %d of %d] Request to %s failed.",
+ attempt_num,
+ self.retry_limit,
+ url,
+ )
+ if not self._retryable_error_async(e) or attempt_num ==
self.retry_limit:
+ self.log.exception("HTTP error with status: %s",
e.status)
+ # In this case, the user probably made a mistake.
+ # Don't retry.
+ raise AirflowException(str(e.status) + ":" + e.message)
Review Comment:
```suggestion
raise AirflowException(f"{e.status}:{e.message}")
```
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
+
+ def __init__(
+ self,
+ method: str = "POST",
+ http_conn_id: str = default_conn_name,
+ auth_type: Any = aiohttp.BasicAuth,
+ retry_limit: int = 3,
+ retry_delay: float = 1.0,
+ ) -> None:
+ self.http_conn_id = http_conn_id
+ self.method = method.upper()
+ self.base_url: str = ""
+ self._retry_obj: Callable[..., Any]
+ self.auth_type: Any = auth_type
+ if retry_limit < 1:
+ raise ValueError("Retry limit must be greater than equal to 1")
+ self.retry_limit = retry_limit
+ self.retry_delay = retry_delay
+
+ async def run(
+ self,
+ endpoint: str | None = None,
+ data: dict[str, Any] | str | None = None,
+ headers: dict[str, Any] | None = None,
+ extra_options: dict[str, Any] | None = None,
+ ) -> "ClientResponse":
+ r"""
+ Performs an asynchronous HTTP request call
+
+ :param endpoint: the endpoint to be called i.e. resource/v1/query?
+ :param data: payload to be uploaded or request parameters
+ :param headers: additional headers to be passed through as a dictionary
+ :param extra_options: Additional kwargs to pass when creating a
request.
+ For example, ``run(json=obj)`` is passed as
``aiohttp.ClientSession().get(json=obj)``
+ """
+ extra_options = extra_options or {}
+
+ # headers may be passed through directly or in the "extra" field in
the connection
+ # definition
+ _headers = {}
+ auth = None
+
+ if self.http_conn_id:
+ conn = await sync_to_async(self.get_connection)(self.http_conn_id)
Review Comment:
I think every new "AsyncHook" is going to use this approach.
Maybe it is a good time to create a BaseAsyncTask in core Airflow and somehow
backport it to providers, for example by introducing a common-backport package?
And if we later get async support for other core components (such as
connections, at least for DB and ENV), it would be easier to change it in one
place rather than in multiple different places.
cc: @kaxil @potiuk
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
+
+ def __init__(
+ self,
+ method: str = "POST",
+ http_conn_id: str = default_conn_name,
+ auth_type: Any = aiohttp.BasicAuth,
+ retry_limit: int = 3,
+ retry_delay: float = 1.0,
+ ) -> None:
+ self.http_conn_id = http_conn_id
+ self.method = method.upper()
+ self.base_url: str = ""
+ self._retry_obj: Callable[..., Any]
+ self.auth_type: Any = auth_type
+ if retry_limit < 1:
+ raise ValueError("Retry limit must be greater than equal to 1")
+ self.retry_limit = retry_limit
+ self.retry_delay = retry_delay
+
+ async def run(
+ self,
+ endpoint: str | None = None,
+ data: dict[str, Any] | str | None = None,
+ headers: dict[str, Any] | None = None,
+ extra_options: dict[str, Any] | None = None,
+ ) -> "ClientResponse":
+ r"""
+ Performs an asynchronous HTTP request call
+
+ :param endpoint: the endpoint to be called i.e. resource/v1/query?
+ :param data: payload to be uploaded or request parameters
+ :param headers: additional headers to be passed through as a dictionary
+ :param extra_options: Additional kwargs to pass when creating a
request.
+ For example, ``run(json=obj)`` is passed as
``aiohttp.ClientSession().get(json=obj)``
+ """
+ extra_options = extra_options or {}
+
+ # headers may be passed through directly or in the "extra" field in
the connection
+ # definition
+ _headers = {}
+ auth = None
+
+ if self.http_conn_id:
+ conn = await sync_to_async(self.get_connection)(self.http_conn_id)
+
+ if conn.host and "://" in conn.host:
+ self.base_url = conn.host
+ else:
+ # schema defaults to HTTP
+ schema = conn.schema if conn.schema else "http"
+ host = conn.host if conn.host else ""
+ self.base_url = schema + "://" + host
+
+ if conn.port:
+ self.base_url = self.base_url + ":" + str(conn.port)
+ if conn.login:
+ auth = self.auth_type(conn.login, conn.password)
+ if conn.extra:
+ try:
+ _headers.update(conn.extra_dejson)
+ except TypeError:
+ self.log.warning("Connection to %s has invalid extra
field.", conn.host)
+ if headers:
+ _headers.update(headers)
+
+ if self.base_url and not self.base_url.endswith("/") and endpoint and
not endpoint.startswith("/"):
+ url = self.base_url + "/" + endpoint
+ else:
+ url = (self.base_url or "") + (endpoint or "")
+
+ async with aiohttp.ClientSession() as session:
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ else:
+ raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
+
+ attempt_num = 1
+ while True:
+ response = await request_func(
+ url,
+ json=data if self.method in ("POST", "PATCH") else None,
+ params=data if self.method == "GET" else None,
+ headers=headers,
+ auth=auth,
+ **extra_options,
+ )
+ try:
+ response.raise_for_status()
+ return response
+ except ClientResponseError as e:
+ self.log.warning(
+ "[Try %d of %d] Request to %s failed.",
+ attempt_num,
+ self.retry_limit,
+ url,
+ )
+ if not self._retryable_error_async(e) or attempt_num ==
self.retry_limit:
+ self.log.exception("HTTP error with status: %s",
e.status)
+ # In this case, the user probably made a mistake.
+ # Don't retry.
+ raise AirflowException(str(e.status) + ":" + e.message)
+
+ attempt_num += 1
+ await asyncio.sleep(self.retry_delay)
+
+ def _retryable_error_async(self, exception: ClientResponseError) -> bool:
+ """
+ Determines whether or not an exception that was thrown might be
successful
+ on a subsequent attempt.
+
+ It considers the following to be retryable:
+ - requests_exceptions.ConnectionError
+ - requests_exceptions.Timeout
Review Comment:
I can't find the place where we catch these exceptions.
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
+
+ def __init__(
+ self,
+ method: str = "POST",
+ http_conn_id: str = default_conn_name,
+ auth_type: Any = aiohttp.BasicAuth,
+ retry_limit: int = 3,
+ retry_delay: float = 1.0,
+ ) -> None:
+ self.http_conn_id = http_conn_id
+ self.method = method.upper()
+ self.base_url: str = ""
+ self._retry_obj: Callable[..., Any]
+ self.auth_type: Any = auth_type
+ if retry_limit < 1:
+ raise ValueError("Retry limit must be greater than equal to 1")
+ self.retry_limit = retry_limit
+ self.retry_delay = retry_delay
+
+ async def run(
+ self,
+ endpoint: str | None = None,
+ data: dict[str, Any] | str | None = None,
+ headers: dict[str, Any] | None = None,
+ extra_options: dict[str, Any] | None = None,
+ ) -> "ClientResponse":
+ r"""
+ Performs an asynchronous HTTP request call
+
+ :param endpoint: the endpoint to be called i.e. resource/v1/query?
+ :param data: payload to be uploaded or request parameters
+ :param headers: additional headers to be passed through as a dictionary
+ :param extra_options: Additional kwargs to pass when creating a
request.
+ For example, ``run(json=obj)`` is passed as
``aiohttp.ClientSession().get(json=obj)``
+ """
+ extra_options = extra_options or {}
+
+ # headers may be passed through directly or in the "extra" field in
the connection
+ # definition
+ _headers = {}
+ auth = None
+
+ if self.http_conn_id:
Review Comment:
Also, I don't know what happens when multiple different Triggers try
to call `sync_to_async` simultaneously.
- Is it thread-safe?
- Or would all of them run sequentially?
##########
tests/providers/http/hooks/test_http.py:
##########
@@ -392,3 +394,95 @@ def test_keep_alive_disabled(self):
send_email_test = mock.Mock()
+
+
+@pytest.fixture
+def aioresponse():
+ """
+ Creates an mock async API response.
+ This comes from a mock library specific to the aiohttp package:
+ https://github.com/pnuckowski/aioresponses
+
+ """
+ with aioresponses() as async_response:
+ yield async_response
+
+
+@pytest.mark.asyncio
+async def test_do_api_call_async_non_retryable_error(aioresponse):
+ """Test api call asynchronously with non retryable error."""
+ hook = HttpAsyncHook(method="GET")
+ aioresponse.get("http://httpbin.org/non_existent_endpoint", status=400)
+
+ with pytest.raises(AirflowException) as exc, mock.patch.dict(
+ "os.environ",
+ AIRFLOW_CONN_HTTP_DEFAULT="http://httpbin.org/",
+ ):
+ await hook.run(endpoint="non_existent_endpoint")
+
+ assert str(exc.value) == "400:Bad Request"
+
+
+@pytest.mark.asyncio
+async def test_do_api_call_async_retryable_error(caplog, aioresponse):
+ """Test api call asynchronously with retryable error."""
+ caplog.set_level(logging.WARNING,
logger="airflow.providers.http.hooks.http")
+ hook = HttpAsyncHook(method="GET")
+ aioresponse.get("http://httpbin.org/non_existent_endpoint", status=500,
repeat=True)
Review Comment:
Should we make any calls to resources which are not under our control?
And could you give a bit more detail about what we are trying to achieve here
by directly calling `aioresponse.get`?
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
Review Comment:
I'm not sure that this is required.
This is mostly for testing connections and creating new connections in the UI,
but we already register conn_type = "http" for HttpHook.
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
+
+ def __init__(
+ self,
+ method: str = "POST",
+ http_conn_id: str = default_conn_name,
+ auth_type: Any = aiohttp.BasicAuth,
+ retry_limit: int = 3,
+ retry_delay: float = 1.0,
Review Comment:
Maybe it would be better to have an exponential backoff retry?
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
+
+ def __init__(
+ self,
+ method: str = "POST",
+ http_conn_id: str = default_conn_name,
+ auth_type: Any = aiohttp.BasicAuth,
+ retry_limit: int = 3,
+ retry_delay: float = 1.0,
+ ) -> None:
+ self.http_conn_id = http_conn_id
+ self.method = method.upper()
+ self.base_url: str = ""
+ self._retry_obj: Callable[..., Any]
+ self.auth_type: Any = auth_type
+ if retry_limit < 1:
+ raise ValueError("Retry limit must be greater than equal to 1")
+ self.retry_limit = retry_limit
+ self.retry_delay = retry_delay
+
+ async def run(
+ self,
+ endpoint: str | None = None,
+ data: dict[str, Any] | str | None = None,
+ headers: dict[str, Any] | None = None,
+ extra_options: dict[str, Any] | None = None,
+ ) -> "ClientResponse":
+ r"""
+ Performs an asynchronous HTTP request call
+
+ :param endpoint: the endpoint to be called i.e. resource/v1/query?
+ :param data: payload to be uploaded or request parameters
+ :param headers: additional headers to be passed through as a dictionary
+ :param extra_options: Additional kwargs to pass when creating a
request.
+ For example, ``run(json=obj)`` is passed as
``aiohttp.ClientSession().get(json=obj)``
+ """
+ extra_options = extra_options or {}
+
+ # headers may be passed through directly or in the "extra" field in
the connection
+ # definition
+ _headers = {}
+ auth = None
+
+ if self.http_conn_id:
Review Comment:
Is it intended to get the connection on every `run` call?
I see that the same behaviour exists in the Requests-based hook, but I don't
think this is a good approach: most of the time would be spent getting the
connection.
##########
airflow/providers/http/hooks/http.py:
##########
@@ -246,3 +253,142 @@ def test_connection(self):
return True, "Connection successfully tested"
except Exception as e:
return False, str(e)
+
+
+class HttpAsyncHook(BaseHook):
+ """
+ Interact with HTTP servers using Python Async.
+
+ :param method: the API method to be called
+ :param http_conn_id: http connection id that has the base
+ API url i.e https://www.google.com/ and optional authentication
credentials. Default
+ headers can also be specified in the Extra field in json format.
+ :param auth_type: The auth type for the service
+ """
+
+ conn_name_attr = "http_conn_id"
+ default_conn_name = "http_default"
+ conn_type = "http"
+ hook_name = "HTTP"
+
+ def __init__(
+ self,
+ method: str = "POST",
+ http_conn_id: str = default_conn_name,
+ auth_type: Any = aiohttp.BasicAuth,
+ retry_limit: int = 3,
+ retry_delay: float = 1.0,
+ ) -> None:
+ self.http_conn_id = http_conn_id
+ self.method = method.upper()
+ self.base_url: str = ""
+ self._retry_obj: Callable[..., Any]
+ self.auth_type: Any = auth_type
+ if retry_limit < 1:
+ raise ValueError("Retry limit must be greater than equal to 1")
+ self.retry_limit = retry_limit
+ self.retry_delay = retry_delay
+
+ async def run(
+ self,
+ endpoint: str | None = None,
+ data: dict[str, Any] | str | None = None,
+ headers: dict[str, Any] | None = None,
+ extra_options: dict[str, Any] | None = None,
+ ) -> "ClientResponse":
+ r"""
+ Performs an asynchronous HTTP request call
+
+ :param endpoint: the endpoint to be called i.e. resource/v1/query?
+ :param data: payload to be uploaded or request parameters
+ :param headers: additional headers to be passed through as a dictionary
+ :param extra_options: Additional kwargs to pass when creating a
request.
+ For example, ``run(json=obj)`` is passed as
``aiohttp.ClientSession().get(json=obj)``
+ """
+ extra_options = extra_options or {}
+
+ # headers may be passed through directly or in the "extra" field in
the connection
+ # definition
+ _headers = {}
+ auth = None
+
+ if self.http_conn_id:
+ conn = await sync_to_async(self.get_connection)(self.http_conn_id)
+
+ if conn.host and "://" in conn.host:
+ self.base_url = conn.host
+ else:
+ # schema defaults to HTTP
+ schema = conn.schema if conn.schema else "http"
+ host = conn.host if conn.host else ""
+ self.base_url = schema + "://" + host
+
+ if conn.port:
+ self.base_url = self.base_url + ":" + str(conn.port)
+ if conn.login:
+ auth = self.auth_type(conn.login, conn.password)
+ if conn.extra:
+ try:
+ _headers.update(conn.extra_dejson)
+ except TypeError:
+ self.log.warning("Connection to %s has invalid extra
field.", conn.host)
+ if headers:
+ _headers.update(headers)
+
+ if self.base_url and not self.base_url.endswith("/") and endpoint and
not endpoint.startswith("/"):
+ url = self.base_url + "/" + endpoint
+ else:
+ url = (self.base_url or "") + (endpoint or "")
+
+ async with aiohttp.ClientSession() as session:
+ if self.method == "GET":
+ request_func = session.get
+ elif self.method == "POST":
+ request_func = session.post
+ elif self.method == "PATCH":
+ request_func = session.patch
+ else:
+ raise AirflowException(f"Unexpected HTTP Method:
{self.method}")
Review Comment:
What about these methods: HEAD, PUT, DELETE, OPTIONS?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]