potiuk commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r464284946
##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
+ self.delete_on_error = delete_on_error
+
+ def _create_cluster(self, hook):
+ operation = hook.create_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster=self.cluster,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ cluster = operation.result()
+ self.log.info("Cluster created.")
+ return cluster
+
+ def _delete_cluster(self, hook):
+ self.log.info("Deleting the cluster")
+ hook.delete_cluster(
+ region=self.region,
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ )
+ raise AirflowException(
+ f"Cluster {self.cluster_name} deleted due to ERROR"
+ )
+
+ def _get_cluster(self, hook):
+ return hook.get_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+ def _handle_error_state(self, hook):
+ self.log.info("Cluster is in ERROR state")
+ gcs_uri = hook.diagnose_cluster(
+ region=self.region,
+ cluster_name=self.cluster_name,
+ project_id=self.project_id,
+ )
+ self.log.info(
+ 'Diagnostic information for cluster %s available at: %s',
+ self.cluster_name, gcs_uri
+ )
+ if self.delete_on_error:
+ self._delete_cluster(hook)
+
+ def _wait_for_cluster_in_deleting_state(self, hook):
+ time_left = 60 * 5
+ for time_to_sleep in exponential_sleep_generator(initial=10,
maximum=120):
+ if time_left < 0:
+ raise AirflowException(
+ f"Cluster {self.cluster_name} is still DELETING state,
aborting"
+ )
+ time.sleep(time_to_sleep)
+ time_left = time_left - time_to_sleep
+ try:
+ self._get_cluster(hook)
+ except NotFound:
+ break
def execute(self, context):
self.log.info('Creating cluster: %s', self.cluster_name)
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
try:
- operation = hook.create_cluster(
- project_id=self.project_id,
- region=self.region,
- cluster=self.cluster,
- request_id=self.request_id,
- retry=self.retry,
- timeout=self.timeout,
- metadata=self.metadata,
- )
- cluster = operation.result()
- self.log.info("Cluster created.")
+ cluster = self._create_cluster(hook)
except AlreadyExists:
- cluster = hook.get_cluster(
- project_id=self.project_id,
- region=self.region,
- cluster_name=self.cluster_name,
- retry=self.retry,
- timeout=self.timeout,
- metadata=self.metadata,
- )
self.log.info("Cluster already exists.")
+ cluster = self._get_cluster(hook)
Review comment:
Yeah. See my "terraform" comment above. I think we are pretty good with
manual deletion of the cluster in case we want to change configuration, I don't
think we should handle all potential complexity of computing difference between
expected/actual cluster configuration.
##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
:type project_id: str
:param region: leave as 'global', might become relevant in the future.
(templated)
:type region: str
+ :param delete_on_error: If true the cluster will be deleted if created with
ERROR state. Default
+ value is true.
+ :type delete_on_error: bool
Review comment:
I think this is not really something that should be handled by the
operator itself. I'd argue that if you really want to change the configuration
of the cluster you can simply delete it manually and let it be re-created. I think
working in a "terraformy" or "kubectly" "apply" fashion in this case should be
left to terraform. I.e. if you really want to use this kind of approach, why
not write a terraform script and run terraform.
BTW, off-topic — but shouldn't we think about adding a Terraform/Terragrunt
operator to Airflow? I'd say it might be a good idea to have such an operator
with some pre-defined ways on how to get terraform/terragrunt scripts in and
how to integrate with Airflow's JINJA templating.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]