This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6073ddb1a33 Fix broken GKEStartPodOperator extra link (#66657)
6073ddb1a33 is described below

commit 6073ddb1a33d4d86630bc76951188a2a9dadb4b7
Author: deepinsight coder <[email protected]>
AuthorDate: Sun May 10 15:15:08 2026 -0700

    Fix broken GKEStartPodOperator extra link (#66657)
    
    * Fix broken extra link in GKEStartPodOperator
    
    * Fix GKE start operator extra links for pods and jobs
    
    * fix(mypy): suppress attr-defined on mixin self.log call
    
    GKEOperatorMixin is a bare mixin; `self.log` resolves at runtime via
    the BaseOperator that the operator class actually inherits from, but
    mypy can't see that. Apply the same `# type: ignore[attr-defined]`
    pattern already used on every other self.<attr> access in this class.
    
    ---------
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .../google/cloud/operators/kubernetes_engine.py    |  84 ++++-
 .../cloud/operators/test_kubernetes_engine.py      | 358 +++++++++++++++++++++
 2 files changed, 430 insertions(+), 12 deletions(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 1538885e4e0..49c80445af9 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -182,6 +182,20 @@ class GKEOperatorMixin:
     def ssl_ca_cert(self) -> str:
         return self.cluster_info[1]
 
+    def _get_resource_name_and_namespace(
+        self, *, resource: Any, resource_type: str
+    ) -> tuple[str, str] | None:
+        metadata = getattr(resource, "metadata", None)
+        resource_name = getattr(metadata, "name", None)
+        namespace = getattr(metadata, "namespace", None)
+        if not resource_name or not namespace:
+            self.log.debug(  # type: ignore[attr-defined]
+                "Skipping Kubernetes %s extra link persistence because 
metadata is incomplete.",
+                resource_type,
+            )
+            return None
+        return resource_name, namespace
+
 
 class GKEDeleteClusterOperator(GKEOperatorMixin, GoogleCloudBaseOperator):
     """
@@ -757,6 +771,26 @@ class GKEStartPodOperator(GKEOperatorMixin, 
KubernetesPodOperator):
         if self.config_file:
             raise AirflowException("config_file is not an allowed parameter 
for the GKEStartPodOperator.")
 
+    def _persist_pod_link(self, *, context: Context, pod: k8s.V1Pod | None) -> 
None:
+        metadata = self._get_resource_name_and_namespace(resource=pod, 
resource_type="Pod")
+        if metadata is None:
+            return
+
+        pod_name, namespace = metadata
+        KubernetesEnginePodLink.persist(
+            context=context,
+            project_id=self.project_id,
+            location=self.location,
+            cluster_name=self.cluster_name,
+            namespace=namespace,
+            pod_name=pod_name,
+        )
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) 
-> k8s.V1Pod:
+        pod = super().get_or_create_pod(pod_request_obj=pod_request_obj, 
context=context)
+        self._persist_pod_link(context=context, pod=pod)
+        return pod
+
     def invoke_defer_method(
         self, last_log_time: DateTime | None = None, context: Context | None = 
None
     ) -> None:
@@ -862,26 +896,52 @@ class GKEStartJobOperator(GKEOperatorMixin, 
KubernetesJobOperator):
         self.use_internal_ip = use_internal_ip
         self.use_dns_endpoint = use_dns_endpoint
         self.impersonation_chain = impersonation_chain
+        self._job_link_context: Context | None = None
 
         # There is no need to manage the kube_config file, as it will be 
generated automatically.
         # All Kubernetes parameters (except config_file) are also valid for 
the GKEStartJobOperator.
         if self.config_file:
             raise AirflowException("config_file is not an allowed parameter 
for the GKEStartJobOperator.")
 
+    def _persist_job_link(self, *, context: Context, job: k8s.V1Job | None) -> 
None:
+        metadata = self._get_resource_name_and_namespace(resource=job, 
resource_type="Job")
+        if metadata is None:
+            return
+
+        job_name, namespace = metadata
+        KubernetesEngineJobLink.persist(
+            context=context,
+            project_id=self.project_id,
+            location=self.location,
+            cluster_name=self.cluster_name,
+            namespace=namespace,
+            job_name=job_name,
+        )
+
+    def create_job(self, job_request_obj: k8s.V1Job) -> k8s.V1Job:
+        job = super().create_job(job_request_obj=job_request_obj)
+        if self._job_link_context is not None:
+            self._persist_job_link(context=self._job_link_context, job=job)
+        return job
+
     def execute(self, context: Context):
         """Execute process of creating Job."""
-        if self.deferrable:
-            kubernetes_provider = 
ProvidersManager().providers["apache-airflow-providers-cncf-kubernetes"]
-            kubernetes_provider_name = kubernetes_provider.data["package-name"]
-            kubernetes_provider_version = kubernetes_provider.version
-            min_version = "8.0.1"
-            if parse_version(kubernetes_provider_version) <= 
parse_version(min_version):
-                raise AirflowException(
-                    "You are trying to use `GKEStartJobOperator` in deferrable 
mode with the provider "
-                    f"package 
{kubernetes_provider_name}=={kubernetes_provider_version} which doesn't "
-                    f"support this feature. Please upgrade it to version 
higher than {min_version}."
-                )
-        return super().execute(context)
+        self._job_link_context = context
+        try:
+            if self.deferrable:
+                kubernetes_provider = 
ProvidersManager().providers["apache-airflow-providers-cncf-kubernetes"]
+                kubernetes_provider_name = 
kubernetes_provider.data["package-name"]
+                kubernetes_provider_version = kubernetes_provider.version
+                min_version = "8.0.1"
+                if parse_version(kubernetes_provider_version) <= 
parse_version(min_version):
+                    raise AirflowException(
+                        "You are trying to use `GKEStartJobOperator` in 
deferrable mode with the provider "
+                        f"package 
{kubernetes_provider_name}=={kubernetes_provider_version} which doesn't "
+                        f"support this feature. Please upgrade it to version 
higher than {min_version}."
+                    )
+            return super().execute(context)
+        finally:
+            self._job_link_context = None
 
     def execute_deferrable(self):
         self.defer(
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py 
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
index 6453e13376e..80840ba619f 100644
--- 
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
+++ 
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
@@ -24,6 +24,7 @@ from unittest.mock import PropertyMock, call
 import pytest
 from google.api_core.exceptions import AlreadyExists, FailedPrecondition, 
PermissionDenied
 from google.cloud.container_v1.types import Cluster, NodePool
+from kubernetes.client import models as k8s
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.cncf.kubernetes.operators.job import (
@@ -41,6 +42,13 @@ from airflow.providers.cncf.kubernetes.operators.resource 
import (
 )
 from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
 from airflow.providers.common.compat.sdk import AirflowException
+from airflow.providers.google.cloud.links.base import BASE_LINK
+from airflow.providers.google.cloud.links.kubernetes_engine import (
+    KUBERNETES_JOB_LINK,
+    KUBERNETES_POD_LINK,
+    KubernetesEngineJobLink,
+    KubernetesEnginePodLink,
+)
 from airflow.providers.google.cloud.operators.kubernetes_engine import (
     GKEClusterAuthDetails,
     GKECreateClusterOperator,
@@ -59,6 +67,11 @@ from 
airflow.providers.google.cloud.operators.kubernetes_engine import (
     GKESuspendJobOperator,
 )
 
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk.execution_time.comms import XComResult
+
 pytestmark = 
pytest.mark.filterwarnings("error::airflow.exceptions.AirflowProviderDeprecationWarning")
 
 TEST_PROJECT_ID = "test-id"
@@ -100,6 +113,28 @@ K8S_NAMESPACE = "default"
 GKE_OPERATORS_PATH = 
"airflow.providers.google.cloud.operators.kubernetes_engine.{}"
 
 
+def make_pod(
+    *,
+    name: str | None = K8S_POD_NAME,
+    namespace: str | None = K8S_NAMESPACE,
+    metadata: k8s.V1ObjectMeta | None = None,
+) -> k8s.V1Pod:
+    if metadata is None:
+        metadata = k8s.V1ObjectMeta(name=name, namespace=namespace)
+    return k8s.V1Pod(metadata=metadata)
+
+
+def make_job(
+    *,
+    name: str | None = K8S_JOB_NAME,
+    namespace: str | None = K8S_NAMESPACE,
+    metadata: k8s.V1ObjectMeta | None = None,
+) -> k8s.V1Job:
+    if metadata is None:
+        metadata = k8s.V1ObjectMeta(name=name, namespace=namespace)
+    return k8s.V1Job(metadata=metadata)
+
+
 class TestGKEClusterAuthDetails:
     @pytest.mark.parametrize(
         (
@@ -865,6 +900,129 @@ class TestGKEStartPodOperator:
             for expected_attr in expected_attributes:
                 assert op.__getattribute__(expected_attr) == 
expected_attributes[expected_attr]
 
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+    def test_get_or_create_pod_persists_link(self, mock_get_or_create_pod, 
mock_link):
+        pod = make_pod()
+        mock_get_or_create_pod.return_value = pod
+        context = {"ti": mock.Mock()}
+
+        result = self.operator.get_or_create_pod(pod_request_obj=make_pod(), 
context=context)
+
+        assert result == pod
+        mock_link.persist.assert_called_once_with(
+            context=context,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            cluster_name=GKE_CLUSTER_NAME,
+            namespace=K8S_NAMESPACE,
+            pod_name=K8S_POD_NAME,
+        )
+
+    @pytest.mark.parametrize(
+        "pod",
+        [
+            k8s.V1Pod(),
+            make_pod(metadata=k8s.V1ObjectMeta(name=None, 
namespace=K8S_NAMESPACE)),
+            make_pod(metadata=k8s.V1ObjectMeta(name=K8S_POD_NAME, 
namespace=None)),
+        ],
+    )
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+    def test_get_or_create_pod_skips_link_when_metadata_is_incomplete(
+        self, mock_get_or_create_pod, mock_link, pod
+    ):
+        mock_get_or_create_pod.return_value = pod
+
+        result = self.operator.get_or_create_pod(pod_request_obj=make_pod(), 
context={"ti": mock.Mock()})
+
+        assert result == pod
+        mock_link.persist.assert_not_called()
+
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.post_complete_action"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.is_istio_enabled"), 
return_value=False)
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.await_pod_completion"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.await_pod_start"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.await_init_containers_completion"))
+    @mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.find_pod"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.build_pod_request_obj"))
+    def test_execute_sync_persists_link_when_follow_up_step_fails(
+        self,
+        mock_build_pod_request_obj,
+        mock_get_or_create_pod,
+        mock_link,
+        mock_find_pod,
+        mock_await_init_containers_completion,
+        mock_await_pod_start,
+        mock_await_pod_completion,
+        mock_is_istio_enabled,
+        mock_post_complete_action,
+    ):
+        pod = make_pod()
+        context = {"ti": mock.Mock()}
+        self.operator.pod_manager = mock.Mock()
+        self.operator.pod_manager.await_pod_completion.return_value = pod
+        mock_build_pod_request_obj.return_value = make_pod()
+        mock_get_or_create_pod.return_value = pod
+        mock_find_pod.return_value = pod
+        mock_await_init_containers_completion.side_effect = 
AirflowException("pod startup failed")
+
+        with pytest.raises(AirflowException, match="pod startup failed"):
+            self.operator.execute_sync(context=context)
+
+        mock_link.persist.assert_called_once_with(
+            context=context,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            cluster_name=GKE_CLUSTER_NAME,
+            namespace=K8S_NAMESPACE,
+            pod_name=K8S_POD_NAME,
+        )
+        mock_post_complete_action.assert_called_once()
+
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.post_complete_action"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.build_pod_request_obj"))
+    def test_execute_sync_does_not_persist_link_when_pod_creation_fails(
+        self, mock_build_pod_request_obj, mock_get_or_create_pod, mock_link, 
mock_post_complete_action
+    ):
+        context = {"ti": mock.Mock()}
+        mock_build_pod_request_obj.return_value = make_pod()
+        mock_get_or_create_pod.side_effect = AirflowException("pod creation 
failed")
+
+        with pytest.raises(AirflowException, match="pod creation failed"):
+            self.operator.execute_sync(context=context)
+
+        mock_link.persist.assert_not_called()
+        mock_post_complete_action.assert_called_once()
+
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEnginePodLink"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesPodOperator.get_or_create_pod"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.build_pod_request_obj"))
+    def test_execute_async_persists_link_before_deferral(
+        self, mock_build_pod_request_obj, mock_get_or_create_pod, mock_link
+    ):
+        order: list[str] = []
+        pod = make_pod()
+        context = {"ti": mock.Mock()}
+        mock_build_pod_request_obj.return_value = make_pod()
+        mock_get_or_create_pod.return_value = pod
+        mock_link.persist.side_effect = lambda **_: order.append("persist")
+
+        with mock.patch.object(
+            GKEStartPodOperator,
+            "invoke_defer_method",
+            autospec=True,
+            side_effect=lambda _operator, last_log_time=None, context=None: 
order.append("defer"),
+        ) as mock_invoke_defer_method:
+            self.operator.execute_async(context=context)
+
+        assert order == ["persist", "defer"]
+        mock_invoke_defer_method.assert_called_once_with(self.operator, 
context=context)
+
     @mock.patch(GKE_OPERATORS_PATH.format("GKEStartPodOperator.defer"))
     
@mock.patch(GKE_OPERATORS_PATH.format("GKEClusterAuthDetails.fetch_cluster_info"))
     @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
@@ -998,6 +1156,132 @@ class TestGKEStartJobOperator:
         mock_super.assert_not_called()
         mock_super.return_value.execute.assert_not_called()
 
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.build_job_request_obj"))
+    def test_execute_persists_link(self, mock_build_job_request_obj, 
mock_create_job, mock_link):
+        context = {"ti": mock.Mock()}
+        mock_build_job_request_obj.return_value = mock.Mock()
+        mock_create_job.return_value = make_job()
+
+        self.operator.execute(context=context)
+
+        mock_link.persist.assert_called_once_with(
+            context=context,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            cluster_name=GKE_CLUSTER_NAME,
+            namespace=K8S_NAMESPACE,
+            job_name=K8S_JOB_NAME,
+        )
+
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.build_job_request_obj"))
+    def test_execute_persists_link_when_follow_up_step_fails(
+        self,
+        mock_build_job_request_obj,
+        mock_create_job,
+        mock_get_pods,
+        mock_link,
+    ):
+        context = {"ti": mock.Mock()}
+        self.operator.wait_until_job_complete = True
+        mock_build_job_request_obj.return_value = mock.Mock()
+        mock_create_job.return_value = make_job()
+        mock_get_pods.side_effect = AirflowException("pod discovery failed")
+
+        with pytest.raises(AirflowException, match="pod discovery failed"):
+            self.operator.execute(context=context)
+
+        mock_link.persist.assert_called_once_with(
+            context=context,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            cluster_name=GKE_CLUSTER_NAME,
+            namespace=K8S_NAMESPACE,
+            job_name=K8S_JOB_NAME,
+        )
+
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.build_job_request_obj"))
+    def test_execute_does_not_persist_link_when_job_creation_fails(
+        self,
+        mock_build_job_request_obj,
+        mock_create_job,
+        mock_link,
+    ):
+        context = {"ti": mock.Mock()}
+        mock_build_job_request_obj.return_value = mock.Mock()
+        mock_create_job.side_effect = AirflowException("job creation failed")
+
+        with pytest.raises(AirflowException, match="job creation failed"):
+            self.operator.execute(context=context)
+
+        mock_link.persist.assert_not_called()
+
+    @pytest.mark.parametrize(
+        "job",
+        [
+            k8s.V1Job(),
+            make_job(metadata=k8s.V1ObjectMeta(name=None, 
namespace=K8S_NAMESPACE)),
+            make_job(metadata=k8s.V1ObjectMeta(name=K8S_JOB_NAME, 
namespace=None)),
+        ],
+    )
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    def test_create_job_skips_link_when_metadata_is_incomplete(self, 
mock_create_job, mock_link, job):
+        context = {"ti": mock.Mock()}
+        mock_create_job.return_value = job
+        self.operator._job_link_context = context
+
+        result = self.operator.create_job(job_request_obj=mock.Mock())
+
+        assert result == job
+        mock_link.persist.assert_not_called()
+
+    @mock.patch(GKE_OPERATORS_PATH.format("ProvidersManager"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineJobLink"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesJobOperator.create_job"))
+    
@mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.build_job_request_obj"))
+    def test_execute_deferrable_persists_link_before_deferral(
+        self,
+        mock_build_job_request_obj,
+        mock_create_job,
+        mock_get_pods,
+        mock_link,
+        mock_providers_manager,
+    ):
+        order: list[str] = []
+        context = {"ti": mock.Mock()}
+        self.operator.wait_until_job_complete = True
+        self.operator.deferrable = True
+        self.operator.pod_request_obj = make_pod()
+        mock_build_job_request_obj.return_value = mock.Mock()
+        mock_create_job.return_value = make_job()
+        mock_get_pods.return_value = [make_pod()]
+        mock_link.persist.side_effect = lambda **_: order.append("persist")
+        kubernetes_package_name = "apache-airflow-providers-cncf-kubernetes"
+        mock_providers_manager.return_value.providers = {
+            kubernetes_package_name: mock.MagicMock(
+                data={"package-name": kubernetes_package_name},
+                version="8.0.2",
+            )
+        }
+
+        with mock.patch.object(
+            self.operator,
+            "execute_deferrable",
+            side_effect=lambda: order.append("defer"),
+        ) as mock_execute_deferrable:
+            self.operator.execute(context=context)
+
+        assert order == ["persist", "defer"]
+        mock_execute_deferrable.assert_called_once_with()
+
     @mock.patch(GKE_OPERATORS_PATH.format("GKEStartJobOperator.defer"))
     
@mock.patch(GKE_OPERATORS_PATH.format("GKEClusterAuthDetails.fetch_cluster_info"))
     @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
@@ -1105,6 +1389,80 @@ class TestGKEStartJobOperator:
         )
 
 
+class TestGKEStartOperatorExtraLinks:
+    @pytest.mark.db_test
+    def test_start_pod_operator_get_link(
+        self, dag_maker, create_task_instance_of_operator, session, 
mock_supervisor_comms
+    ):
+        ti = create_task_instance_of_operator(
+            GKEStartPodOperator,
+            dag_id="test_gke_start_pod_link_dag",
+            task_id=TEST_TASK_ID,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            cluster_name=GKE_CLUSTER_NAME,
+            name=K8S_POD_NAME,
+            namespace=K8S_NAMESPACE,
+            image=TEST_IMAGE,
+        )
+        task = dag_maker.dag.get_task(ti.task_id)
+        link_payload = {
+            "project_id": TEST_PROJECT_ID,
+            "location": TEST_LOCATION,
+            "cluster_name": GKE_CLUSTER_NAME,
+            "namespace": K8S_NAMESPACE,
+            "pod_name": K8S_POD_NAME,
+        }
+
+        if AIRFLOW_V_3_0_PLUS and mock_supervisor_comms:
+            mock_supervisor_comms.send.return_value = XComResult(
+                key=KubernetesEnginePodLink.key,
+                value=link_payload,
+            )
+
+        ti.xcom_push(key=KubernetesEnginePodLink.key, value=link_payload)
+
+        assert task.operator_extra_links[0].get_link(operator=task, 
ti_key=ti.key) == BASE_LINK + (
+            KUBERNETES_POD_LINK.format(**link_payload)
+        )
+
+    @pytest.mark.db_test
+    def test_start_job_operator_get_link(
+        self, dag_maker, create_task_instance_of_operator, session, 
mock_supervisor_comms
+    ):
+        ti = create_task_instance_of_operator(
+            GKEStartJobOperator,
+            dag_id="test_gke_start_job_link_dag",
+            task_id=TEST_TASK_ID,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            cluster_name=GKE_CLUSTER_NAME,
+            name=K8S_JOB_NAME,
+            namespace=K8S_NAMESPACE,
+            image=TEST_IMAGE,
+        )
+        task = dag_maker.dag.get_task(ti.task_id)
+        link_payload = {
+            "project_id": TEST_PROJECT_ID,
+            "location": TEST_LOCATION,
+            "cluster_name": GKE_CLUSTER_NAME,
+            "namespace": K8S_NAMESPACE,
+            "job_name": K8S_JOB_NAME,
+        }
+
+        if AIRFLOW_V_3_0_PLUS and mock_supervisor_comms:
+            mock_supervisor_comms.send.return_value = XComResult(
+                key=KubernetesEngineJobLink.key,
+                value=link_payload,
+            )
+
+        ti.xcom_push(key=KubernetesEngineJobLink.key, value=link_payload)
+
+        assert task.operator_extra_links[0].get_link(operator=task, 
ti_key=ti.key) == BASE_LINK + (
+            KUBERNETES_JOB_LINK.format(**link_payload)
+        )
+
+
 class TestGKEDescribeJobOperator:
     def setup_method(self):
         self.operator = GKEDescribeJobOperator(

Reply via email to