This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6073ddb1a33 Fix broken GKEStartPodOperator extra link (#66657)
6073ddb1a33 is described below
commit 6073ddb1a33d4d86630bc76951188a2a9dadb4b7
Author: deepinsight coder <[email protected]>
AuthorDate: Sun May 10 15:15:08 2026 -0700
Fix broken GKEStartPodOperator extra link (#66657)
* Fix broken extra link in GKEStartPodOperator
* Fix GKE start operator extra links for pods and jobs
* fix(mypy): suppress attr-defined on mixin self.log call
GKEOperatorMixin is a bare mixin; `self.log` resolves at runtime via
the BaseOperator that the operator class actually inherits from, but
mypy can't see that. Apply the same `# type: ignore[attr-defined]`
pattern already used on every other self.<attr> access in this class.
---------
Co-authored-by: Jarek Potiuk <[email protected]>
---
.../google/cloud/operators/kubernetes_engine.py | 84 ++++-
.../cloud/operators/test_kubernetes_engine.py | 358 +++++++++++++++++++++
2 files changed, 430 insertions(+), 12 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 1538885e4e0..49c80445af9 100644
---
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -182,6 +182,20 @@ class GKEOperatorMixin:
def ssl_ca_cert(self) -> str:
return self.cluster_info[1]
+ def _get_resource_name_and_namespace(
+ self, *, resource: Any, resource_type: str
+ ) -> tuple[str, str] | None:
+ metadata = getattr(resource, "metadata", None)
+ resource_name = getattr(metadata, "name", None)
+ namespace = getattr(metadata, "namespace", None)
+ if not resource_name or not namespace:
+ self.log.debug( # type: ignore[attr-defined]
+ "Skipping Kubernetes %s extra link persistence because
metadata is incomplete.",
+ resource_type,
+ )
+ return None
+ return resource_name, namespace
+
class GKEDeleteClusterOperator(GKEOperatorMixin, GoogleCloudBaseOperator):
"""
@@ -757,6 +771,26 @@ class GKEStartPodOperator(GKEOperatorMixin,
KubernetesPodOperator):
if self.config_file:
raise AirflowException("config_file is not an allowed parameter
for the GKEStartPodOperator.")
+ def _persist_pod_link(self, *, context: Context, pod: k8s.V1Pod | None) ->
None:
+ metadata = self._get_resource_name_and_namespace(resource=pod,
resource_type="Pod")
+ if metadata is None:
+ return
+
+ pod_name, namespace = metadata
+ KubernetesEnginePodLink.persist(
+ context=context,
+ project_id=self.project_id,
+ location=self.location,
+ cluster_name=self.cluster_name,
+ namespace=namespace,
+ pod_name=pod_name,
+ )
+
+ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context)
-> k8s.V1Pod:
+ pod = super().get_or_create_pod(pod_request_obj=pod_request_obj,
context=context)
+ self._persist_pod_link(context=context, pod=pod)
+ return pod
+
def invoke_defer_method(
self, last_log_time: DateTime | None = None, context: Context | None =
None
) -> None:
@@ -862,26 +896,52 @@ class GKEStartJobOperator(GKEOperatorMixin,
KubernetesJobOperator):
self.use_internal_ip = use_internal_ip
self.use_dns_endpoint = use_dns_endpoint
self.impersonation_chain = impersonation_chain
+ self._job_link_context: Context | None = None
# There is no need to manage the kube_config file, as it will be
generated automatically.
# All Kubernetes parameters (except config_file) are also valid for
the GKEStartJobOperator.
if self.config_file:
raise AirflowException("config_file is not an allowed parameter
for the GKEStartJobOperator.")
+ def _persist_job_link(self, *, context: Context, job: k8s.V1Job | None) ->
None:
+ metadata = self._get_resource_name_and_namespace(resource=job,
resource_type="Job")
+ if metadata is None:
+ return
+
+ job_name, namespace = metadata
+ KubernetesEngineJobLink.persist(
+ context=context,
+ project_id=self.project_id,
+ location=self.location,
+ cluster_name=self.cluster_name,
+ namespace=namespace,
+ job_name=job_name,
+ )
+
+ def create_job(self, job_request_obj: k8s.V1Job) -> k8s.V1Job:
+ job = super().create_job(job_request_obj=job_request_obj)
+ if self._job_link_context is not None:
+ self._persist_job_link(context=self._job_link_context, job=job)
+ return job
+
def execute(self, context: Context):
"""Execute process of creating Job."""
- if self.deferrable:
- kubernetes_provider =
ProvidersManager().providers["apache-airflow-providers-cncf-kubernetes"]
- kubernetes_provider_name = kubernetes_provider.data["package-name"]
- kubernetes_provider_version = kubernetes_provider.version
- min_version = "8.0.1"
- if parse_version(kubernetes_provider_version) <=
parse_version(min_version):
- raise AirflowException(
- "You are trying to use `GKEStartJobOperator` in deferrable
mode with the provider "
- f"package
{kubernetes_provider_name}=={kubernetes_provider_version} which doesn't "
- f"support this feature. Please upgrade it to version
higher than {min_version}."
- )
- return super().execute(context)
+ self._job_link_context = context
+ try:
+ if self.deferrable:
+ kubernetes_provider =
ProvidersManager().providers["apache-airflow-providers-cncf-kubernetes"]
+ kubernetes_provider_name =
kubernetes_provider.data["package-name"]
+ kubernetes_provider_version = kubernetes_provider.version
+ min_version = "8.0.1"
+ if parse_version(kubernetes_provider_version) <=
parse_version(min_version):
+ raise AirflowException(
+ "You are trying to use `GKEStartJobOperator` in
deferrable mode with the provider "
+ f"package
{kubernetes_provider_name}=={kubernetes_provider_version} which doesn't "
+ f"support this feature. Please upgrade it to version
higher than {min_version}."
+ )
+ return super().execute(context)
+ finally:
+ self._job_link_context = None
def execute_deferrable(self):
self.defer(
diff --git
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
index 6453e13376e..80840ba619f 100644
---
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
+++
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
@@ -24,6 +24,7 @@ from unittest.mock import PropertyMock, call
import pytest
from google.api_core.exceptions import AlreadyExists, FailedPrecondition,
PermissionDenied
from google.cloud.container_v1.types import Cluster, NodePool
+from kubernetes.client import models as k8s
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.cncf.kubernetes.operators.job import (
@@ -41,6 +42,13 @@ from airflow.providers.cncf.kubernetes.operators.resource
import (
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.google.cloud.links.base import BASE_LINK
+from airflow.providers.google.cloud.links.kubernetes_engine import (
+ KUBERNETES_JOB_LINK,
+ KUBERNETES_POD_LINK,
+ KubernetesEngineJobLink,
+ KubernetesEnginePodLink,
+)
from airflow.providers.google.cloud.operators.kubernetes_engine import (
GKEClusterAuthDetails,
GKECreateClusterOperator,
@@ -59,6 +67,11 @@ from
airflow.providers.google.cloud.operators.kubernetes_engine import (
GKESuspendJobOperator,
)
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk.execution_time.comms import XComResult
+
pytestmark =
pytest.mark.filterwarnings("error::airflow.exceptions.AirflowProviderDeprecationWarning")
TEST_PROJECT_ID = "test-id"
@@ -100,6 +113,28 @@ K8S_NAMESPACE = "default"
GKE_OPERATORS_PATH =
"airflow.providers.google.cloud.operators.kubernetes_engine.{}"
+def make_pod(
+ *,
+ name: str | None = K8S_POD_NAME,
+ namespace: str | None = K8S_NAMESPACE,
+ metadata: k8s.V1ObjectMeta | None = None,
+) -> k8s.V1Pod:
+ if metadata is None:
+ metadata = k8s.V1ObjectMeta(name=name, namespace=namespace)
+ return k8s.V1Pod(metadata=metadata)
+
+
+def make_job(
+ *,
+ name: str | None = K8S_JOB_NAME,
+ namespace: str | None = K8S_NAMESPACE,
+ metadata: k8s.V1ObjectMeta | None = None,
+) -> k8s.V1Job:
+ if metadata is None:
+ metadata = k8s.V1ObjectMeta(name=name, namespace=namespace)
+ return k8s.V1Job(metadata=metadata)
+
+
class TestGKEClusterAuthDetails:
@pytest.mark.parametrize(
(
@@ -865,6 +900,129 @@ class TestGKEStartPodOperator:
for expected_attr in expected_attributes:
assert op.__getattribute__(expected_attr) ==
expected_attributes[expected_attr]
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+ def test_get_or_create_pod_persists_link(self, mock_get_or_create_pod,
mock_link):
+ pod = make_pod()
+ mock_get_or_create_pod.return_value = pod
+ context = {"ti": mock.Mock()}
+
+ result = self.operator.get_or_create_pod(pod_request_obj=make_pod(),
context=context)
+
+ assert result == pod
+ mock_link.persist.assert_called_once_with(
+ context=context,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ cluster_name=GKE_CLUSTER_NAME,
+ namespace=K8S_NAMESPACE,
+ pod_name=K8S_POD_NAME,
+ )
+
+ @pytest.mark.parametrize(
+ "pod",
+ [
+ k8s.V1Pod(),
+ make_pod(metadata=k8s.V1ObjectMeta(name=None,
namespace=K8S_NAMESPACE)),
+ make_pod(metadata=k8s.V1ObjectMeta(name=K8S_POD_NAME,
namespace=None)),
+ ],
+ )
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+ def test_get_or_create_pod_skips_link_when_metadata_is_incomplete(
+ self, mock_get_or_create_pod, mock_link, pod
+ ):
+ mock_get_or_create_pod.return_value = pod
+
+ result = self.operator.get_or_create_pod(pod_request_obj=make_pod(),
context={"ti": mock.Mock()})
+
+ assert result == pod
+ mock_link.persist.assert_not_called()
+
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.post_complete_action"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.is_istio_enabled"),
return_value=False)
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.await_pod_completion"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.await_pod_start"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.await_init_containers_completion"))
+ @mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.find_pod"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.build_pod_request_obj"))
+ def test_execute_sync_persists_link_when_follow_up_step_fails(
+ self,
+ mock_build_pod_request_obj,
+ mock_get_or_create_pod,
+ mock_link,
+ mock_find_pod,
+ mock_await_init_containers_completion,
+ mock_await_pod_start,
+ mock_await_pod_completion,
+ mock_is_istio_enabled,
+ mock_post_complete_action,
+ ):
+ pod = make_pod()
+ context = {"ti": mock.Mock()}
+ self.operator.pod_manager = mock.Mock()
+ self.operator.pod_manager.await_pod_completion.return_value = pod
+ mock_build_pod_request_obj.return_value = make_pod()
+ mock_get_or_create_pod.return_value = pod
+ mock_find_pod.return_value = pod
+ mock_await_init_containers_completion.side_effect =
AirflowException("pod startup failed")
+
+ with pytest.raises(AirflowException, match="pod startup failed"):
+ self.operator.execute_sync(context=context)
+
+ mock_link.persist.assert_called_once_with(
+ context=context,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ cluster_name=GKE_CLUSTER_NAME,
+ namespace=K8S_NAMESPACE,
+ pod_name=K8S_POD_NAME,
+ )
+ mock_post_complete_action.assert_called_once()
+
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.post_complete_action"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.build_pod_request_obj"))
+ def test_execute_sync_does_not_persist_link_when_pod_creation_fails(
+ self, mock_build_pod_request_obj, mock_get_or_create_pod, mock_link,
mock_post_complete_action
+ ):
+ context = {"ti": mock.Mock()}
+ mock_build_pod_request_obj.return_value = make_pod()
+ mock_get_or_create_pod.side_effect = AirflowException("pod creation
failed")
+
+ with pytest.raises(AirflowException, match="pod creation failed"):
+ self.operator.execute_sync(context=context)
+
+ mock_link.persist.assert_not_called()
+ mock_post_complete_action.assert_called_once()
+
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.build_pod_request_obj"))
+ def test_execute_async_persists_link_before_deferral(
+ self, mock_build_pod_request_obj, mock_get_or_create_pod, mock_link
+ ):
+ order: list[str] = []
+ pod = make_pod()
+ context = {"ti": mock.Mock()}
+ mock_build_pod_request_obj.return_value = make_pod()
+ mock_get_or_create_pod.return_value = pod
+ mock_link.persist.side_effect = lambda **_: order.append("persist")
+
+ with mock.patch.object(
+ GKEStartPodOperator,
+ "invoke_defer_method",
+ autospec=True,
+ side_effect=lambda _operator, last_log_time=None, context=None:
order.append("defer"),
+ ) as mock_invoke_defer_method:
+ self.operator.execute_async(context=context)
+
+ assert order == ["persist", "defer"]
+ mock_invoke_defer_method.assert_called_once_with(self.operator,
context=context)
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.defer"))
@mock.patch(GKE_OPERATORS_PATH.format("GKEClusterAuthDetails.fetch_cluster_info"))
@mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
@@ -998,6 +1156,132 @@ class TestGKEStartJobOperator:
mock_super.assert_not_called()
mock_super.return_value.execute.assert_not_called()
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.build_job_request_obj"))
+ def test_execute_persists_link(self, mock_build_job_request_obj,
mock_create_job, mock_link):
+ context = {"ti": mock.Mock()}
+ mock_build_job_request_obj.return_value = mock.Mock()
+ mock_create_job.return_value = make_job()
+
+ self.operator.execute(context=context)
+
+ mock_link.persist.assert_called_once_with(
+ context=context,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ cluster_name=GKE_CLUSTER_NAME,
+ namespace=K8S_NAMESPACE,
+ job_name=K8S_JOB_NAME,
+ )
+
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.build_job_request_obj"))
+ def test_execute_persists_link_when_follow_up_step_fails(
+ self,
+ mock_build_job_request_obj,
+ mock_create_job,
+ mock_get_pods,
+ mock_link,
+ ):
+ context = {"ti": mock.Mock()}
+ self.operator.wait_until_job_complete = True
+ mock_build_job_request_obj.return_value = mock.Mock()
+ mock_create_job.return_value = make_job()
+ mock_get_pods.side_effect = AirflowException("pod discovery failed")
+
+ with pytest.raises(AirflowException, match="pod discovery failed"):
+ self.operator.execute(context=context)
+
+ mock_link.persist.assert_called_once_with(
+ context=context,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ cluster_name=GKE_CLUSTER_NAME,
+ namespace=K8S_NAMESPACE,
+ job_name=K8S_JOB_NAME,
+ )
+
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.build_job_request_obj"))
+ def test_execute_does_not_persist_link_when_job_creation_fails(
+ self,
+ mock_build_job_request_obj,
+ mock_create_job,
+ mock_link,
+ ):
+ context = {"ti": mock.Mock()}
+ mock_build_job_request_obj.return_value = mock.Mock()
+ mock_create_job.side_effect = AirflowException("job creation failed")
+
+ with pytest.raises(AirflowException, match="job creation failed"):
+ self.operator.execute(context=context)
+
+ mock_link.persist.assert_not_called()
+
+ @pytest.mark.parametrize(
+ "job",
+ [
+ k8s.V1Job(),
+ make_job(metadata=k8s.V1ObjectMeta(name=None,
namespace=K8S_NAMESPACE)),
+ make_job(metadata=k8s.V1ObjectMeta(name=K8S_JOB_NAME,
namespace=None)),
+ ],
+ )
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+ def test_create_job_skips_link_when_metadata_is_incomplete(self,
mock_create_job, mock_link, job):
+ context = {"ti": mock.Mock()}
+ mock_create_job.return_value = job
+ self.operator._job_link_context = context
+
+ result = self.operator.create_job(job_request_obj=mock.Mock())
+
+ assert result == job
+ mock_link.persist.assert_not_called()
+
+ @mock.patch(GKE_OPERATORS_PATH.format("ProvidersManager"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.build_job_request_obj"))
+ def test_execute_deferrable_persists_link_before_deferral(
+ self,
+ mock_build_job_request_obj,
+ mock_create_job,
+ mock_get_pods,
+ mock_link,
+ mock_providers_manager,
+ ):
+ order: list[str] = []
+ context = {"ti": mock.Mock()}
+ self.operator.wait_until_job_complete = True
+ self.operator.deferrable = True
+ self.operator.pod_request_obj = make_pod()
+ mock_build_job_request_obj.return_value = mock.Mock()
+ mock_create_job.return_value = make_job()
+ mock_get_pods.return_value = [make_pod()]
+ mock_link.persist.side_effect = lambda **_: order.append("persist")
+ kubernetes_package_name = "apache-airflow-providers-cncf-kubernetes"
+ mock_providers_manager.return_value.providers = {
+ kubernetes_package_name: mock.MagicMock(
+ data={"package-name": kubernetes_package_name},
+ version="8.0.2",
+ )
+ }
+
+ with mock.patch.object(
+ self.operator,
+ "execute_deferrable",
+ side_effect=lambda: order.append("defer"),
+ ) as mock_execute_deferrable:
+ self.operator.execute(context=context)
+
+ assert order == ["persist", "defer"]
+ mock_execute_deferrable.assert_called_once_with()
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.defer"))
@mock.patch(GKE_OPERATORS_PATH.format("GKEClusterAuthDetails.fetch_cluster_info"))
@mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
@@ -1105,6 +1389,80 @@ class TestGKEStartJobOperator:
)
+class TestGKEStartOperatorExtraLinks:
+ @pytest.mark.db_test
+ def test_start_pod_operator_get_link(
+ self, dag_maker, create_task_instance_of_operator, session,
mock_supervisor_comms
+ ):
+ ti = create_task_instance_of_operator(
+ GKEStartPodOperator,
+ dag_id="test_gke_start_pod_link_dag",
+ task_id=TEST_TASK_ID,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ cluster_name=GKE_CLUSTER_NAME,
+ name=K8S_POD_NAME,
+ namespace=K8S_NAMESPACE,
+ image=TEST_IMAGE,
+ )
+ task = dag_maker.dag.get_task(ti.task_id)
+ link_payload = {
+ "project_id": TEST_PROJECT_ID,
+ "location": TEST_LOCATION,
+ "cluster_name": GKE_CLUSTER_NAME,
+ "namespace": K8S_NAMESPACE,
+ "pod_name": K8S_POD_NAME,
+ }
+
+ if AIRFLOW_V_3_0_PLUS and mock_supervisor_comms:
+ mock_supervisor_comms.send.return_value = XComResult(
+ key=KubernetesEnginePodLink.key,
+ value=link_payload,
+ )
+
+ ti.xcom_push(key=KubernetesEnginePodLink.key, value=link_payload)
+
+ assert task.operator_extra_links[0].get_link(operator=task,
ti_key=ti.key) == BASE_LINK + (
+ KUBERNETES_POD_LINK.format(**link_payload)
+ )
+
+ @pytest.mark.db_test
+ def test_start_job_operator_get_link(
+ self, dag_maker, create_task_instance_of_operator, session,
mock_supervisor_comms
+ ):
+ ti = create_task_instance_of_operator(
+ GKEStartJobOperator,
+ dag_id="test_gke_start_job_link_dag",
+ task_id=TEST_TASK_ID,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ cluster_name=GKE_CLUSTER_NAME,
+ name=K8S_JOB_NAME,
+ namespace=K8S_NAMESPACE,
+ image=TEST_IMAGE,
+ )
+ task = dag_maker.dag.get_task(ti.task_id)
+ link_payload = {
+ "project_id": TEST_PROJECT_ID,
+ "location": TEST_LOCATION,
+ "cluster_name": GKE_CLUSTER_NAME,
+ "namespace": K8S_NAMESPACE,
+ "job_name": K8S_JOB_NAME,
+ }
+
+ if AIRFLOW_V_3_0_PLUS and mock_supervisor_comms:
+ mock_supervisor_comms.send.return_value = XComResult(
+ key=KubernetesEngineJobLink.key,
+ value=link_payload,
+ )
+
+ ti.xcom_push(key=KubernetesEngineJobLink.key, value=link_payload)
+
+ assert task.operator_extra_links[0].get_link(operator=task,
ti_key=ti.key) == BASE_LINK + (
+ KUBERNETES_JOB_LINK.format(**link_payload)
+ )
+
+
class TestGKEDescribeJobOperator:
def setup_method(self):
self.operator = GKEDescribeJobOperator(