jaketf commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r452554626
##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
+ self.delete_on_error = delete_on_error
+
+ def _create_cluster(self, hook):
+ operation = hook.create_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster=self.cluster,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ cluster = operation.result()
+ self.log.info("Cluster created.")
+ return cluster
+
+ def _delete_cluster(self, hook):
+ self.log.info("Deleting the cluster")
+ hook.delete_cluster(
+ region=self.region,
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ )
+ raise AirflowException(
+ f"Cluster {self.cluster_name} deleted due to ERROR"
+ )
+
+ def _get_cluster(self, hook):
+ return hook.get_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+ def _handle_error_state(self, hook):
+ self.log.info("Cluster is in ERROR state")
+ gcs_uri = hook.diagnose_cluster(
+ region=self.region,
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ )
+ self.log.info(
+ 'Diagnostic information for cluster %s available at: %s',
+ self.cluster_name, gcs_uri
+ )
+ if self.delete_on_error:
+ self._delete_cluster(hook)
+
+ def _wait_for_cluster_in_deleting_state(self, hook):
+ time_left = 60 * 5
+ for time_to_sleep in exponential_sleep_generator(initial=10,
maximum=120):
+ if time_left < 0:
+ raise AirflowException(
+ f"Cluster {self.cluster_name} is still DELETING state,
aborting"
+ )
+ time.sleep(time_to_sleep)
+ time_left = time_left - time_to_sleep
+ try:
+ self._get_cluster(hook)
+ except NotFound:
+ break
def execute(self, context):
self.log.info('Creating cluster: %s', self.cluster_name)
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
try:
- operation = hook.create_cluster(
- project_id=self.project_id,
- region=self.region,
- cluster=self.cluster,
- request_id=self.request_id,
- retry=self.retry,
- timeout=self.timeout,
- metadata=self.metadata,
- )
- cluster = operation.result()
- self.log.info("Cluster created.")
+ cluster = self._create_cluster(hook)
except AlreadyExists:
- cluster = hook.get_cluster(
- project_id=self.project_id,
- region=self.region,
- cluster_name=self.cluster_name,
- retry=self.retry,
- timeout=self.timeout,
- metadata=self.metadata,
- )
self.log.info("Cluster already exists.")
+ cluster = self._get_cluster(hook)
Review comment:
If the cluster already exists (I assume this is checked by cluster id /
name) then you should assert that it matches any configuration explicitly
specified in this operator by the user (e.g. a cluster could exist with this ID
but have a different dataproc version / missing init actions / etc.). You would
not want to consider this a successful run of this operator, as it did not meet
its contract of creating a cluster with the explicit XYZ config provided by the
user.
IMO this should result in a task failure, as it is not clear what the
operator should do in this scenario: delete the existing cluster? Add a uuid
suffix to avoid the name clash and create a new cluster?
##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
:type project_id: str
:param region: leave as 'global', might become relevant in the future.
(templated)
:type region: str
+ :parm delete_on_error: If true the claster will be deleted if created with
ERROR state. Default
Review comment:
spelling
```suggestion
:param delete_on_error: If true the cluster will be deleted if created
with ERROR state. Default
```
##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
+ self.delete_on_error = delete_on_error
+
+ def _create_cluster(self, hook):
+ operation = hook.create_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster=self.cluster,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ cluster = operation.result()
+ self.log.info("Cluster created.")
+ return cluster
+
+ def _delete_cluster(self, hook):
+ self.log.info("Deleting the cluster")
+ hook.delete_cluster(
+ region=self.region,
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ )
+ raise AirflowException(
+ f"Cluster {self.cluster_name} deleted due to ERROR"
+ )
+
+ def _get_cluster(self, hook):
+ return hook.get_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+ def _handle_error_state(self, hook):
+ self.log.info("Cluster is in ERROR state")
+ gcs_uri = hook.diagnose_cluster(
+ region=self.region,
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ )
+ self.log.info(
+ 'Diagnostic information for cluster %s available at: %s',
+ self.cluster_name, gcs_uri
+ )
+ if self.delete_on_error:
+ self._delete_cluster(hook)
+
+ def _wait_for_cluster_in_deleting_state(self, hook):
+ time_left = 60 * 5
+ for time_to_sleep in exponential_sleep_generator(initial=10,
maximum=120):
+ if time_left < 0:
+ raise AirflowException(
+ f"Cluster {self.cluster_name} is still DELETING state,
aborting"
+ )
+ time.sleep(time_to_sleep)
+ time_left = time_left - time_to_sleep
+ try:
+ self._get_cluster(hook)
+ except NotFound:
+ break
def execute(self, context):
self.log.info('Creating cluster: %s', self.cluster_name)
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
try:
- operation = hook.create_cluster(
- project_id=self.project_id,
- region=self.region,
- cluster=self.cluster,
- request_id=self.request_id,
- retry=self.retry,
- timeout=self.timeout,
- metadata=self.metadata,
- )
- cluster = operation.result()
- self.log.info("Cluster created.")
+ cluster = self._create_cluster(hook)
except AlreadyExists:
- cluster = hook.get_cluster(
- project_id=self.project_id,
- region=self.region,
- cluster_name=self.cluster_name,
- retry=self.retry,
- timeout=self.timeout,
- metadata=self.metadata,
- )
self.log.info("Cluster already exists.")
+ cluster = self._get_cluster(hook)
Review comment:
If the cluster is in CREATING state, should you block until it reaches RUNNING?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]