VladaZakharova commented on code in PR #60083:
URL: https://github.com/apache/airflow/pull/60083#discussion_r2661091786


##########
providers/google/tests/system/google/cloud/dataproc/example_dataproc_delete_cluster.py:
##########
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   We usually don't add a new system test for updated logic in an operator. We already test this operator, so there is no need for a new one.



##########
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -995,35 +995,35 @@ def __init__(
 
     def execute(self, context: Context) -> None:
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
-        operation = self._delete_cluster(hook)
-        if not self.deferrable:
-            hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation)
-            self.log.info("Cluster deleted.")
-        else:
-            try:
-                hook.get_cluster(
-                    project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
-                )
-            except NotFound:
+        try:
+            op: operation.Operation = self._delete_cluster(hook)
+            if not self.deferrable:
+                hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=op)
                 self.log.info("Cluster deleted.")
-                return
-            except Exception as e:
-                raise AirflowException(str(e))
-
-            end_time: float = time.time() + self.timeout
-            self.defer(
-                trigger=DataprocDeleteClusterTrigger(
-                    gcp_conn_id=self.gcp_conn_id,
-                    project_id=self.project_id,
-                    region=self.region,
-                    cluster_name=self.cluster_name,
-                    end_time=end_time,
-                    metadata=self.metadata,
-                    impersonation_chain=self.impersonation_chain,
-                    polling_interval_seconds=self.polling_interval_seconds,
-                ),
-                method_name="execute_complete",
+            else:
+                end_time: float = time.time() + self.timeout
+                self.defer(
+                    trigger=DataprocDeleteClusterTrigger(
+                        gcp_conn_id=self.gcp_conn_id,
+                        project_id=self.project_id,
+                        region=self.region,
+                        cluster_name=self.cluster_name,
+                        end_time=end_time,
+                        metadata=self.metadata,
+                        impersonation_chain=self.impersonation_chain,
+                        polling_interval_seconds=self.polling_interval_seconds,
+                    ),
+                    method_name="execute_complete",
+                )
+        except NotFound:

Review Comment:
   I don't think the NotFound exception will be raised here from deferrable mode. If it is raised, it will come from the API call, not from the deferrable path itself.
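
   For illustration, a minimal sketch of the narrower scoping this implies, with the NotFound handling kept around the API call only and `self.defer()` left outside the try block. This is not the operator's actual code; the injected callables (`delete_cluster`, `wait_for_done`, `defer_to_trigger`) are hypothetical stand-ins for the calls shown in the diff above.

```python
# Sketch only: NotFound would come from the Dataproc API call, not from the
# deferral itself, so the try/except is scoped to the API call alone.
from google.api_core.exceptions import NotFound


def execute_sketch(hook, deferrable, delete_cluster, wait_for_done, defer_to_trigger, log):
    try:
        operation = delete_cluster(hook)  # the only call expected to raise NotFound
    except NotFound:
        log.info("Cluster already absent, nothing to delete.")
        return
    if not deferrable:
        wait_for_done(operation)
        log.info("Cluster deleted.")
    else:
        # Deferring to the trigger does not itself raise NotFound.
        defer_to_trigger()
```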



##########
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -995,35 +995,35 @@ def __init__(
 
     def execute(self, context: Context) -> None:
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
-        operation = self._delete_cluster(hook)
-        if not self.deferrable:
-            hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=operation)
-            self.log.info("Cluster deleted.")
-        else:
-            try:
-                hook.get_cluster(
-                    project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
-                )
-            except NotFound:
+        try:
+            op: operation.Operation = self._delete_cluster(hook)
+            if not self.deferrable:
+                hook.wait_for_operation(timeout=self.timeout, result_retry=self.retry, operation=op)
                 self.log.info("Cluster deleted.")
-                return
-            except Exception as e:
-                raise AirflowException(str(e))
-
-            end_time: float = time.time() + self.timeout
-            self.defer(
-                trigger=DataprocDeleteClusterTrigger(
-                    gcp_conn_id=self.gcp_conn_id,
-                    project_id=self.project_id,
-                    region=self.region,
-                    cluster_name=self.cluster_name,
-                    end_time=end_time,
-                    metadata=self.metadata,
-                    impersonation_chain=self.impersonation_chain,
-                    polling_interval_seconds=self.polling_interval_seconds,
-                ),
-                method_name="execute_complete",
+            else:
+                end_time: float = time.time() + self.timeout
+                self.defer(
+                    trigger=DataprocDeleteClusterTrigger(
+                        gcp_conn_id=self.gcp_conn_id,
+                        project_id=self.project_id,
+                        region=self.region,
+                        cluster_name=self.cluster_name,
+                        end_time=end_time,
+                        metadata=self.metadata,
+                        impersonation_chain=self.impersonation_chain,
+                        polling_interval_seconds=self.polling_interval_seconds,
+                    ),
+                    method_name="execute_complete",
+                )
+        except NotFound:
+            self.log.info(
+                "Cluster %s not found in region %s. Skipping deletion.", self.cluster_name, self.region
+            )
+            raise AirflowSkipException(

Review Comment:
   I am not sure why we need to add this exception. The task will fail anyway if the cluster cannot be deleted, so I am not sure the logic here should raise a SkipException.
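
   As background on the behavioural difference in question, a minimal sketch contrasting the two options: raising AirflowSkipException marks the task as "skipped" (which can cascade to downstream tasks under the default all_success trigger rule), while logging and returning leaves the task in "success". The helper name and its arguments are hypothetical, not the operator code.

```python
# Sketch only: the two behaviours under discussion for a missing cluster.
from airflow.exceptions import AirflowSkipException
from google.api_core.exceptions import NotFound


def delete_if_exists(delete_call, log, skip_when_missing=False):
    try:
        delete_call()
    except NotFound:
        if skip_when_missing:
            # Variant the PR adds: mark the task as "skipped".
            raise AirflowSkipException("Cluster not found, skipping deletion.")
        # Alternative: treat the missing cluster as an already-completed delete
        # and let the task finish in "success".
        log.info("Cluster not found, treating deletion as a no-op.")
```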



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
