This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f03ab24e0c Add gcloud command to DataprocCreateClusterOperator to be able to create dataproc on GKE cluster (#44185)
8f03ab24e0c is described below

commit 8f03ab24e0c5d552decfbd891fe59f06c217f3bf
Author: Maksim <[email protected]>
AuthorDate: Wed Nov 20 00:59:51 2024 -0800

    Add gcloud command to DataprocCreateClusterOperator to be able to create dataproc on GKE cluster (#44185)
    
    Co-authored-by: Ulada Zakharava <[email protected]>
---
 .../providers/google/cloud/hooks/dataproc.py       | 75 +++++++++++++++++++++-
 .../providers/google/cloud/operators/dataproc.py   |  2 +-
 .../tests/google/cloud/hooks/test_dataproc.py      | 51 +++++++++++++++
 .../google/cloud/dataproc/example_dataproc_gke.py  |  2 +-
 4 files changed, 127 insertions(+), 3 deletions(-)
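
For context, a minimal sketch of how the new gcloud-based path might be used from a DAG. The project, region, bucket, and cluster names below are placeholders; the config shape mirrors the VIRTUAL_CLUSTER_CONFIG used in the tests and the system example in this patch:

    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

    # Placeholder values -- real project/region/cluster names would differ.
    VIRTUAL_CLUSTER_CONFIG = {
        "kubernetes_cluster_config": {
            "gke_cluster_config": {
                "gke_cluster_target": "projects/my-project/locations/us-central1/clusters/my-gke-cluster",
                "node_pool_target": [
                    {
                        "node_pool": "projects/my-project/locations/us-central1/clusters/my-gke-cluster/nodePools/dp",
                        "roles": ["DEFAULT"],
                        "node_pool_config": {"config": {"machine_type": "e2-standard-4"}},
                    }
                ],
            },
            "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
        },
        "staging_bucket": "my-staging-bucket",
    }

    # A kubernetes_cluster_config inside virtual_cluster_config routes cluster
    # creation through the new `gcloud dataproc clusters gke create` code path.
    create_cluster_in_gke = DataprocCreateClusterOperator(
        task_id="create_cluster_in_gke",
        project_id="my-project",
        region="us-central1",
        cluster_name="my-dataproc-cluster",
        virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
    )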

diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataproc.py b/providers/src/airflow/providers/google/cloud/hooks/dataproc.py
index f8bedec9853..ec05f5f341c 100644
--- a/providers/src/airflow/providers/google/cloud/hooks/dataproc.py
+++ b/providers/src/airflow/providers/google/cloud/hooks/dataproc.py
@@ -19,6 +19,8 @@
 
 from __future__ import annotations
 
+import shlex
+import subprocess
 import time
 import uuid
 from collections.abc import MutableSequence
@@ -261,6 +263,49 @@ class DataprocHook(GoogleBaseHook):
         """Create a OperationsClient."""
         return self.get_batch_client(region=region).transport.operations_client
 
+    def dataproc_options_to_args(self, options: dict) -> list[str]:
+        """
+        Return formatted cluster parameters from a dictionary of arguments.
+
+        :param options: Dictionary with options
+        :return: List of arguments
+        """
+        if not options:
+            return []
+
+        args: list[str] = []
+        for attr, value in options.items():
+            if value is None or (isinstance(value, bool) and value):
+                args.append(f"--{attr}")
+            elif isinstance(value, bool) and not value:
+                continue
+            elif isinstance(value, list):
+                args.extend([f"--{attr}={v}" for v in value])
+            else:
+                args.append(f"--{attr}={value}")
+        return args
+
+    def _build_gcloud_command(self, command: list[str], parameters: dict[str, str]) -> list[str]:
+        return [*command, *(self.dataproc_options_to_args(parameters))]
+
+    def _create_dataproc_cluster_with_gcloud(self, cmd: list[str]) -> str:
+        """Create a Dataproc cluster with a gcloud command and return the 
job's ID."""
+        self.log.info("Executing command: %s", " ".join(shlex.quote(c) for c 
in cmd))
+        success_code = 0
+
+        with self.provide_authorized_gcloud():
+            proc = subprocess.run(cmd, capture_output=True)
+
+        if proc.returncode != success_code:
+            stderr_last_20_lines = "\n".join(proc.stderr.decode().strip().splitlines()[-20:])
+            raise AirflowException(
+                f"Process exit with non-zero exit code. Exit code: 
{proc.returncode}. Error Details : "
+                f"{stderr_last_20_lines}"
+            )
+
+        response = proc.stdout.decode().strip()
+        return response
+
     def wait_for_operation(
         self,
         operation: Operation,
@@ -289,7 +334,7 @@ class DataprocHook(GoogleBaseHook):
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
-    ) -> Operation:
+    ) -> Operation | str:
         """
         Create a cluster in a specified project.
 
@@ -326,6 +371,34 @@ class DataprocHook(GoogleBaseHook):
             "project_id": project_id,
             "cluster_name": cluster_name,
         }
+
+        if virtual_cluster_config and "kubernetes_cluster_config" in virtual_cluster_config:
+            kube_config = virtual_cluster_config["kubernetes_cluster_config"]["gke_cluster_config"]
+            try:
+                spark_engine_version = virtual_cluster_config["kubernetes_cluster_config"][
+                    "kubernetes_software_config"
+                ]["component_version"]["SPARK"]
+            except KeyError:
+                spark_engine_version = "latest"
+            gke_cluster_name = kube_config["gke_cluster_target"].rsplit("/", 1)[1]
+            gke_pools = kube_config["node_pool_target"][0]
+            gke_pool_name = gke_pools["node_pool"].rsplit("/", 1)[1]
+            gke_pool_role = gke_pools["roles"][0]
+            gke_pool_machine_type = gke_pools["node_pool_config"]["config"]["machine_type"]
+            gcp_flags = {
+                "region": region,
+                "gke-cluster": gke_cluster_name,
+                "spark-engine-version": spark_engine_version,
+                "pools": 
f"name={gke_pool_name},roles={gke_pool_role.lower()},machineType={gke_pool_machine_type},min=1,max=10",
+                "setup-workload-identity": None,
+            }
+            cmd = self._build_gcloud_command(
+                command=["gcloud", "dataproc", "clusters", "gke", "create", 
cluster_name],
+                parameters=gcp_flags,
+            )
+            response = self._create_dataproc_cluster_with_gcloud(cmd=cmd)
+            return response
+
         if virtual_cluster_config is not None:
             cluster["virtual_cluster_config"] = virtual_cluster_config  # 
type: ignore
         if cluster_config is not None:
diff --git a/providers/src/airflow/providers/google/cloud/operators/dataproc.py b/providers/src/airflow/providers/google/cloud/operators/dataproc.py
index 270114e5e53..d8569e9cb93 100644
--- a/providers/src/airflow/providers/google/cloud/operators/dataproc.py
+++ b/providers/src/airflow/providers/google/cloud/operators/dataproc.py
@@ -818,7 +818,7 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
         try:
             # First try to create a new cluster
             operation = self._create_cluster(hook)
-            if not self.deferrable:
+            if not self.deferrable and type(operation) is not str:
                 cluster = hook.wait_for_operation(
                     timeout=self.timeout, result_retry=self.retry, operation=operation
                 )
diff --git a/providers/tests/google/cloud/hooks/test_dataproc.py b/providers/tests/google/cloud/hooks/test_dataproc.py
index 88839dabb81..b8236f72d11 100644
--- a/providers/tests/google/cloud/hooks/test_dataproc.py
+++ b/providers/tests/google/cloud/hooks/test_dataproc.py
@@ -44,6 +44,27 @@ TASK_ID = "test-task-id"
 GCP_LOCATION = "global"
 GCP_PROJECT = "test-project"
 CLUSTER_CONFIG = {"test": "test"}
+VIRTUAL_CLUSTER_CONFIG = {
+    "kubernetes_cluster_config": {
+        "gke_cluster_config": {
+            "gke_cluster_target": 
"projects/project_id/locations/region/clusters/gke_cluster_name",
+            "node_pool_target": [
+                {
+                    "node_pool": 
"projects/project_id/locations/region/clusters/gke_cluster_name/nodePools/dp",
+                    "roles": ["DEFAULT"],
+                    "node_pool_config": {
+                        "config": {
+                            "preemptible": False,
+                            "machine_type": "e2-standard-4",
+                        }
+                    },
+                }
+            ],
+        },
+        "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
+    },
+    "staging_bucket": "test-staging-bucket",
+}
 LABELS = {"test": "test"}
 CLUSTER_NAME = "cluster-name"
 CLUSTER = {
@@ -174,6 +195,36 @@ class TestDataprocHook:
             timeout=None,
         )
 
+    @mock.patch(DATAPROC_STRING.format("DataprocHook._create_dataproc_cluster_with_gcloud"))
+    @mock.patch(DATAPROC_STRING.format("DataprocHook._build_gcloud_command"))
+    @mock.patch(DATAPROC_STRING.format("DataprocHook.provide_authorized_gcloud"))
+    @mock.patch(DATAPROC_STRING.format("subprocess.run"))
+    def test_create_cluster_with_virtual_cluster_config(
+        self,
+        mock_run,
+        mock_provide_authorized_gcloud,
+        mock_build_gcloud_command,
+        mock_create_dataproc_cluster_with_gcloud,
+    ):
+        self.hook.create_cluster(
+            project_id=GCP_PROJECT,
+            region=GCP_LOCATION,
+            cluster_name=CLUSTER_NAME,
+            cluster_config=CLUSTER_CONFIG,
+            virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
+            labels=LABELS,
+        )
+        mock_build_gcloud_command.assert_called_once_with(
+            command=["gcloud", "dataproc", "clusters", "gke", "create", 
CLUSTER_NAME],
+            parameters={
+                "region": GCP_LOCATION,
+                "gke-cluster": "gke_cluster_name",
+                "spark-engine-version": "3",
+                "pools": 
"name=dp,roles=default,machineType=e2-standard-4,min=1,max=10",
+                "setup-workload-identity": None,
+            },
+        )
+
     @mock.patch(DATAPROC_STRING.format("DataprocHook.get_cluster_client"))
     def test_delete_cluster(self, mock_client):
         self.hook.delete_cluster(project_id=GCP_PROJECT, region=GCP_LOCATION, cluster_name=CLUSTER_NAME)
diff --git a/providers/tests/system/google/cloud/dataproc/example_dataproc_gke.py b/providers/tests/system/google/cloud/dataproc/example_dataproc_gke.py
index 8048a23283e..0be3f53ea6f 100644
--- a/providers/tests/system/google/cloud/dataproc/example_dataproc_gke.py
+++ b/providers/tests/system/google/cloud/dataproc/example_dataproc_gke.py
@@ -84,7 +84,7 @@ VIRTUAL_CLUSTER_CONFIG = {
                 }
             ],
         },
-        "kubernetes_software_config": {"component_version": {"SPARK": b"3"}},
+        "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
     },
     "staging_bucket": "test-staging-bucket",
 }
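
For reference, a rough sketch of what the new helpers assemble for the test inputs above. The direct DataprocHook() instantiation and the call to the private _build_gcloud_command method are for illustration only; flag order follows the gcp_flags dictionary in create_cluster:

    from airflow.providers.google.cloud.hooks.dataproc import DataprocHook

    hook = DataprocHook()  # assumes the default Google Cloud connection
    cmd = hook._build_gcloud_command(
        command=["gcloud", "dataproc", "clusters", "gke", "create", "cluster-name"],
        parameters={
            "region": "global",
            "gke-cluster": "gke_cluster_name",
            "spark-engine-version": "3",
            "pools": "name=dp,roles=default,machineType=e2-standard-4,min=1,max=10",
            "setup-workload-identity": None,  # a None value becomes a bare flag
        },
    )
    # cmd == ["gcloud", "dataproc", "clusters", "gke", "create", "cluster-name",
    #         "--region=global", "--gke-cluster=gke_cluster_name",
    #         "--spark-engine-version=3",
    #         "--pools=name=dp,roles=default,machineType=e2-standard-4,min=1,max=10",
    #         "--setup-workload-identity"]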
