This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 62b845e2d7 Disconnect GKE operators from deprecated hooks (#39434)
62b845e2d7 is described below

commit 62b845e2d731e94b3bcbf43ea60ad78fd8bb94f1
Author: max <[email protected]>
AuthorDate: Wed May 8 08:58:28 2024 +0000

    Disconnect GKE operators from deprecated hooks (#39434)
    
    * Disconnect GKE operators from deprecated hooks
    
    * Remove GKE unit tests from deprecation ignore list
---
 .../google/cloud/operators/kubernetes_engine.py    | 47 ++++++++++------------
 .../google/cloud/triggers/kubernetes_engine.py     |  5 +--
 tests/deprecations_ignore.yml                      | 11 -----
 .../cloud/operators/test_kubernetes_engine.py      | 22 +++++-----
 4 files changed, 33 insertions(+), 52 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py 
b/airflow/providers/google/cloud/operators/kubernetes_engine.py
index f876622428..0a28809dd1 100644
--- a/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -42,12 +42,8 @@ from airflow.providers.cncf.kubernetes.operators.resource 
import (
 )
 from airflow.providers.cncf.kubernetes.utils.pod_manager import OnFinishAction
 from airflow.providers.google.cloud.hooks.kubernetes_engine import (
-    GKECustomResourceHook,
-    GKEDeploymentHook,
     GKEHook,
-    GKEJobHook,
     GKEKubernetesHook,
-    GKEPodHook,
 )
 from airflow.providers.google.cloud.links.kubernetes_engine import (
     KubernetesEngineClusterLink,
@@ -533,13 +529,13 @@ class 
GKEStartKueueInsideClusterOperator(GoogleCloudBaseOperator):
         )
 
     @cached_property
-    def deployment_hook(self) -> GKEDeploymentHook:
+    def deployment_hook(self) -> GKEKubernetesHook:
         if self._cluster_url is None or self._ssl_ca_cert is None:
             raise AttributeError(
-                "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
+                "Cluster url and ssl_ca_cert should be defined before using 
self.deployment_hook method. "
                 "Try to use self.get_kube_creds method",
             )
-        return GKEDeploymentHook(
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
             cluster_url=self._cluster_url,
@@ -547,13 +543,14 @@ class 
GKEStartKueueInsideClusterOperator(GoogleCloudBaseOperator):
         )
 
     @cached_property
-    def pod_hook(self) -> GKEPodHook:
+    def pod_hook(self) -> GKEKubernetesHook:
         if self._cluster_url is None or self._ssl_ca_cert is None:
             raise AttributeError(
-                "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
+                "Cluster url and ssl_ca_cert should be defined before using 
self.pod_hook method. "
                 "Try to use self.get_kube_creds method",
             )
-        return GKEPodHook(
+
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
             cluster_url=self._cluster_url,
@@ -742,21 +739,20 @@ class GKEStartPodOperator(KubernetesPodOperator):
         )
 
     @cached_property
-    def hook(self) -> GKEPodHook:
+    def hook(self) -> GKEKubernetesHook:
         if self._cluster_url is None or self._ssl_ca_cert is None:
             raise AttributeError(
                 "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
                 "Try to use self.get_kube_creds method",
             )
 
-        hook = GKEPodHook(
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             cluster_url=self._cluster_url,
             ssl_ca_cert=self._ssl_ca_cert,
             impersonation_chain=self.impersonation_chain,
             enable_tcp_keepalive=True,
         )
-        return hook
 
     def execute(self, context: Context):
         """Execute process of creating pod and executing provided command 
inside it."""
@@ -901,19 +897,18 @@ class GKEStartJobOperator(KubernetesJobOperator):
         )
 
     @cached_property
-    def hook(self) -> GKEJobHook:
+    def hook(self) -> GKEKubernetesHook:
         if self._cluster_url is None or self._ssl_ca_cert is None:
             raise AttributeError(
                 "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
                 "Try to use self.get_kube_creds method",
             )
 
-        hook = GKEJobHook(
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             cluster_url=self._cluster_url,
             ssl_ca_cert=self._ssl_ca_cert,
         )
-        return hook
 
     def execute(self, context: Context):
         """Execute process of creating Job."""
@@ -1027,7 +1022,7 @@ class GKEDescribeJobOperator(GoogleCloudBaseOperator):
         )
 
     @cached_property
-    def hook(self) -> GKEJobHook:
+    def hook(self) -> GKEKubernetesHook:
         self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
             cluster_name=self.cluster_name,
             project_id=self.project_id,
@@ -1035,7 +1030,7 @@ class GKEDescribeJobOperator(GoogleCloudBaseOperator):
             cluster_hook=self.cluster_hook,
         ).fetch_cluster_info()
 
-        return GKEJobHook(
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             cluster_url=self._cluster_url,
             ssl_ca_cert=self._ssl_ca_cert,
@@ -1128,7 +1123,7 @@ class GKEListJobsOperator(GoogleCloudBaseOperator):
         )
 
     @cached_property
