This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d0f4512ecb Fix DataprocSubmitJobOperator to retrieve failed job error message (#36053)
d0f4512ecb is described below
commit d0f4512ecb9c0683a60be7b0de8945948444df8e
Author: VladaZakharova <[email protected]>
AuthorDate: Mon Dec 4 16:27:34 2023 +0100
Fix DataprocSubmitJobOperator to retrieve failed job error message (#36053)
Co-authored-by: Ulada Zakharava <[email protected]>
---
airflow/providers/google/cloud/operators/dataproc.py | 7 ++++---
airflow/providers/google/cloud/triggers/dataproc.py | 7 ++-----
2 files changed, 6 insertions(+), 8 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index b489a79dc8..477ddbf7d0 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -2152,7 +2152,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
Service Account Token Creator IAM role to the directly preceding
identity, with first
account from the list granting this role to the originating account
(templated).
:param asynchronous: Flag to return after submitting the job to the
Dataproc API.
- This is useful for submitting long running jobs and
+ This is useful for submitting long-running jobs and
waiting on them asynchronously using the DataprocJobSensor
:param deferrable: Run operator in the deferrable mode
:param polling_interval_seconds: time in seconds between polling for job
completion.
@@ -2267,10 +2267,11 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
"""
job_state = event["job_state"]
job_id = event["job_id"]
+ job = event["job"]
if job_state == JobStatus.State.ERROR:
- raise AirflowException(f"Job failed:\n{job_id}")
+ raise AirflowException(f"Job {job_id} failed:\n{job}")
if job_state == JobStatus.State.CANCELLED:
- raise AirflowException(f"Job was cancelled:\n{job_id}")
+ raise AirflowException(f"Job {job_id} was cancelled:\n{job}")
self.log.info("%s completed successfully.", self.task_id)
return job_id
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index 7d7215ebc2..e03f7a14ca 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -25,7 +25,6 @@ from typing import Any, AsyncIterator, Sequence
from google.api_core.exceptions import NotFound
from google.cloud.dataproc_v1 import Batch, ClusterStatus, JobStatus
-from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -98,12 +97,10 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
)
state = job.status.state
self.log.info("Dataproc job: %s is in state: %s", self.job_id, state)
- if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
+ if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, JobStatus.State.ERROR):
break
- elif state == JobStatus.State.ERROR:
- raise AirflowException(f"Dataproc job execution failed {self.job_id}")
await asyncio.sleep(self.polling_interval_seconds)
- yield TriggerEvent({"job_id": self.job_id, "job_state": state})
+ yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": job})
class DataprocClusterTrigger(DataprocBaseTrigger):