VladaZakharova commented on code in PR #29266:
URL: https://github.com/apache/airflow/pull/29266#discussion_r1143103080
##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -452,89 +473,69 @@ def __init__(
if self.config_file:
        raise AirflowException("config_file is not an allowed parameter for the GKEStartPodOperator.")
- @staticmethod
- @contextmanager
- def get_gke_config_file(
- gcp_conn_id,
- project_id: str | None,
- cluster_name: str,
- impersonation_chain: str | Sequence[str] | None,
- regional: bool,
- location: str,
- use_internal_ip: bool,
- ) -> Generator[str, None, None]:
-
- hook = GoogleBaseHook(gcp_conn_id=gcp_conn_id)
- project_id = project_id or hook.project_id
+ @cached_property
+ def cluster_hook(self) -> GKEHook:
+ return GKEHook(
+ gcp_conn_id=self.gcp_conn_id,
+ location=self.location,
+ impersonation_chain=self.impersonation_chain,
+ )
- if not project_id:
- raise AirflowException(
- "The project id must be passed either as "
- "keyword project_id parameter or as project_id extra "
- "in Google Cloud connection definition. Both are not set!"
+ @cached_property
+ def hook(self) -> GKEPodHook: # type: ignore[override]
+ if self._cluster_url is None or self._ssl_ca_cert is None:
+ raise AttributeError(
+                "Cluster url and ssl_ca_cert should be defined before using self.hook method. "
+                "Try to use self.get_kube_creds method",
)
-        # Write config to a temp file and set the environment variable to point to it.
- # This is to avoid race conditions of reading/writing a single file
- with tempfile.NamedTemporaryFile() as conf_file, patch_environ(
- {KUBE_CONFIG_ENV_VAR: conf_file.name}
- ), hook.provide_authorized_gcloud():
- # Attempt to get/update credentials
- # We call gcloud directly instead of using google-cloud-python api
-            # because there is no way to write kubernetes config to a file, which is
-            # required by KubernetesPodOperator.
-            # The gcloud command looks at the env variable `KUBECONFIG` for where to save
- # the kubernetes config file.
- cmd = [
- "gcloud",
- "container",
- "clusters",
- "get-credentials",
- cluster_name,
- "--project",
- project_id,
- ]
- if impersonation_chain:
- if isinstance(impersonation_chain, str):
- impersonation_account = impersonation_chain
- elif len(impersonation_chain) == 1:
- impersonation_account = impersonation_chain[0]
- else:
- raise AirflowException(
-                        "Chained list of accounts is not supported, please specify only one service account"
- )
-
- cmd.extend(
- [
- "--impersonate-service-account",
- impersonation_account,
- ]
- )
- if regional:
- cmd.append("--region")
- else:
- cmd.append("--zone")
- cmd.append(location)
- if use_internal_ip:
- cmd.append("--internal-ip")
- execute_in_subprocess(cmd)
-
- # Tell `KubernetesPodOperator` where the config file is located
- yield os.environ[KUBE_CONFIG_ENV_VAR]
+ hook = GKEPodHook(
+ cluster_url=self._cluster_url,
+ ssl_ca_cert=self._ssl_ca_cert,
+ )
+ return hook
- def execute(self, context: Context) -> str | None:
+ def execute(self, context: Context):
+        """Executes process of creating pod and executing provided command inside it."""
+ self.fetch_cluster_info()
+ return super().execute(context)
- with GKEStartPodOperator.get_gke_config_file(
- gcp_conn_id=self.gcp_conn_id,
+ def fetch_cluster_info(self) -> tuple[str, str | None]:
+ """Fetches cluster info for connecting to it."""
+ cluster = self.cluster_hook.get_cluster(
Review Comment:
There is no real need to implement an async call to get the cluster here, since this
call is only used for getting the cluster's endpoint, and does not perform any other
long-running action on it. The async get_cluster() method is called in the
trigger.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]