This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new febf1fe70ee fix DataprocSubmitTrigger deferred tasks stuck forever 
(#62082)
febf1fe70ee is described below

commit febf1fe70ee2c6efbe793216c2512842646471a0
Author: olegkachur-e <[email protected]>
AuthorDate: Fri Feb 27 11:16:42 2026 +0100

    fix DataprocSubmitTrigger deferred tasks stuck forever (#62082)
    
    - Prevent tasks from getting stuck in the deferred state as a result
    of the thread making sync_hook calls getting stuck while retrieving
    credentials. Observed on secrets storage connection retrieval.
    
    Co-authored-by: Oleg Kachur <[email protected]>
---
 .../airflow/providers/google/cloud/triggers/dataproc.py |  8 +++++---
 .../tests/unit/google/cloud/triggers/test_dataproc.py   | 17 ++++++++++-------
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py 
b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
index ffe9b9aeaa6..73dd18c4c29 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
@@ -187,11 +187,13 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
         return task_state != TaskInstanceState.DEFERRED
 
     async def run(self):
+        hook = self.get_async_hook()
+        # Trigger client cache with sync call get_credentials(), evaluated 
once.
+        await hook.get_job_client(region=self.region)
+
         try:
             while True:
-                job = await self.get_async_hook().get_job(
-                    project_id=self.project_id, region=self.region, 
job_id=self.job_id
-                )
+                job = await hook.get_job(project_id=self.project_id, 
region=self.region, job_id=self.job_id)
                 state = job.status.state
                 self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
                 if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py 
b/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py
index 7a85acba118..aa66d2237ed 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py
@@ -587,10 +587,8 @@ class TestDataprocSubmitTrigger:
     async def test_submit_trigger_run_success(self, mock_get_async_hook, 
submit_trigger):
         """Test the trigger correctly handles a job completion."""
         mock_job = Job(status=JobStatus(state=JobStatus.State.DONE))
-        future = asyncio.Future()
-        future.set_result(mock_job)
-        mock_get_async_hook.return_value.get_job.return_value = future
-
+        mock_get_async_hook.return_value.get_job_client = mock.AsyncMock()
+        mock_get_async_hook.return_value.get_job = 
mock.AsyncMock(return_value=mock_job)
         async_gen = submit_trigger.run()
         event = await async_gen.asend(None)
         expected_event = TriggerEvent(
@@ -603,9 +601,12 @@ class TestDataprocSubmitTrigger:
     async def test_submit_trigger_run_error(self, mock_get_async_hook, 
submit_trigger):
         """Test the trigger correctly handles a job error."""
         mock_job = Job(status=JobStatus(state=JobStatus.State.ERROR))
-        future = asyncio.Future()
-        future.set_result(mock_job)
-        mock_get_async_hook.return_value.get_job.return_value = future
+        mock_get_async_hook.return_value.get_job_client = mock.AsyncMock()
+        mock_get_async_hook.return_value.get_job = 
mock.AsyncMock(return_value=mock_job)
+
+        # future = asyncio.Future()
+        # future.set_result(mock_job)
+        # mock_get_async_hook.return_value.get_job.return_value = future
 
         async_gen = submit_trigger.run()
         event = await async_gen.asend(None)
@@ -625,6 +626,8 @@ class TestDataprocSubmitTrigger:
         """Test the trigger correctly handles an asyncio.CancelledError."""
         mock_safe_to_cancel.return_value = is_safe_to_cancel
         mock_async_hook = mock_get_async_hook.return_value
+        mock_async_hook.get_job_client = mock.AsyncMock()
+
         mock_async_hook.get_job.side_effect = asyncio.CancelledError
 
         mock_sync_hook = mock_get_sync_hook.return_value

Reply via email to