arezamoosavi commented on a change in pull request #21619:
URL: https://github.com/apache/airflow/pull/21619#discussion_r809046236
##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -659,6 +680,264 @@ def execute(self, context: 'Context') -> dict:
return Cluster.to_dict(cluster)
class DataprocStartClusterOperator(BaseOperator):
    """
    Starts a cluster in a project.

    :param project_id: Required. The ID of the Google Cloud project that the cluster belongs to (templated).
    :param region: Required. The Cloud Dataproc region in which to handle the request (templated).
    :param cluster_name: Required. The cluster name (templated).
    :param cluster_uuid: Optional. Specifying the ``cluster_uuid`` means the RPC should fail
        if cluster with specified UUID does not exist.
    :param request_id: Optional. A unique id used to identify the request. If the server receives two
        ``StartClusterRequest`` requests with the same id, then the second request will be ignored and the
        first ``google.longrunning.Operation`` created and stored in the backend is returned.
    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
        retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
        ``retry`` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields: Sequence[str] = ('project_id', 'region', 'cluster_name', 'impersonation_chain')

    def __init__(
        self,
        *,
        project_id: str,
        region: str,
        cluster_name: str,
        cluster_uuid: Optional[str] = None,
        request_id: Optional[str] = None,
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.project_id = project_id
        self.region = region
        self.cluster_name = cluster_name
        self.cluster_uuid = cluster_uuid
        self.request_id = request_id
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.impersonation_chain = impersonation_chain
+
+ def _start_cluster(self, hook: DataprocHook):
+ hook.start_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name,
+ labels=self.labels,
+ cluster_config=self.cluster_config,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+ def _get_cluster(self, hook: DataprocHook) -> Cluster:
+ return hook.get_cluster(
+ project_id=self.project_id,
+ region=self.region,
+ cluster_name=self.cluster_name,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+
+ def _handle_error_state(self, hook: DataprocHook, cluster: Cluster) ->
None:
+ if cluster.status.state != cluster.status.State.ERROR:
+ return
+ self.log.info("Cluster is in ERROR state")
+ gcs_uri = hook.diagnose_cluster(
+ region=self.region, cluster_name=self.cluster_name,
project_id=self.project_id
+ )
+ self.log.info(
+ 'Diagnostic information for cluster %s available at: %s',
self.cluster_name, gcs_uri)
+ raise AirflowException("Cluster was started but is in ERROR state")
+
+ def _wait_for_cluster_in_starting_state(self, hook: DataprocHook) ->
Cluster:
+ time_left = self.timeout
+ cluster = self._get_cluster(hook)
+ for time_to_sleep in exponential_sleep_generator(initial=10,
maximum=120):
+ if cluster.status.state != cluster.status.State.RUNNING:
+ break
+ if time_left < 0:
+ raise AirflowException(
+ f"Cluster {self.cluster_name} is still CREATING state,
aborting")
+ time.sleep(time_to_sleep)
+ time_left = time_left - time_to_sleep
+ cluster = self._get_cluster(hook)
+ return cluster
+
+ def execute(self, context: 'Context') -> None:
+ self.log.info('Starting cluster: %s', self.cluster_name)
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain)
+ # Save data required to display extra link no matter what the cluster
status will be
+ self.xcom_push(
+ context,
+ key="cluster_conf",
+ value={
+ "cluster_name": self.cluster_name,
+ "region": self.region,
+ "project_id": self.project_id,
+ },
+ )
Review comment:
Also, there is another way to set this: `XCom.set`. Elsewhere in the codebase,
however, `xcom_push` is consistently used inside `execute` methods.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]