This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c4a106e69b Create GKESuspendJobOperator and GKEResumeJobOperator
operators (#38677)
c4a106e69b is described below
commit c4a106e69bbc396d2527a3b8c94e2d95fced4284
Author: Maksim <[email protected]>
AuthorDate: Fri Apr 12 11:51:03 2024 +0200
Create GKESuspendJobOperator and GKEResumeJobOperator operators (#38677)
---
.../google/cloud/operators/kubernetes_engine.py | 211 +++++++++++++++++++-
.../operators/cloud/kubernetes_engine.rst | 30 +++
.../cloud/operators/test_kubernetes_engine.py | 219 +++++++++++++++++++++
.../example_kubernetes_engine_job.py | 26 +++
4 files changed, 485 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 8729335c5a..4b4261ff8d 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -29,7 +29,7 @@ import yaml
from deprecated import deprecated
from google.api_core.exceptions import AlreadyExists
from google.cloud.container_v1.types import Cluster
-from kubernetes.client import V1JobList
+from kubernetes.client import V1JobList, models as k8s
from kubernetes.utils.create_from_yaml import FailToCreateError
from packaging.version import parse as parse_version
@@ -47,6 +47,7 @@ from airflow.providers.google.cloud.hooks.kubernetes_engine
import (
GKEDeploymentHook,
GKEHook,
GKEJobHook,
+ GKEKubernetesHook,
GKEPodHook,
)
from airflow.providers.google.cloud.links.kubernetes_engine import (
@@ -1494,3 +1495,211 @@ class GKEDeleteJobOperator(KubernetesDeleteJobOperator):
).fetch_cluster_info()
return super().execute(context)
+
+
+class GKESuspendJobOperator(GoogleCloudBaseOperator):
+ """
+ Suspend Job by given name.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GKESuspendJobOperator`
+
+ :param name: The name of the Job to suspend
+ :param project_id: The Google Developers Console project id.
+ :param location: The name of the Google Kubernetes Engine zone or region
in which the cluster
+ resides.
+ :param cluster_name: The name of the Google Kubernetes Engine cluster.
+ :param namespace: The name of the Google Kubernetes Engine namespace.
+ :param use_internal_ip: Use the internal IP address as the endpoint.
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "project_id",
+ "gcp_conn_id",
+ "name",
+ "namespace",
+ "cluster_name",
+ "location",
+ "impersonation_chain",
+ )
+ operator_extra_links = (KubernetesEngineJobLink(),)
+
+ def __init__(
+ self,
+ *,
+ name: str,
+ location: str,
+ namespace: str,
+ cluster_name: str,
+ project_id: str | None = None,
+ use_internal_ip: bool = False,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+
+ self.project_id = project_id
+ self.gcp_conn_id = gcp_conn_id
+ self.location = location
+ self.name = name
+ self.namespace = namespace
+ self.cluster_name = cluster_name
+ self.use_internal_ip = use_internal_ip
+ self.impersonation_chain = impersonation_chain
+
+ self.job: V1Job | None = None
+ self._ssl_ca_cert: str
+ self._cluster_url: str
+
+ @cached_property
+ def cluster_hook(self) -> GKEHook:
+ return GKEHook(
+ gcp_conn_id=self.gcp_conn_id,
+ location=self.location,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ @cached_property
+ def hook(self) -> GKEKubernetesHook:
+ self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ use_internal_ip=self.use_internal_ip,
+ cluster_hook=self.cluster_hook,
+ ).fetch_cluster_info()
+
+ return GKEKubernetesHook(
+ gcp_conn_id=self.gcp_conn_id,
+ cluster_url=self._cluster_url,
+ ssl_ca_cert=self._ssl_ca_cert,
+ )
+
+ def execute(self, context: Context) -> None:
+ self.job = self.hook.patch_namespaced_job(
+ job_name=self.name,
+ namespace=self.namespace,
+ body={"spec": {"suspend": True}},
+ )
+ self.log.info(
+ "Job %s from cluster %s was suspended.",
+ self.name,
+ self.cluster_name,
+ )
+ KubernetesEngineJobLink.persist(context=context, task_instance=self)
+
+ return k8s.V1Job.to_dict(self.job)
+
+
+class GKEResumeJobOperator(GoogleCloudBaseOperator):
+ """
+ Resume Job by given name.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GKEResumeJobOperator`
+
+ :param name: The name of the Job to resume
+ :param project_id: The Google Developers Console project id.
+ :param location: The name of the Google Kubernetes Engine zone or region
in which the cluster
+ resides.
+ :param cluster_name: The name of the Google Kubernetes Engine cluster.
+ :param namespace: The name of the Google Kubernetes Engine namespace.
+ :param use_internal_ip: Use the internal IP address as the endpoint.
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "project_id",
+ "gcp_conn_id",
+ "name",
+ "namespace",
+ "cluster_name",
+ "location",
+ "impersonation_chain",
+ )
+ operator_extra_links = (KubernetesEngineJobLink(),)
+
+ def __init__(
+ self,
+ *,
+ name: str,
+ location: str,
+ namespace: str,
+ cluster_name: str,
+ project_id: str | None = None,
+ use_internal_ip: bool = False,
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+
+ self.project_id = project_id
+ self.gcp_conn_id = gcp_conn_id
+ self.location = location
+ self.name = name
+ self.namespace = namespace
+ self.cluster_name = cluster_name
+ self.use_internal_ip = use_internal_ip
+ self.impersonation_chain = impersonation_chain
+
+ self.job: V1Job | None = None
+ self._ssl_ca_cert: str
+ self._cluster_url: str
+
+ @cached_property
+ def cluster_hook(self) -> GKEHook:
+ return GKEHook(
+ gcp_conn_id=self.gcp_conn_id,
+ location=self.location,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ @cached_property
+ def hook(self) -> GKEKubernetesHook:
+ self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ use_internal_ip=self.use_internal_ip,
+ cluster_hook=self.cluster_hook,
+ ).fetch_cluster_info()
+
+ return GKEKubernetesHook(
+ gcp_conn_id=self.gcp_conn_id,
+ cluster_url=self._cluster_url,
+ ssl_ca_cert=self._ssl_ca_cert,
+ )
+
+ def execute(self, context: Context) -> None:
+ self.job = self.hook.patch_namespaced_job(
+ job_name=self.name,
+ namespace=self.namespace,
+ body={"spec": {"suspend": False}},
+ )
+ self.log.info(
+ "Job %s from cluster %s was resumed.",
+ self.name,
+ self.cluster_name,
+ )
+ KubernetesEngineJobLink.persist(context=context, task_instance=self)
+
+ return k8s.V1Job.to_dict(self.job)
diff --git
a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
index 7d5cc44826..d71ebf87e9 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst
@@ -312,6 +312,36 @@ delete resource in the specified Google Kubernetes Engine
cluster.
:start-after: [START howto_operator_gke_delete_resource]
:end-before: [END howto_operator_gke_delete_resource]
+
+.. _howto/operator:GKESuspendJobOperator:
+
+Suspend a Job on a GKE cluster
+""""""""""""""""""""""""""""""
+
+You can use
:class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKESuspendJobOperator`
to
+suspend Job in the specified Google Kubernetes Engine cluster.
+
+.. exampleinclude::
/../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gke_suspend_job]
+ :end-before: [END howto_operator_gke_suspend_job]
+
+
+.. _howto/operator:GKEResumeJobOperator:
+
+Resume a Job on a GKE cluster
+"""""""""""""""""""""""""""""
+
+You can use
:class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEResumeJobOperator`
to
+resume Job in the specified Google Kubernetes Engine cluster.
+
+.. exampleinclude::
/../../tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gke_resume_job]
+ :end-before: [END howto_operator_gke_resume_job]
+
Reference
^^^^^^^^^
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 442cdde68e..a6d0de56c4 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -42,10 +42,12 @@ from
airflow.providers.google.cloud.operators.kubernetes_engine import (
GKEDeleteCustomResourceOperator,
GKEDeleteJobOperator,
GKEDescribeJobOperator,
+ GKEResumeJobOperator,
GKEStartJobOperator,
GKEStartKueueInsideClusterOperator,
GKEStartKueueJobOperator,
GKEStartPodOperator,
+ GKESuspendJobOperator,
)
from airflow.providers.google.cloud.triggers.kubernetes_engine import
GKEStartPodTrigger
@@ -82,6 +84,7 @@ GKE_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEHook"
GKE_POD_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEPodHook"
GKE_DEPLOYMENT_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEDeploymentHook"
GKE_JOB_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEJobHook"
+GKE_K8S_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
KUB_OPERATOR_EXEC =
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.execute"
KUB_JOB_OPERATOR_EXEC =
"airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator.execute"
KUB_CREATE_RES_OPERATOR_EXEC = (
@@ -1319,3 +1322,219 @@ class TestGKEDeleteJobOperator:
hook = gke_op.hook
assert hook.gcp_conn_id == "test_conn"
+
+
+class TestGKESuspendJobOperator:
+ def setup_method(self):
+ self.gke_op = GKESuspendJobOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ )
+
+ def test_config_file_throws_error(self):
+ with pytest.raises(AirflowException):
+ GKESuspendJobOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ config_file="/path/to/alternative/kubeconfig",
+ )
+
+ @mock.patch.dict(os.environ, {})
+ @mock.patch(TEMP_FILE)
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ @mock.patch(GKE_HOOK_PATH)
+ @mock.patch(GKE_K8S_HOOK_PATH)
+ def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock,
file_mock):
+ mock_job_hook.return_value.get_job.return_value = mock.MagicMock()
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ self.gke_op.execute(context=mock.MagicMock())
+ fetch_cluster_info_mock.assert_called_once()
+
+ @mock.patch.dict(os.environ, {})
+ @mock.patch(
+ "airflow.hooks.base.BaseHook.get_connections",
+ return_value=[Connection(extra=json.dumps({"keyfile_dict":
'{"private_key": "r4nd0m_k3y"}'}))],
+ )
+ @mock.patch(TEMP_FILE)
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ @mock.patch(GKE_HOOK_PATH)
+ @mock.patch(GKE_K8S_HOOK_PATH)
+ def test_execute_with_impersonation_service_account(
+ self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock,
get_con_mock
+ ):
+ mock_job_hook.return_value.get_job.return_value = mock.MagicMock()
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ self.gke_op.impersonation_chain = "[email protected]"
+ self.gke_op.execute(context=mock.MagicMock())
+ fetch_cluster_info_mock.assert_called_once()
+
+ @mock.patch.dict(os.environ, {})
+ @mock.patch(
+ "airflow.hooks.base.BaseHook.get_connections",
+ return_value=[Connection(extra=json.dumps({"keyfile_dict":
'{"private_key": "r4nd0m_k3y"}'}))],
+ )
+ @mock.patch(TEMP_FILE)
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ @mock.patch(GKE_HOOK_PATH)
+ @mock.patch(GKE_K8S_HOOK_PATH)
+ def test_execute_with_impersonation_service_chain_one_element(
+ self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock,
get_con_mock
+ ):
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ self.gke_op.impersonation_chain = ["[email protected]"]
+ self.gke_op.execute(context=mock.MagicMock())
+
+ fetch_cluster_info_mock.assert_called_once()
+
+ @pytest.mark.db_test
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ def test_default_gcp_conn_id(self, fetch_cluster_info_mock):
+ gke_op = GKESuspendJobOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ )
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ hook = gke_op.hook
+
+ assert hook.gcp_conn_id == "google_cloud_default"
+
+ @mock.patch(
+
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.get_connection",
+ return_value=Connection(conn_id="test_conn"),
+ )
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ @mock.patch(GKE_HOOK_PATH)
+ def test_gcp_conn_id(self, mock_hook, fetch_cluster_info_mock,
mock_gke_conn):
+ gke_op = GKESuspendJobOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ gcp_conn_id="test_conn",
+ )
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ hook = gke_op.hook
+
+ assert hook.gcp_conn_id == "test_conn"
+
+
+class TestGKEResumeJobOperator:
+ def setup_method(self):
+ self.gke_op = GKEResumeJobOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ )
+
+ def test_config_file_throws_error(self):
+ with pytest.raises(AirflowException):
+ GKEResumeJobOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ config_file="/path/to/alternative/kubeconfig",
+ )
+
+ @mock.patch.dict(os.environ, {})
+ @mock.patch(TEMP_FILE)
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ @mock.patch(GKE_HOOK_PATH)
+ @mock.patch(GKE_K8S_HOOK_PATH)
+ def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock,
file_mock):
+ mock_job_hook.return_value.get_job.return_value = mock.MagicMock()
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ self.gke_op.execute(context=mock.MagicMock())
+ fetch_cluster_info_mock.assert_called_once()
+
+ @mock.patch.dict(os.environ, {})
+ @mock.patch(
+ "airflow.hooks.base.BaseHook.get_connections",
+ return_value=[Connection(extra=json.dumps({"keyfile_dict":
'{"private_key": "r4nd0m_k3y"}'}))],
+ )
+ @mock.patch(TEMP_FILE)
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ @mock.patch(GKE_HOOK_PATH)
+ @mock.patch(GKE_K8S_HOOK_PATH)
+ def test_execute_with_impersonation_service_account(
+ self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock,
get_con_mock
+ ):
+ mock_job_hook.return_value.get_job.return_value = mock.MagicMock()
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ self.gke_op.impersonation_chain = "[email protected]"
+ self.gke_op.execute(context=mock.MagicMock())
+ fetch_cluster_info_mock.assert_called_once()
+
+ @mock.patch.dict(os.environ, {})
+ @mock.patch(
+ "airflow.hooks.base.BaseHook.get_connections",
+ return_value=[Connection(extra=json.dumps({"keyfile_dict":
'{"private_key": "r4nd0m_k3y"}'}))],
+ )
+ @mock.patch(TEMP_FILE)
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ @mock.patch(GKE_HOOK_PATH)
+ @mock.patch(GKE_K8S_HOOK_PATH)
+ def test_execute_with_impersonation_service_chain_one_element(
+ self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock,
get_con_mock
+ ):
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ self.gke_op.impersonation_chain = ["[email protected]"]
+ self.gke_op.execute(context=mock.MagicMock())
+
+ fetch_cluster_info_mock.assert_called_once()
+
+ @pytest.mark.db_test
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ def test_default_gcp_conn_id(self, fetch_cluster_info_mock):
+ gke_op = GKEResumeJobOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ )
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ hook = gke_op.hook
+
+ assert hook.gcp_conn_id == "google_cloud_default"
+
+ @mock.patch(
+
"airflow.providers.google.common.hooks.base_google.GoogleBaseHook.get_connection",
+ return_value=Connection(conn_id="test_conn"),
+ )
+ @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
+ @mock.patch(GKE_HOOK_PATH)
+ def test_gcp_conn_id(self, mock_hook, fetch_cluster_info_mock,
mock_gke_conn):
+ gke_op = GKEResumeJobOperator(
+ project_id=TEST_GCP_PROJECT_ID,
+ location=PROJECT_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ task_id=PROJECT_TASK_ID,
+ name=TASK_NAME,
+ namespace=NAMESPACE,
+ gcp_conn_id="test_conn",
+ )
+ fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
+ hook = gke_op.hook
+
+ assert hook.gcp_conn_id == "test_conn"
diff --git
a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
index 97b5cd406b..2c7790c102 100644
---
a/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
+++
b/tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
@@ -32,7 +32,9 @@ from
airflow.providers.google.cloud.operators.kubernetes_engine import (
GKEDeleteJobOperator,
GKEDescribeJobOperator,
GKEListJobsOperator,
+ GKEResumeJobOperator,
GKEStartJobOperator,
+ GKESuspendJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule
@@ -116,6 +118,28 @@ with DAG(
cluster_name=CLUSTER_NAME,
)
+ # [START howto_operator_gke_suspend_job]
+ suspend_job = GKESuspendJobOperator(
+ task_id="suspend_job",
+ project_id=GCP_PROJECT_ID,
+ location=GCP_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ name=job_task.output["job_name"],
+ namespace="default",
+ )
+ # [END howto_operator_gke_suspend_job]
+
+ # [START howto_operator_gke_resume_job]
+ resume_job = GKEResumeJobOperator(
+ task_id="resume_job",
+ project_id=GCP_PROJECT_ID,
+ location=GCP_LOCATION,
+ cluster_name=CLUSTER_NAME,
+ name=job_task.output["job_name"],
+ namespace="default",
+ )
+ # [END howto_operator_gke_resume_job]
+
# [START howto_operator_gke_delete_job]
delete_job = GKEDeleteJobOperator(
task_id="delete_job",
@@ -149,6 +173,8 @@ with DAG(
[job_task, job_task_def],
list_job_task,
[describe_job_task, describe_job_task_def],
+ suspend_job,
+ resume_job,
[delete_job, delete_job_def],
delete_cluster,
)