-    def hook(self) -> GKEJobHook:
+    def hook(self) -> GKEKubernetesHook:
         self._cluster_url, self._ssl_ca_cert = GKEClusterAuthDetails(
             cluster_name=self.cluster_name,
             project_id=self.project_id,
@@ -1136,7 +1131,7 @@ class GKEListJobsOperator(GoogleCloudBaseOperator):
             cluster_hook=self.cluster_hook,
         ).fetch_cluster_info()
 
-        return GKEJobHook(
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             cluster_url=self._cluster_url,
             ssl_ca_cert=self._ssl_ca_cert,
@@ -1234,13 +1229,13 @@ class 
GKECreateCustomResourceOperator(KubernetesCreateResourceOperator):
         )
 
     @cached_property
-    def hook(self) -> GKECustomResourceHook:
+    def hook(self) -> GKEKubernetesHook:
         if self._cluster_url is None or self._ssl_ca_cert is None:
             raise AttributeError(
                 "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
                 "Try to use self.get_kube_creds method",
             )
-        return GKECustomResourceHook(
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             cluster_url=self._cluster_url,
             ssl_ca_cert=self._ssl_ca_cert,
@@ -1336,13 +1331,13 @@ class 
GKEDeleteCustomResourceOperator(KubernetesDeleteResourceOperator):
         )
 
     @cached_property
-    def hook(self) -> GKECustomResourceHook:
+    def hook(self) -> GKEKubernetesHook:
         if self._cluster_url is None or self._ssl_ca_cert is None:
             raise AttributeError(
                 "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
                 "Try to use self.get_kube_creds method",
             )
-        return GKECustomResourceHook(
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             cluster_url=self._cluster_url,
             ssl_ca_cert=self._ssl_ca_cert,
@@ -1475,14 +1470,14 @@ class GKEDeleteJobOperator(KubernetesDeleteJobOperator):
         )
 
     @cached_property
-    def hook(self) -> GKEJobHook:
+    def hook(self) -> GKEKubernetesHook:
         if self._cluster_url is None or self._ssl_ca_cert is None:
             raise AttributeError(
                 "Cluster url and ssl_ca_cert should be defined before using 
self.hook method. "
                 "Try to use self.get_kube_creds method",
             )
 
