[ 
https://issues.apache.org/jira/browse/GOBBLIN-2193?focusedWorklogId=955781&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955781
 ]

ASF GitHub Bot logged work on GOBBLIN-2193:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Feb/25 07:33
            Start Date: 06/Feb/25 07:33
    Worklog Time Spent: 10m 
      Work Description: Blazer-007 commented on code in PR #4096:
URL: https://github.com/apache/gobblin/pull/4096#discussion_r1944213593


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.gobblin.cluster.event;
+
+import lombok.Getter;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * The `JobFailureEvent` class represents an event that is triggered when a 
job fails.
+ * It contains information about the job state and a summary of the issues 
that caused the failure.
+ */
+public class JobFailureEvent {
+  @Getter
+  private final JobState jobState;
+  @Getter
+  private final String issuesSummary;
+  public JobFailureEvent(JobState jobState, String issuesSummary) {
+    this.jobState = jobState;
+    this.issuesSummary = issuesSummary;
+  }

Review Comment:
   can we use @AllArgsConstructor in place of this ?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java:
##########
@@ -107,13 +111,38 @@ protected Config applyJobLauncherOverrides(Config config) 
{
     return configOverrides.withFallback(config);
   }
 
+  private String getIssuesSummary() {
+    TextStringBuilder sb = new TextStringBuilder();
+    try {
+      List<Issue> issues = this.getIssueRepository().getAll();
+      sb.appendln("");
+      sb.appendln("vvvvv============= Issues (summary) =============vvvvv");
+
+      for (int i = 0; i < issues.size(); i++) {
+        Issue issue = issues.get(i);
+
+        sb.appendln("%s) %s %s %s | source: %s", i + 1, 
issue.getSeverity().toString(), issue.getCode(),
+            issue.getSummary(), issue.getSourceClass());
+      }
+      sb.append("^^^^^=============================================^^^^^");
+      sb.toString();
+    }
+    catch(Exception e) {
+      log.warn("Failed to get issue summary", e);
+    }
+    return sb.toString();
+  }
+
   @Override
   protected void handleLaunchFinalization() {
     // NOTE: This code only makes sense when there is 1 source / workflow 
being launched per application for Temporal. This is a stop-gap
     // for achieving batch job behavior. Given the current constraints of yarn 
applications requiring a static proxy user
     // during application creation, it is not possible to have multiple 
workflows running in the same application.
     // and so it makes sense to just kill the job after this is completed
     log.info("Requesting the AM to shutdown after the job {} completed", 
this.jobContext.getJobId());
+    JobState jobState = this.jobContext.getJobState();
+    String issuesSummary = this.getIssuesSummary();
+    eventBus.post(new JobFailureEvent(jobState, issuesSummary));

Review Comment:
   This event more of a seems like JobIssuesSummaryEvent and not 
JobFailureEvent or we can add issues only in case of failure as anyway we have 
checked the same while unregistering AM



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.gobblin.cluster.event;
+

Review Comment:
   Please add license statements from other files



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -198,6 +201,13 @@ class YarnService extends AbstractIdleService {
   private final AtomicLong allocationRequestIdGenerator = new 
AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID);
   private final ConcurrentMap<Long, WorkerProfile> 
workerProfileByAllocationRequestId = new ConcurrentHashMap<>();
 
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private JobState jobState;
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private String jobIssuesSummary;
+

Review Comment:
   I believe we can directly make them protected instead of private



##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -380,6 +382,19 @@ public void launch() throws IOException, YarnException, 
InterruptedException {
     }, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES);
 
     addServices();
+
+    synchronized (this.applicationDone) {
+      while (!this.applicationCompleted) {
+        try {
+          this.applicationDone.wait();
+          if (this.applicationFailed) {
+            throw new RuntimeException("Gobblin Yarn application failed");
+          }
+        } catch (InterruptedException ie) {
+          LOGGER.error("Interrupted while waiting for the Gobblin Yarn 
application to finish", ie);
+        }
+      }
+    }

Review Comment:
   Can you please add comment on the importance of this wait ?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -353,7 +370,11 @@ protected void shutDown() throws IOException {
         }
       }
 
-      
this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
 null, null);
+      if (this.jobState != null && !this.jobState.getState().isSuccess()) {
+        
this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, 
this.jobIssuesSummary, null);
+      } else {
+        
this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
 null, null);
+      }

Review Comment:
   Should we add issuesSummary even in succeed cases as well ?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 955781)
    Time Spent: 40m  (was: 0.5h)

> Fail Azkaban job on when temporal job fails
> -------------------------------------------
>
>                 Key: GOBBLIN-2193
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2193
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Swapnil Palash
>            Assignee: Hung Tran
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently when the temporal job running on Yarn fails, we don't propagate the 
> error back to Azkaban job which launches the Yarn Application. 
> The change here bubbles the issues encountered when the job fails upto the 
> GobblinYarnAppLaucher run by the Azkaban job and fails with a 
> RuntimeException after logging the issues summary. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to