This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch py-client-sync in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5667499b9c26ca203f1f2921ed310990159499ca Author: David Blain <[email protected]> AuthorDate: Tue Mar 24 17:52:36 2026 +0100 FIX: Allow override of HTTP method in run_method of LivyAsyncHook (#64150) * refactor: Allow override the method in run_method of LivyAsyncHook * refactor: Allow override the method in run_method of LivyAsyncHook * refactor: Removed duplicate test_run_method_success as it is the same as test_run_get_method_with_success --- .../airflow/providers/apache/livy/hooks/livy.py | 2 +- .../livy/tests/unit/apache/livy/hooks/test_livy.py | 27 ++-------------- .../http/src/airflow/providers/http/hooks/http.py | 37 ++++++++++++---------- providers/http/tests/unit/http/hooks/test_http.py | 16 ++++++++++ 4 files changed, 39 insertions(+), 43 deletions(-) diff --git a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py index e9f8e94c748..0b6b5c5c01b 100644 --- a/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py +++ b/providers/apache/livy/src/airflow/providers/apache/livy/hooks/livy.py @@ -523,7 +523,7 @@ class LivyAsyncHook(HttpAsyncHook): ) try: - async with self.session() as session: + async with self.session(method=method) as session: response = await session.run( endpoint=endpoint, data=data, diff --git a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py index 90bdcad8866..3bd7e1fafb6 100644 --- a/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py +++ b/providers/apache/livy/tests/unit/apache/livy/hooks/test_livy.py @@ -591,27 +591,6 @@ class TestLivyAsyncHook: log_dump = await hook.dump_batch_logs(BATCH_ID) assert log_dump == {"id": 1, "log": ["mock_log_1", "mock_log_2"]} - @pytest.mark.asyncio - @mock.patch("airflow.providers.http.hooks.http.aiohttp.ClientSession") - @mock.patch( - "airflow.providers.common.compat.connection.get_async_connection", - return_value=Connection( - conn_id=LIVY_CONN_ID, - conn_type="http", - host="http://host", - port=80, - ), - ) - async def test_run_method_success(self, mock_get_connection, mock_session): - """Asserts the run_method for success response.""" - mock_session.return_value.__aenter__.return_value.post = AsyncMock() - mock_session.return_value.__aenter__.return_value.post.return_value.json = AsyncMock( - return_value={"id": 1} - ) - hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) - response = await hook.run_method("localhost", "GET") - assert response == {"status": "success", "response": {"id": 1}} - @pytest.mark.asyncio async def test_run_method_error(self): """Asserts the run_method for error response.""" @@ -659,8 +638,7 @@ class TestLivyAsyncHook: return_value={"hello": "world"} ) hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) - hook.method = "GET" - response = await hook.run_method("api/jobs/runs/get") + response = await hook.run_method("api/jobs/runs/get", method="GET") assert response["status"] == "success" assert response["response"] == {"hello": "world"} @@ -682,8 +660,7 @@ class TestLivyAsyncHook: return_value={"hello": "world"} ) hook = LivyAsyncHook(livy_conn_id=LIVY_CONN_ID) - hook.method = "PATCH" - response = await hook.run_method("api/jobs/runs/get") + response = await hook.run_method("api/jobs/runs/get", method="PATCH") assert response["status"] == "success" assert response["response"] == {"hello": "world"} diff --git a/providers/http/src/airflow/providers/http/hooks/http.py b/providers/http/src/airflow/providers/http/hooks/http.py index 240bd24d06c..3d22867f265 100644 --- a/providers/http/src/airflow/providers/http/hooks/http.py +++ b/providers/http/src/airflow/providers/http/hooks/http.py @@ -453,8 +453,10 @@ class AsyncHttpSession(LoggingMixin): hook: HttpAsyncHook, request: Callable[..., Awaitable[ClientResponse]], config: SessionConfig, + method: str | None = None, ) -> None: super().__init__() + self.method = method or hook.method self._hook = hook self._request = request self.config = config @@ -467,10 +469,6 @@ class AsyncHttpSession(LoggingMixin): def base_url(self) -> str: return self.config.base_url - @property - def method(self) -> str: - return self._hook.method - @property def retry_limit(self) -> int: return self._hook.retry_limit @@ -587,23 +585,25 @@ class HttpAsyncHook(BaseHook): self.retry_delay = retry_delay self._config: SessionConfig | None = None - def _get_request_func(self, session: aiohttp.ClientSession) -> Callable[..., Any]: - method = self.method - if method == "GET": + def _get_request_func( + self, session: aiohttp.ClientSession, method: str | None = None + ) -> Callable[..., Any]: + http_method = method or self.method + if http_method == "GET": return session.get - if method == "POST": + if http_method == "POST": return session.post - if method == "PATCH": + if http_method == "PATCH": return session.patch - if method == "HEAD": + if http_method == "HEAD": return session.head - if method == "PUT": + if http_method == "PUT": return session.put - if method == "DELETE": + if http_method == "DELETE": return session.delete - if method == "OPTIONS": + if http_method == "OPTIONS": return session.options - raise HttpMethodException(f"Unexpected HTTP Method: {method}") + raise HttpMethodException(f"Unexpected HTTP Method: {http_method}") async def config(self) -> SessionConfig: if not self._config: @@ -644,16 +644,19 @@ class HttpAsyncHook(BaseHook): return self._config @asynccontextmanager - async def session(self) -> AsyncGenerator[AsyncHttpSession, None]: + async def session(self, method: str | None = None) -> AsyncGenerator[AsyncHttpSession, None]: """ Create an ``AsyncHttpSession`` bound to a single ``aiohttp.ClientSession``. Airflow connection resolution happens exactly once here. + + :param method: Optional HTTP method to be used for requests made by the returned session. + If provided, this value overrides the hook's configured default method. """ async with aiohttp.ClientSession() as session: - request = self._get_request_func(session=session) + request = self._get_request_func(session=session, method=method) config = await self.config() - yield AsyncHttpSession(hook=self, request=request, config=config) + yield AsyncHttpSession(hook=self, request=request, config=config, method=method) async def run( self, diff --git a/providers/http/tests/unit/http/hooks/test_http.py b/providers/http/tests/unit/http/hooks/test_http.py index 7bf3ed1f5b9..360c2383930 100644 --- a/providers/http/tests/unit/http/hooks/test_http.py +++ b/providers/http/tests/unit/http/hooks/test_http.py @@ -738,6 +738,22 @@ class TestHttpAsyncHook: async with aiohttp.ClientSession() as session: await hook.run(session=session, endpoint="non_existent_endpoint", data=json) + @pytest.mark.asyncio + async def test_async_get_request(self): + """Test api call asynchronously for POST request.""" + hook = HttpAsyncHook() + + with aioresponses() as m: + m.get( + "http://test:8080/v1/test", + status=200, + payload='{"status":{"status": 200}}', + reason="OK", + ) + async with hook.session(method="GET") as session: + resp = await session.run(endpoint="v1/test") + assert resp.status == 200 + @pytest.mark.asyncio async def test_async_post_request(self): """Test api call asynchronously for POST request."""
