This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c6b4192d299 Update the Kubernetes Engine components to be able to work
on "Sovereign Cloud from Google" environments (#66341)
c6b4192d299 is described below
commit c6b4192d2994cd4f482007a9ba74e28676dd15e9
Author: M. Olcay Tercanlı <[email protected]>
AuthorDate: Tue May 5 17:11:12 2026 +0200
Update the Kubernetes Engine components to be able to work on "Sovereign
Cloud from Google" environments (#66341)
---
.../docs/operators/cloud/kubernetes_engine.rst | 25 ++++++++++++
.../google/cloud/hooks/kubernetes_engine.py | 9 +++--
.../google/cloud/operators/kubernetes_engine.py | 46 +++++++++++++++-------
.../google/cloud/triggers/kubernetes_engine.py | 14 +++++++
.../cloud/operators/test_kubernetes_engine.py | 5 +++
.../cloud/triggers/test_kubernetes_engine.py | 4 ++
6 files changed, 86 insertions(+), 17 deletions(-)
diff --git a/providers/google/docs/operators/cloud/kubernetes_engine.rst
b/providers/google/docs/operators/cloud/kubernetes_engine.rst
index e1d661153ae..9baaa0ee6ad 100644
--- a/providers/google/docs/operators/cloud/kubernetes_engine.rst
+++ b/providers/google/docs/operators/cloud/kubernetes_engine.rst
@@ -359,6 +359,31 @@ resume Job in the specified Google Kubernetes Engine
cluster.
:start-after: [START howto_operator_gke_resume_job]
:end-before: [END howto_operator_gke_resume_job]
+Sovereign Cloud from Google guidance
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+In the Sovereign Cloud from Google environments, the practical question for
Kubernetes Engine
+is which operator parameters to set. For most of the operators, the answer is
easy: ``use_dns_endpoint=True``.
+
+For example:
+
+.. code-block:: python
+
+ GKEStartPodOperator(
+ ...,
+ use_dns_endpoint=True,
+ )
+
+
+.. warning::
+ Please be cautious with the configuration parameters for the operators which
requires to start a ``sidecar`` container
+ on the Sovereign Cloud from Google environments.
+
+ Currently, the operators which tries to push something to XCom (with
``do_xcom_push=True`` parameter)
+ do **not** work properly on the Sovereign Cloud from Google environments due
to they require sidecar containers.
+
+For further information, take a look at the official documentation for
+the `Sovereign Cloud from Google <https://cloud.google.com/sovereign-cloud>`__
+
Reference
^^^^^^^^^
diff --git
a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index 9ec2052c45d..646e5ca98b3 100644
---
a/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++
b/providers/google/src/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -481,6 +481,7 @@ class GKEKubernetesAsyncHook(GoogleBaseAsyncHook,
AsyncKubernetesHook):
self,
cluster_url: str,
ssl_ca_cert: str,
+ use_dns_endpoint: bool = False,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
enable_tcp_keepalive: bool = True,
@@ -489,6 +490,7 @@ class GKEKubernetesAsyncHook(GoogleBaseAsyncHook,
AsyncKubernetesHook):
self._cluster_url = cluster_url
self._ssl_ca_cert = ssl_ca_cert
self.enable_tcp_keepalive = enable_tcp_keepalive
+ self.use_dns_endpoint = use_dns_endpoint
super().__init__(
cluster_url=cluster_url,
ssl_ca_cert=ssl_ca_cert,
@@ -540,11 +542,12 @@ class GKEKubernetesAsyncHook(GoogleBaseAsyncHook,
AsyncKubernetesHook):
def _get_config(self) -> async_client.configuration.Configuration:
configuration = async_client.Configuration(
host=self._cluster_url,
- ssl_ca_cert=FileOrData(
+ )
+ if not self.use_dns_endpoint:
+ configuration.ssl_ca_cert = FileOrData(
{
"certificate-authority-data": self._ssl_ca_cert,
},
file_key_name="certificate-authority",
- ).as_file(),
- )
+ ).as_file()
return configuration
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index 08b27098f09..eba63a7ae60 100644
---
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -88,7 +88,8 @@ class GKEClusterAuthDetails:
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param project_id: The Google Developers Console project id.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param cluster_hook: airflow hook for working with kubernetes cluster.
"""
@@ -209,7 +210,8 @@ class GKEDeleteClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -281,6 +283,7 @@ class GKEDeleteClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
poll_interval=self.poll_interval,
+ use_dns_endpoint=self.use_dns_endpoint,
),
method_name="execute_complete",
)
@@ -338,7 +341,8 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
:param location: The name of the Google Kubernetes Engine zone or region
in which the
cluster resides, e.g. 'us-central1-a'
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -571,6 +575,7 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
poll_interval=self.poll_interval,
+ use_dns_endpoint=self.use_dns_endpoint,
),
method_name="execute_complete",
)
@@ -608,7 +613,8 @@ class GKEStartKueueInsideClusterOperator(GKEOperatorMixin,
KubernetesInstallKueu
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -691,7 +697,8 @@ class GKEStartPodOperator(GKEOperatorMixin,
KubernetesPodOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -776,6 +783,7 @@ class GKEStartPodOperator(GKEOperatorMixin,
KubernetesPodOperator):
impersonation_chain=self.impersonation_chain,
logging_interval=self.logging_interval,
last_log_time=last_log_time,
+ use_dns_endpoint=self.use_dns_endpoint,
),
method_name="trigger_reentry",
)
@@ -804,7 +812,8 @@ class GKEStartJobOperator(GKEOperatorMixin,
KubernetesJobOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -888,6 +897,7 @@ class GKEStartJobOperator(GKEOperatorMixin,
KubernetesJobOperator):
impersonation_chain=self.impersonation_chain,
get_logs=self.get_logs,
do_xcom_push=self.do_xcom_push,
+ use_dns_endpoint=self.use_dns_endpoint,
),
method_name="execute_complete",
)
@@ -907,7 +917,8 @@ class GKEDescribeJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -985,7 +996,8 @@ class GKEListJobsOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1072,7 +1084,8 @@ class GKECreateCustomResourceOperator(GKEOperatorMixin,
KubernetesCreateResource
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1145,7 +1158,8 @@ class GKEDeleteCustomResourceOperator(GKEOperatorMixin,
KubernetesDeleteResource
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1207,7 +1221,8 @@ class GKEStartKueueJobOperator(GKEOperatorMixin,
KubernetesStartKueueJobOperator
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1270,7 +1285,8 @@ class GKEDeleteJobOperator(GKEOperatorMixin,
KubernetesDeleteJobOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1336,7 +1352,8 @@ class GKESuspendJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
@@ -1416,7 +1433,8 @@ class GKEResumeJobOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
cluster resides, e.g. 'us-central1-a'
:param cluster_name: The name of the Google Kubernetes Engine cluster.
:param use_internal_ip: Use the internal IP address as the endpoint.
- :param use_dns_endpoint: Use the DNS address as the endpoint.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param project_id: The Google Developers Console project id
:param gcp_conn_id: The Google cloud connection id to use. This allows for
users to specify a service account.
diff --git
a/providers/google/src/airflow/providers/google/cloud/triggers/kubernetes_engine.py
b/providers/google/src/airflow/providers/google/cloud/triggers/kubernetes_engine.py
index 6da2c5ea2ad..01bb74883fb 100644
---
a/providers/google/src/airflow/providers/google/cloud/triggers/kubernetes_engine.py
+++
b/providers/google/src/airflow/providers/google/cloud/triggers/kubernetes_engine.py
@@ -58,6 +58,8 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
:param poll_interval: Polling period in seconds to check for the status.
:param trigger_start_time: time in Datetime format when the trigger was
started
:param in_cluster: run kubernetes client with in_cluster configuration.
+ :param use_dns_endpoint: Use the DNS address as the endpoint. This needs
to set to True for the Sovereign
+ Cloud from Google.
:param get_logs: get the stdout of the container as logs of the tasks.
:param startup_timeout: timeout in seconds to start up the pod.
:param base_container_name: The name of the base container in the pod.
This container's logs
@@ -84,6 +86,7 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
cluster_context: str | None = None,
poll_interval: float = 2,
in_cluster: bool | None = None,
+ use_dns_endpoint: bool = False,
get_logs: bool = True,
startup_timeout: int = 120,
on_finish_action: str = "delete_pod",
@@ -108,6 +111,7 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
self.poll_interval = poll_interval
self.cluster_context = cluster_context
self.in_cluster = in_cluster
+ self.use_dns_endpoint = use_dns_endpoint
self.get_logs = get_logs
self.startup_timeout = startup_timeout
self.gcp_conn_id = gcp_conn_id
@@ -141,6 +145,7 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
"poll_interval": self.poll_interval,
"cluster_context": self.cluster_context,
"in_cluster": self.in_cluster,
+ "use_dns_endpoint": self.use_dns_endpoint,
"get_logs": self.get_logs,
"startup_timeout": self.startup_timeout,
"trigger_start_time": self.trigger_start_time,
@@ -159,6 +164,7 @@ class GKEStartPodTrigger(KubernetesPodTrigger):
return GKEKubernetesAsyncHook(
cluster_url=self._cluster_url,
ssl_ca_cert=self._ssl_ca_cert,
+ use_dns_endpoint=self.use_dns_endpoint,
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
enable_tcp_keepalive=True,
@@ -173,6 +179,7 @@ class GKEOperationTrigger(BaseTrigger):
operation_name: str,
project_id: str | None,
location: str,
+ use_dns_endpoint: bool = False,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
poll_interval: int = 10,
@@ -182,6 +189,7 @@ class GKEOperationTrigger(BaseTrigger):
self.operation_name = operation_name
self.project_id = project_id
self.location = location
+ self.use_dns_endpoint = use_dns_endpoint
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.poll_interval = poll_interval
@@ -199,6 +207,7 @@ class GKEOperationTrigger(BaseTrigger):
"gcp_conn_id": self.gcp_conn_id,
"impersonation_chain": self.impersonation_chain,
"poll_interval": self.poll_interval,
+ "use_dns_endpoint": self.use_dns_endpoint,
},
)
@@ -266,6 +275,7 @@ class GKEJobTrigger(BaseTrigger):
pod_namespace: str,
base_container_name: str,
pod_name: str | None = None,
+ use_dns_endpoint: bool = False,
gcp_conn_id: str = "google_cloud_default",
poll_interval: float = 2,
impersonation_chain: str | Sequence[str] | None = None,
@@ -277,6 +287,7 @@ class GKEJobTrigger(BaseTrigger):
self.ssl_ca_cert = ssl_ca_cert
self.job_name = job_name
self.job_namespace = job_namespace
+ self.use_dns_endpoint = use_dns_endpoint
if pod_name is not None:
self._pod_name = pod_name
self.pod_names = [
@@ -312,6 +323,7 @@ class GKEJobTrigger(BaseTrigger):
"job_namespace": self.job_namespace,
"pod_names": self.pod_names,
"pod_namespace": self.pod_namespace,
+ "use_dns_endpoint": self.use_dns_endpoint,
"base_container_name": self.base_container_name,
"gcp_conn_id": self.gcp_conn_id,
"poll_interval": self.poll_interval,
@@ -406,6 +418,7 @@ class GKEJobTrigger(BaseTrigger):
return GKEKubernetesAsyncHook(
cluster_url=self.cluster_url,
ssl_ca_cert=self.ssl_ca_cert,
+ use_dns_endpoint=self.use_dns_endpoint,
gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain,
)
@@ -415,6 +428,7 @@ class GKEJobTrigger(BaseTrigger):
sync_hook = GKEKubernetesHook(
gcp_conn_id=self.gcp_conn_id,
cluster_url=self.cluster_url,
+ use_dns_endpoint=self.use_dns_endpoint,
ssl_ca_cert=self.ssl_ca_cert,
impersonation_chain=self.impersonation_chain,
)
diff --git
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
index 654ddd8b45b..24e08af68de 100644
---
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
+++
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
@@ -334,6 +334,7 @@ class TestGKEDeleteClusterOperator:
gcp_conn_id=TEST_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
poll_interval=10,
+ use_dns_endpoint=False,
)
mock_defer.assert_called_once_with(
trigger=mock_trigger_instance,
@@ -683,6 +684,7 @@ class TestGKECreateClusterOperator:
gcp_conn_id=TEST_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
poll_interval=10,
+ use_dns_endpoint=False,
)
mock_defer.assert_called_once_with(
trigger=mock_trigger_instance,
@@ -900,6 +902,7 @@ class TestGKEStartPodOperator:
impersonation_chain=TEST_IMPERSONATION_CHAIN,
logging_interval=None,
last_log_time=mock_last_log_time,
+ use_dns_endpoint=False,
)
mock_defer.assert_called_once_with(
trigger=mock_trigger.return_value,
@@ -1031,6 +1034,7 @@ class TestGKEStartJobOperator:
impersonation_chain=TEST_IMPERSONATION_CHAIN,
get_logs=mock_get_logs,
do_xcom_push=False,
+ use_dns_endpoint=False,
)
mock_defer.assert_called_once_with(
trigger=mock_trigger.return_value,
@@ -1091,6 +1095,7 @@ class TestGKEStartJobOperator:
impersonation_chain=TEST_IMPERSONATION_CHAIN,
get_logs=mock_get_logs,
do_xcom_push=False,
+ use_dns_endpoint=False,
)
mock_defer.assert_called_once_with(
trigger=mock_trigger.return_value,
diff --git
a/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
b/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
index eee26d9fa7b..e423561bfe6 100644
---
a/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
+++
b/providers/google/tests/unit/google/cloud/triggers/test_kubernetes_engine.py
@@ -141,6 +141,7 @@ class TestGKEStartPodTrigger:
"impersonation_chain": IMPERSONATION_CHAIN,
"last_log_time": None,
"logging_interval": None,
+ "use_dns_endpoint": False,
}
@pytest.mark.asyncio
@@ -342,6 +343,7 @@ class TestGKEOperationTrigger:
"gcp_conn_id": GCP_CONN_ID,
"impersonation_chain": IMPERSONATION_CHAIN,
"poll_interval": POLL_INTERVAL,
+ "use_dns_endpoint": False,
}
@pytest.mark.asyncio
@@ -495,6 +497,7 @@ class TestGKEStartJobTrigger:
"impersonation_chain": IMPERSONATION_CHAIN,
"get_logs": GET_LOGS,
"do_xcom_push": XCOM_PUSH,
+ "use_dns_endpoint": False,
}
@pytest.mark.asyncio
@@ -588,6 +591,7 @@ class TestGKEStartJobTrigger:
ssl_ca_cert=SSL_CA_CERT,
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
+ use_dns_endpoint=False,
)
assert hook_actual == hook_expected