This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a2c09d203e Fetch intermediate log async GKEStartPod (#39348)
a2c09d203e is described below
commit a2c09d203e3831f5e7dc1a28e3daf0d38a545023
Author: Pankaj Singh <[email protected]>
AuthorDate: Wed May 29 18:09:06 2024 +0530
Fetch intermediate log async GKEStartPod (#39348)
* Fetch intermediate logs in async GKEStartPodOperator
This PR enables the retrieval of intermediate logs for GKEStartPodOperator when
it runs in asynchronous (deferrable) mode, by passing the logging interval and
the last log time through to its trigger.
Add the `last_log_time` and `logging_interval` params to `GKEStartPodTrigger.serialize`
Add the optional `last_log_time` param to the `invoke_defer_method` method
Example DAG:
start_pod = GKEStartPodOperator(
    task_id="start_pod",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_name=GKE_CLUSTER_NAME,
    do_xcom_push=True,
    namespace=GKE_NAMESPACE,
    image="ubuntu:jammy",
    cmds=["sh", "-c", "timeout 300 bash -c 'while true; do echo \"meow\"; sleep 30; done'"],
    name="test-sleep",
    in_cluster=False,
    on_finish_action="delete_pod",
    deferrable=True,
    get_logs=True,
    logging_interval=5,
    gcp_conn_id=GCP_CONN_ID,
)
---
.../google/cloud/operators/kubernetes_engine.py | 15 +++---
.../google/cloud/triggers/kubernetes_engine.py | 2 +
.../cloud/operators/test_kubernetes_engine.py | 54 ++++++++++++++++++++++
.../cloud/triggers/test_kubernetes_engine.py | 2 +
4 files changed, 67 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 0a28809dd1..f9a75d8b28 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -73,6 +73,7 @@ except ImportError:
if TYPE_CHECKING:
from kubernetes.client.models import V1Job, V1Pod
+ from pendulum import DateTime
from airflow.utils.context import Context
@@ -773,16 +774,16 @@ class GKEStartPodOperator(KubernetesPodOperator):
self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate
return self._cluster_url, self._ssl_ca_cert
- def invoke_defer_method(self):
+ def invoke_defer_method(self, last_log_time: DateTime | None = None):
"""Redefine triggers which are being used in child classes."""
trigger_start_time = utcnow()
self.defer(
trigger=GKEStartPodTrigger(
- pod_name=self.pod.metadata.name,
- pod_namespace=self.pod.metadata.namespace,
+ pod_name=self.pod.metadata.name, # type: ignore[union-attr]
+ pod_namespace=self.pod.metadata.namespace, # type:
ignore[union-attr]
trigger_start_time=trigger_start_time,
- cluster_url=self._cluster_url,
- ssl_ca_cert=self._ssl_ca_cert,
+ cluster_url=self._cluster_url, # type: ignore[arg-type]
+ ssl_ca_cert=self._ssl_ca_cert, # type: ignore[arg-type]
get_logs=self.get_logs,
startup_timeout=self.startup_timeout_seconds,
cluster_context=self.cluster_context,
@@ -792,6 +793,8 @@ class GKEStartPodOperator(KubernetesPodOperator):
on_finish_action=self.on_finish_action,
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
+ logging_interval=self.logging_interval,
+ last_log_time=last_log_time,
),
method_name="execute_complete",
kwargs={"cluster_url": self._cluster_url, "ssl_ca_cert":
self._ssl_ca_cert},
@@ -802,7 +805,7 @@ class GKEStartPodOperator(KubernetesPodOperator):
self._cluster_url = kwargs["cluster_url"]
self._ssl_ca_cert = kwargs["ssl_ca_cert"]
- return super().execute_complete(context, event, **kwargs)
+ return super().trigger_reentry(context, event)
class GKEStartJobOperator(KubernetesJobOperator):
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index 8557bea082..f05bb0dc6c 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -142,6 +142,8 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
"on_finish_action": self.on_finish_action.value,
"gcp_conn_id": self.gcp_conn_id,
"impersonation_chain": self.impersonation_chain,
+ "logging_interval": self.logging_interval,
+ "last_log_time": self.last_log_time,
},
)
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index c71cc99c7e..2e27db59b3 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -673,6 +673,7 @@ class TestGKEPodOperatorAsync:
namespace=NAMESPACE,
image=IMAGE,
deferrable=True,
+ on_finish_action="delete_pod",
)
self.gke_op.pod = mock.MagicMock(
name=TASK_NAME,
@@ -703,6 +704,59 @@ class TestGKEPodOperatorAsync:
fetch_cluster_info_mock.assert_called_once()
assert isinstance(exc.value.trigger, GKEStartPodTrigger)
+ @pytest.mark.parametrize("status", ["error", "failed", "timeout"])
+
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
+ @mock.patch(KUB_OP_PATH.format("_clean"))
+
@mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook")
+ @mock.patch(KUB_OP_PATH.format("_write_logs"))
+ def test_execute_complete_failure(self, mock_write_logs, mock_gke_hook,
mock_clean, mock_get_pod, status):
+ self.gke_op._cluster_url = CLUSTER_URL
+ self.gke_op._ssl_ca_cert = SSL_CA_CERT
+ with pytest.raises(AirflowException):
+ self.gke_op.execute_complete(
+ context=mock.MagicMock(),
+ event={"name": "test", "status": status, "namespace":
"default", "message": ""},
+ cluster_url=self.gke_op._cluster_url,
+ ssl_ca_cert=self.gke_op._ssl_ca_cert,
+ )
+ mock_write_logs.assert_called_once()
+
+
@mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook")
+
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
+ @mock.patch(KUB_OP_PATH.format("_clean"))
+ @mock.patch(KUB_OP_PATH.format("_write_logs"))
+ def test_execute_complete_success(self, mock_write_logs, mock_clean,
mock_get_pod, mock_gke_hook):
+ self.gke_op._cluster_url = CLUSTER_URL
+ self.gke_op._ssl_ca_cert = SSL_CA_CERT
+ self.gke_op.execute_complete(
+ context=mock.MagicMock(),
+ event={"name": "test", "status": "success", "namespace":
"default"},
+ cluster_url=self.gke_op._cluster_url,
+ ssl_ca_cert=self.gke_op._ssl_ca_cert,
+ )
+ mock_write_logs.assert_called_once()
+
+ @mock.patch(KUB_OP_PATH.format("pod_manager"))
+ @mock.patch(
+
"airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.invoke_defer_method"
+ )
+
@mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod")
+ @mock.patch(KUB_OP_PATH.format("_clean"))
+
@mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook")
+ def test_execute_complete_running(
+ self, mock_gke_hook, mock_clean, mock_get_pod,
mock_invoke_defer_method, mock_pod_manager
+ ):
+ self.gke_op._cluster_url = CLUSTER_URL
+ self.gke_op._ssl_ca_cert = SSL_CA_CERT
+ self.gke_op.execute_complete(
+ context=mock.MagicMock(),
+ event={"name": "test", "status": "running", "namespace":
"default"},
+ cluster_url=self.gke_op._cluster_url,
+ ssl_ca_cert=self.gke_op._ssl_ca_cert,
+ )
+ mock_pod_manager.fetch_container_logs.assert_called_once()
+ mock_invoke_defer_method.assert_called_once()
+
class TestGKEStartJobOperator:
def setup_method(self):
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index 8a18dfcb90..8a43f3627c 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -125,6 +125,8 @@ class TestGKEStartPodTrigger:
"should_delete_pod": SHOULD_DELETE_POD,
"gcp_conn_id": GCP_CONN_ID,
"impersonation_chain": IMPERSONATION_CHAIN,
+ "last_log_time": None,
+ "logging_interval": None,
}
@pytest.mark.asyncio