This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7d2ec76c72 Make the deferrable version of DataprocCreateBatchOperator
handle a batch_id that already exists (#32216)
7d2ec76c72 is described below
commit 7d2ec76c72f70259b67af0047aa785b28668b411
Author: Kristopher Kane <[email protected]>
AuthorDate: Thu Jun 29 09:51:17 2023 -0400
Make the deferrable version of DataprocCreateBatchOperator handle a
batch_id that already exists (#32216)
---
.../providers/google/cloud/operators/dataproc.py | 87 +++++++++++++---------
1 file changed, 52 insertions(+), 35 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index ff9a58cc34..db7d785347 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -172,7 +172,6 @@ class ClusterGenerator:
         enable_component_gateway: bool | None = False,
         **kwargs,
     ) -> None:
-
         self.project_id = project_id
         self.num_masters = num_masters
         self.num_workers = num_workers
@@ -489,7 +488,6 @@ class DataprocCreateClusterOperator(GoogleCloudBaseOperator):
         polling_interval_seconds: int = 10,
         **kwargs,
     ) -> None:
-
         # TODO: remove one day
         if cluster_config is None and virtual_cluster_config is None:
             warnings.warn(
@@ -2333,6 +2331,7 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
                 return self.operation.operation.name
             else:
+                # processing ends in execute_complete
                 self.defer(
                     trigger=DataprocBatchTrigger(
                         batch_id=self.batch_id,
@@ -2350,35 +2349,35 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
             # This is only likely to happen if batch_id was provided
             # Could be running if Airflow was restarted after task started
             # poll until a final state is reached
-            if self.batch_id:
-                self.log.info("Attaching to the job (%s) if it is still running.", self.batch_id)
-                result = hook.wait_for_batch(
-                    batch_id=self.batch_id,
-                    region=self.region,
-                    project_id=self.project_id,
-                    retry=self.retry,
-                    timeout=self.timeout,
-                    metadata=self.metadata,
-                    wait_check_interval=self.polling_interval_seconds,
+
+            self.log.info("Attaching to the job %s if it is still running.", self.batch_id)
+
+            # deferrable handling of a batch_id that already exists - processing ends in execute_complete
+            if self.deferrable:
+                self.defer(
+                    trigger=DataprocBatchTrigger(
+                        batch_id=self.batch_id,
+                        project_id=self.project_id,
+                        region=self.region,
+                        gcp_conn_id=self.gcp_conn_id,
+                        impersonation_chain=self.impersonation_chain,
+                        polling_interval_seconds=self.polling_interval_seconds,
+                    ),
+                    method_name="execute_complete",
                 )
-            # It is possible we don't have a result in the case where batch_id was not provide, one was generated
-            # by chance, AlreadyExists was caught, but we can't reattach because we don't have the generated id
-            if result is None:
-                raise AirflowException("The job could not be reattached because the id was generated.")
-            # The existing batch may be a number of states other than 'SUCCEEDED'\
-            # wait_for_operation doesn't fail if the job is cancelled, so we will check for it here which also
-            # finds a cancelling|canceled|unspecified job from wait_for_batch
+            # non-deferrable handling of a batch_id that already exists
+            result = hook.wait_for_batch(
+                batch_id=self.batch_id,
+                region=self.region,
+                project_id=self.project_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+                wait_check_interval=self.polling_interval_seconds,
+            )
             batch_id = self.batch_id or result.name.split("/")[-1]
-            link = DATAPROC_BATCH_LINK.format(region=self.region, project_id=self.project_id, resource=batch_id)
-            if result.state == Batch.State.FAILED:
-                raise AirflowException(f"Batch job {batch_id} failed. Driver Logs: {link}")
-            if result.state in (Batch.State.CANCELLED, Batch.State.CANCELLING):
-                raise AirflowException(f"Batch job {batch_id} was cancelled. Driver logs: {link}")
-            if result.state == Batch.State.STATE_UNSPECIFIED:
-                raise AirflowException(f"Batch job {batch_id} unspecified. Driver logs: {link}")
-            self.log.info("Batch job %s completed. Driver logs: %s", batch_id, link)
-            DataprocLink.persist(context=context, task_instance=self, url=DATAPROC_BATCH_LINK, resource=batch_id)
+            self.handle_batch_status(context, result.state, batch_id)
         return Batch.to_dict(result)

     def execute_complete(self, context, event=None) -> None:
@@ -2389,19 +2388,37 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator):
         """
         if event is None:
             raise AirflowException("Batch failed.")
-        batch_state = event["batch_state"]
+        state = event["batch_state"]
         batch_id = event["batch_id"]
-
-        if batch_state == Batch.State.FAILED:
-            raise AirflowException(f"Batch failed:\n{batch_id}")
-        if batch_state == Batch.State.CANCELLED:
-            raise AirflowException(f"Batch was cancelled:\n{batch_id}")
-        self.log.info("%s completed successfully.", self.task_id)
+        self.handle_batch_status(context, state, batch_id)

     def on_kill(self):
         if self.operation:
             self.operation.cancel()

+    def handle_batch_status(self, context: Context, state: Batch.State, batch_id: str) -> None:
+        # The existing batch may be a number of states other than 'SUCCEEDED'
+        # wait_for_operation doesn't fail if the job is cancelled, so we will check for it here which also
+        # finds a cancelling|canceled|unspecified job from wait_for_batch or the deferred trigger
+        link = DATAPROC_BATCH_LINK.format(region=self.region, project_id=self.project_id, resource=batch_id)
+        if state == Batch.State.FAILED:
+            DataprocLink.persist(
+                context=context, task_instance=self, url=DATAPROC_BATCH_LINK, resource=batch_id
+            )
+            raise AirflowException("Batch job %s failed. Driver Logs: %s", batch_id, link)
+        if state in (Batch.State.CANCELLED, Batch.State.CANCELLING):
+            DataprocLink.persist(
+                context=context, task_instance=self, url=DATAPROC_BATCH_LINK, resource=batch_id
+            )
+            raise AirflowException("Batch job %s was cancelled. Driver logs: %s", batch_id, link)
+        if state == Batch.State.STATE_UNSPECIFIED:
+            DataprocLink.persist(
+                context=context, task_instance=self, url=DATAPROC_BATCH_LINK, resource=batch_id
+            )
+            raise AirflowException("Batch job %s unspecified. Driver logs: %s", batch_id, link)
+        self.log.info("Batch job %s completed. Driver logs: %s", batch_id, link)
+        DataprocLink.persist(context=context, task_instance=self, url=DATAPROC_BATCH_LINK, resource=batch_id)
+
class DataprocDeleteBatchOperator(GoogleCloudBaseOperator):
"""Delete the batch workload resource.