jaketf commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r452554626



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        raise AirflowException(
+            f"Cluster {self.cluster_name} deleted due to ERROR"
+        )
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
+
+    def _wait_for_cluster_in_deleting_state(self, hook):
+        time_left = 60 * 5
+        for time_to_sleep in exponential_sleep_generator(initial=10, 
maximum=120):
+            if time_left < 0:
+                raise AirflowException(
+                    f"Cluster {self.cluster_name} is still DELETING state, 
aborting"
+                )
+            time.sleep(time_to_sleep)
+            time_left = time_left - time_to_sleep
+            try:
+                self._get_cluster(hook)
+            except NotFound:
+                break
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)

Review comment:
       If the cluster already exists (I assume this is checked by cluster id / 
name), then you should assert that it matches any configuration explicitly 
specified in this operator by the user (e.g. a cluster could exist with this ID 
but have a different Dataproc version / missing init actions / etc.). You would 
not want to consider this a successful run of this operator, as it did not meet 
its contract of creating a cluster with the explicit XYZ config provided by the 
user.
   
   IMO this should result in a task failure, as it is not clear what the 
operator should do in this scenario: delete the existing cluster? Add a UUID 
suffix to avoid the name clash and create a new cluster?

##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
     :type project_id: str
     :param region: leave as 'global', might become relevant in the future. 
(templated)
     :type region: str
+    :parm delete_on_error: If true the claster will be deleted if created with 
ERROR state. Default

Review comment:
       spelling
   ```suggestion
       :param delete_on_error: If true the cluster will be deleted if created 
with ERROR state. Default
   ```

##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        raise AirflowException(
+            f"Cluster {self.cluster_name} deleted due to ERROR"
+        )
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
+
+    def _wait_for_cluster_in_deleting_state(self, hook):
+        time_left = 60 * 5
+        for time_to_sleep in exponential_sleep_generator(initial=10, 
maximum=120):
+            if time_left < 0:
+                raise AirflowException(
+                    f"Cluster {self.cluster_name} is still DELETING state, 
aborting"
+                )
+            time.sleep(time_to_sleep)
+            time_left = time_left - time_to_sleep
+            try:
+                self._get_cluster(hook)
+            except NotFound:
+                break
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)

Review comment:
       If the cluster is in the CREATING state, should you block until it reaches RUNNING?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to