This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d0f4512ecb Fix DataprocSubmitJobOperator to retrieve failed job error message (#36053)
d0f4512ecb is described below

commit d0f4512ecb9c0683a60be7b0de8945948444df8e
Author: VladaZakharova <[email protected]>
AuthorDate: Mon Dec 4 16:27:34 2023 +0100

    Fix DataprocSubmitJobOperator to retrieve failed job error message (#36053)
    
    Co-authored-by: Ulada Zakharava <[email protected]>
---
 airflow/providers/google/cloud/operators/dataproc.py | 7 ++++---
 airflow/providers/google/cloud/triggers/dataproc.py  | 7 ++-----
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index b489a79dc8..477ddbf7d0 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -2152,7 +2152,7 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
     :param asynchronous: Flag to return after submitting the job to the 
Dataproc API.
-        This is useful for submitting long running jobs and
+        This is useful for submitting long-running jobs and
         waiting on them asynchronously using the DataprocJobSensor
     :param deferrable: Run operator in the deferrable mode
     :param polling_interval_seconds: time in seconds between polling for job 
completion.
@@ -2267,10 +2267,11 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
         """
         job_state = event["job_state"]
         job_id = event["job_id"]
+        job = event["job"]
         if job_state == JobStatus.State.ERROR:
-            raise AirflowException(f"Job failed:\n{job_id}")
+            raise AirflowException(f"Job {job_id} failed:\n{job}")
         if job_state == JobStatus.State.CANCELLED:
-            raise AirflowException(f"Job was cancelled:\n{job_id}")
+            raise AirflowException(f"Job {job_id} was cancelled:\n{job}")
         self.log.info("%s completed successfully.", self.task_id)
         return job_id
 
diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py
index 7d7215ebc2..e03f7a14ca 100644
--- a/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/airflow/providers/google/cloud/triggers/dataproc.py
@@ -25,7 +25,6 @@ from typing import Any, AsyncIterator, Sequence
 from google.api_core.exceptions import NotFound
 from google.cloud.dataproc_v1 import Batch, ClusterStatus, JobStatus
 
-from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.dataproc import DataprocAsyncHook
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 
@@ -98,12 +97,10 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
             )
             state = job.status.state
             self.log.info("Dataproc job: %s is in state: %s", self.job_id, 
state)
-            if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED):
+            if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED, 
JobStatus.State.ERROR):
                 break
-            elif state == JobStatus.State.ERROR:
-                raise AirflowException(f"Dataproc job execution failed 
{self.job_id}")
             await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"job_id": self.job_id, "job_state": state})
+        yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": 
job})
 
 
 class DataprocClusterTrigger(DataprocBaseTrigger):

Reply via email to