This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 175da03afe9 🩹 fix: Use GoogleBaseAsyncHook (#55316)
175da03afe9 is described below

commit 175da03afe9bad967cf6d8c01bd246b6518f41ee
Author: Morgan <mdg...@proton.me>
AuthorDate: Sat Sep 6 16:51:58 2025 +1000

    🩹 fix: Use GoogleBaseAsyncHook (#55316)
    
    When run with `deferrable=True`, the `CloudRunExecuteJobOperator` fails
    with the error `You cannot use AsyncToSync in the same thread as an async
    event loop - just await the async function directly`.
    
    This is because the `__init__` method of `GoogleBaseHook` makes a
    blocking call to retrieve extra details for the connection.
    
    Making `CloudRunAsyncHook` inherit from the existing
    `GoogleBaseAsyncHook` instead prevents this issue.
    
    Co-authored-by: Morgan Kerle <morgan.ke...@nine.com.au>
---
 .../providers/google/cloud/hooks/cloud_run.py        | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py
index d555e8034d1..ef7452336e3 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py
@@ -42,7 +42,11 @@ from google.longrunning import operations_pb2
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.common.consts import CLIENT_INFO
-from airflow.providers.google.common.hooks.base_google import 
PROVIDE_PROJECT_ID, GoogleBaseHook
+from airflow.providers.google.common.hooks.base_google import (
+    PROVIDE_PROJECT_ID,
+    GoogleBaseAsyncHook,
+    GoogleBaseHook,
+)
 
 if TYPE_CHECKING:
     from google.api_core import operation
@@ -159,7 +163,7 @@ class CloudRunHook(GoogleBaseHook):
         return list(itertools.islice(jobs, limit))
 
 
-class CloudRunAsyncHook(GoogleBaseHook):
+class CloudRunAsyncHook(GoogleBaseAsyncHook):
     """
     Async hook for the Google Cloud Run service.
 
@@ -174,6 +178,8 @@ class CloudRunAsyncHook(GoogleBaseHook):
         account from the list granting this role to the originating account.
     """
 
+    sync_hook_class = GoogleBaseHook
+
     def __init__(
         self,
         gcp_conn_id: str = "google_cloud_default",
@@ -183,16 +189,16 @@ class CloudRunAsyncHook(GoogleBaseHook):
         self._client: JobsAsyncClient | None = None
         super().__init__(gcp_conn_id=gcp_conn_id, 
impersonation_chain=impersonation_chain, **kwargs)
 
-    def get_conn(self):
+    async def get_conn(self):
         if self._client is None:
-            self._client = JobsAsyncClient(credentials=self.get_credentials(), 
client_info=CLIENT_INFO)
+            sync_hook = await self.get_sync_hook()
+            self._client = 
JobsAsyncClient(credentials=sync_hook.get_credentials(), 
client_info=CLIENT_INFO)
 
         return self._client
 
     async def get_operation(self, operation_name: str) -> 
operations_pb2.Operation:
-        return await self.get_conn().get_operation(
-            operations_pb2.GetOperationRequest(name=operation_name), 
timeout=120
-        )
+        conn = await self.get_conn()
+        return await 
conn.get_operation(operations_pb2.GetOperationRequest(name=operation_name), 
timeout=120)
 
 
 class CloudRunServiceHook(GoogleBaseHook):

Reply via email to