dabla commented on code in PR #61546:
URL: https://github.com/apache/airflow/pull/61546#discussion_r2784159047
##########
providers/google/tests/unit/google/cloud/hooks/test_cloud_run.py:
##########
@@ -293,6 +308,56 @@ async def test_get_operation(self):
operations_pb2.GetOperationRequest(name=OPERATION_NAME),
timeout=120
)
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.google.cloud.hooks.cloud_run.JobsAsyncClient")
+ async def test_get_conn_uses_async_client_by_default(self, mock_async_client):
+ """Test that get_conn uses JobsAsyncClient (grpc_asyncio) when transport is None or grpc."""
+ hook = CloudRunAsyncHook(transport=None)
+ mock_sync_hook = mock.MagicMock()
+ mock_sync_hook.get_credentials.return_value = "credentials"
+ hook.get_sync_hook = mock.AsyncMock(return_value=mock_sync_hook)
+
+ await hook.get_conn()
+
+ mock_async_client.assert_called_once()
+ call_kwargs = mock_async_client.call_args[1]
+ assert "transport" not in call_kwargs
+
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.google.cloud.hooks.cloud_run.JobsClient")
+ async def test_get_conn_uses_sync_client_for_rest(self, mock_sync_client):
+ """Test that get_conn uses sync JobsClient with REST transport."""
+ hook = CloudRunAsyncHook(transport="rest")
+ mock_sync_hook = mock.MagicMock()
+ mock_sync_hook.get_credentials.return_value = "credentials"
+ hook.get_sync_hook = mock.AsyncMock(return_value=mock_sync_hook)
+
+ await hook.get_conn()
+
+ mock_sync_client.assert_called_once()
+ call_kwargs = mock_sync_client.call_args[1]
+ assert call_kwargs["transport"] == "rest"
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.to_thread")
+ async def test_get_operation_rest_uses_to_thread(self, mock_to_thread):
+ """Test that get_operation uses asyncio.to_thread for REST transport."""
+ expected_operation = operations_pb2.Operation(name=OPERATION_NAME)
+ mock_to_thread.return_value = expected_operation
+
+ hook = CloudRunAsyncHook(transport="rest")
+ mock_conn = mock.MagicMock() # sync client
+ hook.get_conn = mock.AsyncMock(return_value=mock_conn)
+
+ result = await hook.get_operation(operation_name=OPERATION_NAME)
+
+ mock_to_thread.assert_called_once_with(
+ mock_conn.get_operation,
+ operations_pb2.GetOperationRequest(name=OPERATION_NAME),
+ timeout=120,
+ )
+ assert result == expected_operation
+
def mock_get_operation(self, expected_operation):
Review Comment:
Please annotate `expected_operation` as `operations_pb2.Operation`, and also
add a spec to the `get_operation` mock:
`AsyncMock(spec=operations_pb2.Operation)`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]