This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a2c09d203e Fetch intermediate log async GKEStartPod   (#39348)
a2c09d203e is described below

commit a2c09d203e3831f5e7dc1a28e3daf0d38a545023
Author: Pankaj Singh <[email protected]>
AuthorDate: Wed May 29 18:09:06 2024 +0530

    Fetch intermediate log async GKEStartPod   (#39348)
    
    * Fetch intermediate log in async GKEStartPod
    
    This PR introduces a parameter that enables the retrieval of intermediate
    logs for the GKEStartPod asynchronous operator.
    
    Add params last_log_time and logging_interval to GKEStartPodTrigger.serialize()
    Add optional param last_log_time to method invoke_defer_method
    Example DAG:
    
    start_pod = GKEStartPodOperator(
            task_id="start_pod",
            project_id=PROJECT_ID,
            location=LOCATION,
            cluster_name=GKE_CLUSTER_NAME,
            do_xcom_push=True,
            namespace=GKE_NAMESPACE,
            image="ubuntu:jammy",
            cmds=["sh", "-c", "timeout 300 bash -c 'while true; do echo \"meow\"; sleep 30; done'"],
            name="test-sleep",
            in_cluster=False,
            on_finish_action="delete_pod",
            deferrable=True,
            get_logs=True,
            logging_interval=5,
            gcp_conn_id=GCP_CONN_ID
        )
---
 .../google/cloud/operators/kubernetes_engine.py    | 15 +++---
 .../google/cloud/triggers/kubernetes_engine.py     |  2 +
 .../cloud/operators/test_kubernetes_engine.py      | 54 ++++++++++++++++++++++
 .../cloud/triggers/test_kubernetes_engine.py       |  2 +
 4 files changed, 67 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 0a28809dd1..f9a75d8b28 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -73,6 +73,7 @@ except ImportError:
 
 if TYPE_CHECKING:
     from kubernetes.client.models import V1Job, V1Pod
+    from pendulum import DateTime
 
     from airflow.utils.context import Context
 
@@ -773,16 +774,16 @@ class GKEStartPodOperator(KubernetesPodOperator):
         self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate
         return self._cluster_url, self._ssl_ca_cert
 
-    def invoke_defer_method(self):
+    def invoke_defer_method(self, last_log_time: DateTime | None = None):
         """Redefine triggers which are being used in child classes."""
         trigger_start_time = utcnow()
         self.defer(
             trigger=GKEStartPodTrigger(
-                pod_name=self.pod.metadata.name,
-                pod_namespace=self.pod.metadata.namespace,
+                pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
+                pod_namespace=self.pod.metadata.namespace,  # type: ignore[union-attr]
                 trigger_start_time=trigger_start_time,
-                cluster_url=self._cluster_url,
-                ssl_ca_cert=self._ssl_ca_cert,
+                cluster_url=self._cluster_url,  # type: ignore[arg-type]
+                ssl_ca_cert=self._ssl_ca_cert,  # type: ignore[arg-type]
                 get_logs=self.get_logs,
                 startup_timeout=self.startup_timeout_seconds,
                 cluster_context=self.cluster_context,
@@ -792,6 +793,8 @@ class GKEStartPodOperator(KubernetesPodOperator):
                 on_finish_action=self.on_finish_action,
                 gcp_conn_id=self.gcp_conn_id,
                 impersonation_chain=self.impersonation_chain,
+                logging_interval=self.logging_interval,
+                last_log_time=last_log_time,
             ),
             method_name="execute_complete",
             kwargs={"cluster_url": self._cluster_url, "ssl_ca_cert": self._ssl_ca_cert},
@@ -802,7 +805,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
         self._cluster_url = kwargs["cluster_url"]
         self._ssl_ca_cert = kwargs["ssl_ca_cert"]
 
-        return super().execute_complete(context, event, **kwargs)
+        return super().trigger_reentry(context, event)
 
 
 class GKEStartJobOperator(KubernetesJobOperator):
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index 8557bea082..f05bb0dc6c 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -142,6 +142,8 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
                 "on_finish_action": self.on_finish_action.value,
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
+                "logging_interval": self.logging_interval,
+                "last_log_time": self.last_log_time,
             },
         )
 
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index c71cc99c7e..2e27db59b3 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -673,6 +673,7 @@ class TestGKEPodOperatorAsync:
             namespace=NAMESPACE,
             image=IMAGE,
             deferrable=True,
+            on_finish_action="delete_pod",
         )
         self.gke_op.pod = mock.MagicMock(
             name=TASK_NAME,
@@ -703,6 +704,59 @@ class TestGKEPodOperatorAsync:
         fetch_cluster_info_mock.assert_called_once()
         assert isinstance(exc.value.trigger, GKEStartPodTrigger)
 
+    @pytest.mark.parametrize("status", ["error", "failed", "timeout"])
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
+    @mock.patch(KUB_OP_PATH.format("_clean"))
+    @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook")
+    @mock.patch(KUB_OP_PATH.format("_write_logs"))
+    def test_execute_complete_failure(self, mock_write_logs, mock_gke_hook, mock_clean, mock_get_pod, status):
+        self.gke_op._cluster_url = CLUSTER_URL
+        self.gke_op._ssl_ca_cert = SSL_CA_CERT
+        with pytest.raises(AirflowException):
+            self.gke_op.execute_complete(
+                context=mock.MagicMock(),
+                event={"name": "test", "status": status, "namespace": "default", "message": ""},
+                cluster_url=self.gke_op._cluster_url,
+                ssl_ca_cert=self.gke_op._ssl_ca_cert,
+            )
+        mock_write_logs.assert_called_once()
+
+    @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook")
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
+    @mock.patch(KUB_OP_PATH.format("_clean"))
+    @mock.patch(KUB_OP_PATH.format("_write_logs"))
+    def test_execute_complete_success(self, mock_write_logs, mock_clean, mock_get_pod, mock_gke_hook):
+        self.gke_op._cluster_url = CLUSTER_URL
+        self.gke_op._ssl_ca_cert = SSL_CA_CERT
+        self.gke_op.execute_complete(
+            context=mock.MagicMock(),
+            event={"name": "test", "status": "success", "namespace": "default"},
+            cluster_url=self.gke_op._cluster_url,
+            ssl_ca_cert=self.gke_op._ssl_ca_cert,
+        )
+        mock_write_logs.assert_called_once()
+
+    @mock.patch(KUB_OP_PATH.format("pod_manager"))
+    @mock.patch(
+        "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.invoke_defer_method"
+    )
+    @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
+    @mock.patch(KUB_OP_PATH.format("_clean"))
+    @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook")
+    def test_execute_complete_running(
+        self, mock_gke_hook, mock_clean, mock_get_pod, mock_invoke_defer_method, mock_pod_manager
+    ):
+        self.gke_op._cluster_url = CLUSTER_URL
+        self.gke_op._ssl_ca_cert = SSL_CA_CERT
+        self.gke_op.execute_complete(
+            context=mock.MagicMock(),
+            event={"name": "test", "status": "running", "namespace": "default"},
+            cluster_url=self.gke_op._cluster_url,
+            ssl_ca_cert=self.gke_op._ssl_ca_cert,
+        )
+        mock_pod_manager.fetch_container_logs.assert_called_once()
+        mock_invoke_defer_method.assert_called_once()
+
 
 class TestGKEStartJobOperator:
     def setup_method(self):
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index 8a18dfcb90..8a43f3627c 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -125,6 +125,8 @@ class TestGKEStartPodTrigger:
             "should_delete_pod": SHOULD_DELETE_POD,
             "gcp_conn_id": GCP_CONN_ID,
             "impersonation_chain": IMPERSONATION_CHAIN,
+            "last_log_time": None,
+            "logging_interval": None,
         }
 
     @pytest.mark.asyncio

Reply via email to