-        return GKEJobHook(
+        return GKEKubernetesHook(
             gcp_conn_id=self.gcp_conn_id,
             cluster_url=self._cluster_url,
             ssl_ca_cert=self._ssl_ca_cert,
diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py 
b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index c0d8fef97a..8557bea082 100644
--- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -30,7 +30,6 @@ from airflow.providers.cncf.kubernetes.utils.pod_manager 
import OnFinishAction
 from airflow.providers.google.cloud.hooks.kubernetes_engine import (
     GKEAsyncHook,
     GKEKubernetesAsyncHook,
-    GKEPodAsyncHook,
 )
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
@@ -147,8 +146,8 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
         )
 
     @cached_property
-    def hook(self) -> GKEPodAsyncHook:  # type: ignore[override]
-        return GKEPodAsyncHook(
+    def hook(self) -> GKEKubernetesAsyncHook:  # type: ignore[override]
+        return GKEKubernetesAsyncHook(
             cluster_url=self._cluster_url,
             ssl_ca_cert=self._ssl_ca_cert,
             gcp_conn_id=self.gcp_conn_id,
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index f1fafcca73..91c05b3bd2 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -517,25 +517,14 @@
 - 
tests/providers/google/cloud/operators/test_dataproc.py::test_scale_cluster_operator_extra_links
 - 
tests/providers/google/cloud/operators/test_dataproc.py::test_submit_spark_job_operator_extra_links
 - 
tests/providers/google/cloud/operators/test_gcs.py::TestGoogleCloudStorageListOperator::test_execute__delimiter
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_default_gcp_conn_id
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDeleteJobOperator::test_gcp_conn_id
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_default_gcp_conn_id
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEDescribeJobOperator::test_gcp_conn_id
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_cluster_info
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_config_file_throws_error
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_default_gcp_conn_id
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_account
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_execute_with_impersonation_service_chain_one_element
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_gcp_conn_id
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_on_finish_action_handler
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperator::test_template_fields
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEPodOperatorAsync::test_async_create_pod_should_execute_successfully
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_default_gcp_conn_id
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartJobOperator::test_gcp_conn_id
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueInsideClusterOperator::test_execute
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_default_gcp_conn_id
-- 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGKEStartKueueJobOperator::test_gcp_conn_id
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_call_defer_method
 - 
tests/providers/google/cloud/operators/test_kubernetes_engine.py::TestGoogleCloudPlatformContainerOperator::test_create_execute_error_body
diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py 
b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
index 05c7bf24a7..c71cc99c7e 100644
--- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py
@@ -81,9 +81,7 @@ FILE_NAME = "/tmp/mock_name"
 KUB_OP_PATH = 
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.{}"
 GKE_HOOK_MODULE_PATH = 
"airflow.providers.google.cloud.operators.kubernetes_engine"
 GKE_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEHook"
-GKE_POD_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEPodHook"
-GKE_DEPLOYMENT_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEDeploymentHook"
-GKE_JOB_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEJobHook"
+GKE_KUBERNETES_HOOK = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
 GKE_K8S_HOOK_PATH = f"{GKE_HOOK_MODULE_PATH}.GKEKubernetesHook"
 KUB_OPERATOR_EXEC = 
"airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.execute"
 KUB_JOB_OPERATOR_EXEC = 
"airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator.execute"
@@ -502,8 +500,8 @@ class TestGKEStartKueueInsideClusterOperator:
     @mock.patch(TEMP_FILE)
     @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
     @mock.patch(GKE_HOOK_PATH)
-    @mock.patch(f"{GKE_DEPLOYMENT_HOOK_PATH}.check_kueue_deployment_running")
-    @mock.patch(GKE_POD_HOOK_PATH)
+    @mock.patch(f"{GKE_KUBERNETES_HOOK}.check_kueue_deployment_running")
+    @mock.patch(GKE_KUBERNETES_HOOK)
     def test_execute(self, mock_pod_hook, mock_deployment, mock_hook, 
fetch_cluster_info_mock, file_mock):
         mock_pod_hook.return_value.apply_from_yaml_file.side_effect = 
mock.MagicMock()
         fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
@@ -515,9 +513,9 @@ class TestGKEStartKueueInsideClusterOperator:
     @mock.patch.dict(os.environ, {})
     @mock.patch(TEMP_FILE)
     @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
-    @mock.patch(GKE_DEPLOYMENT_HOOK_PATH)
+    @mock.patch(GKE_KUBERNETES_HOOK)
     @mock.patch(GKE_HOOK_PATH)
-    @mock.patch(GKE_POD_HOOK_PATH)
+    @mock.patch(GKE_KUBERNETES_HOOK)
     def test_execute_autoscaled_cluster(
         self, mock_pod_hook, mock_hook, mock_depl_hook, 
fetch_cluster_info_mock, file_mock, caplog
     ):
@@ -534,7 +532,7 @@ class TestGKEStartKueueInsideClusterOperator:
     @mock.patch(TEMP_FILE)
     @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
     @mock.patch(GKE_HOOK_PATH)
-    @mock.patch(GKE_POD_HOOK_PATH)
+    @mock.patch(GKE_KUBERNETES_HOOK)
     def test_execute_autoscaled_cluster_check_error(
         self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, 
caplog
     ):
@@ -550,7 +548,7 @@ class TestGKEStartKueueInsideClusterOperator:
     @mock.patch(TEMP_FILE)
     @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
     @mock.patch(GKE_HOOK_PATH)
-    @mock.patch(GKE_POD_HOOK_PATH)
+    @mock.patch(GKE_KUBERNETES_HOOK)
     def test_execute_non_autoscaled_cluster_check_error(
         self, mock_pod_hook, mock_hook, fetch_cluster_info_mock, file_mock, 
caplog
     ):
@@ -916,7 +914,7 @@ class TestGKEDescribeJobOperator:
     @mock.patch(TEMP_FILE)
     @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
     @mock.patch(GKE_HOOK_PATH)
-    @mock.patch(GKE_JOB_HOOK_PATH)
+    @mock.patch(GKE_KUBERNETES_HOOK)
     def test_execute(self, mock_job_hook, mock_hook, fetch_cluster_info_mock, 
file_mock):
         mock_job_hook.return_value.get_job.return_value = mock.MagicMock()
         fetch_cluster_info_mock.return_value = (CLUSTER_URL, SSL_CA_CERT)
@@ -931,7 +929,7 @@ class TestGKEDescribeJobOperator:
     @mock.patch(TEMP_FILE)
     @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
     @mock.patch(GKE_HOOK_PATH)
-    @mock.patch(GKE_JOB_HOOK_PATH)
+    @mock.patch(GKE_KUBERNETES_HOOK)
     def test_execute_with_impersonation_service_account(
         self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, 
get_con_mock
     ):
@@ -949,7 +947,7 @@ class TestGKEDescribeJobOperator:
     @mock.patch(TEMP_FILE)
     @mock.patch(f"{GKE_CLUSTER_AUTH_DETAILS_PATH}.fetch_cluster_info")
     @mock.patch(GKE_HOOK_PATH)
-    @mock.patch(GKE_JOB_HOOK_PATH)
+    @mock.patch(GKE_KUBERNETES_HOOK)
     def test_execute_with_impersonation_service_chain_one_element(
         self, mock_job_hook, mock_hook, fetch_cluster_info_mock, file_mock, 
get_con_mock
     ):

Reply via email to