This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2823acd125 Fix poll_interval in GKEJobTrigger (#41712)
2823acd125 is described below

commit 2823acd1257494cdd69588f421363873ba3b795d
Author: GPK <[email protected]>
AuthorDate: Sun Sep 1 19:56:12 2024 +0100

    Fix poll_interval in GKEJobTrigger (#41712)
---
 .../google/cloud/triggers/kubernetes_engine.py     | 24 +++++++++++++++++++---
 .../cloud/triggers/test_kubernetes_engine.py       |  8 ++++++--
 2 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index e4998d6957..483b72e60e 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -23,8 +23,9 @@ from functools import cached_property
 from typing import TYPE_CHECKING, Any, AsyncIterator, Sequence
 
 from google.cloud.container_v1.types import Operation
+from packaging.version import parse as parse_version
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
 from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction, PodManager
 from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
@@ -33,6 +34,7 @@ from airflow.providers.google.cloud.hooks.kubernetes_engine import (
     GKEKubernetesAsyncHook,
     GKEKubernetesHook,
 )
+from airflow.providers_manager import ProvidersManager
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 if TYPE_CHECKING:
@@ -305,19 +307,35 @@ class GKEJobTrigger(BaseTrigger):
         if self.get_logs or self.do_xcom_push:
             pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)
         if self.do_xcom_push:
+            kubernetes_provider = ProvidersManager().providers["apache-airflow-providers-cncf-kubernetes"]
+            kubernetes_provider_name = kubernetes_provider.data["package-name"]
+            kubernetes_provider_version = kubernetes_provider.version
+            min_version = "8.4.1"
+            if parse_version(kubernetes_provider_version) < parse_version(min_version):
+                raise AirflowException(
+                    "You are trying to use do_xcom_push in `GKEStartJobOperator` with the provider "
+                    f"package {kubernetes_provider_name}=={kubernetes_provider_version} which doesn't "
+                    f"support this feature. Please upgrade it to version higher than or equal to {min_version}."
+                )
             await self.hook.wait_until_container_complete(
-                name=self.pod_name, namespace=self.pod_namespace, container_name=self.base_container_name
+                name=self.pod_name,
+                namespace=self.pod_namespace,
+                container_name=self.base_container_name,
+                poll_interval=self.poll_interval,
             )
             self.log.info("Checking if xcom sidecar container is started.")
             await self.hook.wait_until_container_started(
                 name=self.pod_name,
                 namespace=self.pod_namespace,
                 container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+                poll_interval=self.poll_interval,
             )
             self.log.info("Extracting result from xcom sidecar container.")
             loop = asyncio.get_running_loop()
             xcom_result = await loop.run_in_executor(None, self.pod_manager.extract_xcom, pod)
-        job: V1Job = await self.hook.wait_until_job_complete(name=self.job_name, namespace=self.job_namespace)
+        job: V1Job = await self.hook.wait_until_job_complete(
+            name=self.job_name, namespace=self.job_namespace, poll_interval=self.poll_interval
+        )
         job_dict = job.to_dict()
         error_message = self.hook.is_job_failed(job=job)
         status = "error" if error_message else "success"
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index a41eab654b..844e2f9633 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -508,7 +508,9 @@ class TestGKEStartJobTrigger:
 
         event_actual = await job_trigger.run().asend(None)
 
-        mock_hook.wait_until_job_complete.assert_called_once_with(name=JOB_NAME, namespace=NAMESPACE)
+        mock_hook.wait_until_job_complete.assert_called_once_with(
+            name=JOB_NAME, namespace=NAMESPACE, poll_interval=POLL_INTERVAL
+        )
         mock_job.to_dict.assert_called_once()
         mock_is_job_failed.assert_called_once_with(job=mock_job)
         assert event_actual == TriggerEvent(
@@ -544,7 +546,9 @@ class TestGKEStartJobTrigger:
 
         event_actual = await job_trigger.run().asend(None)
 
-        mock_hook.wait_until_job_complete.assert_called_once_with(name=JOB_NAME, namespace=NAMESPACE)
+        mock_hook.wait_until_job_complete.assert_called_once_with(
+            name=JOB_NAME, namespace=NAMESPACE, poll_interval=POLL_INTERVAL
+        )
         mock_job.to_dict.assert_called_once()
         mock_is_job_failed.assert_called_once_with(job=mock_job)
         assert event_actual == TriggerEvent(

Reply via email to