MaksYermak commented on a change in pull request #19248:
URL: https://github.com/apache/airflow/pull/19248#discussion_r741283461
##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -2159,3 +2160,323 @@ def execute(self, context: Dict):
)
operation.result()
self.log.info("Updated %s cluster.", self.cluster_name)
+
+
+class DataprocCreateBatchOperator(BaseOperator):
+ """
+ Creates a batch workload.
+
+ :param project_id: Required. The ID of the Google Cloud project that the
cluster belongs to.
+ :type project_id: str
+ :param region: Required. The Cloud Dataproc region in which to handle the
request.
+ :type region: str
+ :param batch: Required. The batch to create.
+ :type batch: google.cloud.dataproc_v1.types.Batch
+ :param batch_id: Optional. The ID to use for the batch, which will become
the final component
+ of the batch's resource name.
+ This value must be 4-63 characters. Valid characters are /[a-z][0-9]-/.
+ :type batch_id: str
+ :param request_id: Optional. A unique id used to identify the request. If
the server receives two
+ ``CreateBatchRequest`` requests with the same id, then the second
request will be ignored and
+ the first ``google.longrunning.Operation`` created and stored in the
backend is returned.
+ :type request_id: str
+ :param retry: A retry object used to retry requests. If ``None`` is
specified, requests will not be
+ retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if
+ ``retry`` is specified, the timeout applies to each individual attempt.
+ :type timeout: float
+ :param metadata: Additional metadata that is provided to the method.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = (
+ 'project_id',
+ 'region',
+ 'impersonation_chain',
+ )
+
+ def __init__(
+ self,
+ *,
+ region: str = None,
+ project_id: str,
+ batch: Union[Dict, Batch],
+ batch_id: Optional[str] = None,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = "",
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.region = region
+ self.project_id = project_id
+ self.batch = batch
+ self.batch_id = batch_id
+ self.request_id = request_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.operation: Optional[operation.Operation] = None
+
+ def execute(self, context):
+ hook = DataprocHook(gcp_conn_id=self.gcp_conn_id,
impersonation_chain=self.impersonation_chain)
+ self.log.info("Creating batch")
+ try:
+ self.operation = hook.create_batch(
+ region=self.region,
+ project_id=self.project_id,
+ batch=self.batch,
+ batch_id=self.batch_id,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ result = hook.wait_for_operation(self.timeout, self.operation)
+ self.log.info("Batch %s created", self.batch_id)
+ return Batch.to_dict(result)
+ except AlreadyExists:
+ self.log.info("Batch with given id already exists")
Review comment:
@turbaszek I've fixed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]