This is an automated email from the ASF dual-hosted git repository. shahar pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 175da03afe9 🩹 fix: Use GoogleBaseAsyncHook (#55316) 175da03afe9 is described below commit 175da03afe9bad967cf6d8c01bd246b6518f41ee Author: Morgan <mdg...@proton.me> AuthorDate: Sat Sep 6 16:51:58 2025 +1000 🩹 fix: Use GoogleBaseAsyncHook (#55316) When run with `deferrable=True`, the `CloudRunExecuteJobOperator` fails with the error `You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly` This is because the `__init__` method of the GoogleBaseHook makes a blocking call to retrieve extra details for the connection. Inheriting from the existing GoogleBaseAsyncHook in the `CloudRunAsyncHook` prevents this issue. Co-authored-by: Morgan Kerle <morgan.ke...@nine.com.au> --- .../providers/google/cloud/hooks/cloud_run.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py index d555e8034d1..ef7452336e3 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py @@ -42,7 +42,11 @@ from google.longrunning import operations_pb2 from airflow.exceptions import AirflowException from airflow.providers.google.common.consts import CLIENT_INFO -from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID, GoogleBaseHook +from airflow.providers.google.common.hooks.base_google import ( + PROVIDE_PROJECT_ID, + GoogleBaseAsyncHook, + GoogleBaseHook, +) if TYPE_CHECKING: from google.api_core import operation @@ -159,7 +163,7 @@ class CloudRunHook(GoogleBaseHook): return list(itertools.islice(jobs, limit)) -class CloudRunAsyncHook(GoogleBaseHook): +class CloudRunAsyncHook(GoogleBaseAsyncHook): """ Async hook for the Google Cloud Run service. @@ -174,6 +178,8 @@ class CloudRunAsyncHook(GoogleBaseHook): account from the list granting this role to the originating account. """ + sync_hook_class = GoogleBaseHook + def __init__( self, gcp_conn_id: str = "google_cloud_default", @@ -183,16 +189,16 @@ class CloudRunAsyncHook(GoogleBaseHook): self._client: JobsAsyncClient | None = None super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain, **kwargs) - def get_conn(self): + async def get_conn(self): if self._client is None: - self._client = JobsAsyncClient(credentials=self.get_credentials(), client_info=CLIENT_INFO) + sync_hook = await self.get_sync_hook() + self._client = JobsAsyncClient(credentials=sync_hook.get_credentials(), client_info=CLIENT_INFO) return self._client async def get_operation(self, operation_name: str) -> operations_pb2.Operation: - return await self.get_conn().get_operation( - operations_pb2.GetOperationRequest(name=operation_name), timeout=120 - ) + conn = await self.get_conn() + return await conn.get_operation(operations_pb2.GetOperationRequest(name=operation_name), timeout=120) class CloudRunServiceHook(GoogleBaseHook):