[ https://issues.apache.org/jira/browse/GOBBLIN-2193?focusedWorklogId=955781&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-955781 ]
ASF GitHub Bot logged work on GOBBLIN-2193:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Feb/25 07:33
            Start Date: 06/Feb/25 07:33
    Worklog Time Spent: 10m
      Work Description: Blazer-007 commented on code in PR #4096:
URL: https://github.com/apache/gobblin/pull/4096#discussion_r1944213593

##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.gobblin.cluster.event;
+
+import lombok.Getter;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * The `JobFailureEvent` class represents an event that is triggered when a job fails.
+ * It contains information about the job state and a summary of the issues that caused the failure.
+ */
+public class JobFailureEvent {
+  @Getter
+  private final JobState jobState;
+  @Getter
+  private final String issuesSummary;
+  public JobFailureEvent(JobState jobState, String issuesSummary) {
+    this.jobState = jobState;
+    this.issuesSummary = issuesSummary;
+  }

Review Comment:
   Can we use @AllArgsConstructor in place of this?

##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java:
##########
@@ -107,13 +111,38 @@ protected Config applyJobLauncherOverrides(Config config) {
     return configOverrides.withFallback(config);
   }

+  private String getIssuesSummary() {
+    TextStringBuilder sb = new TextStringBuilder();
+    try {
+      List<Issue> issues = this.getIssueRepository().getAll();
+      sb.appendln("");
+      sb.appendln("vvvvv============= Issues (summary) =============vvvvv");
+
+      for (int i = 0; i < issues.size(); i++) {
+        Issue issue = issues.get(i);
+
+        sb.appendln("%s) %s %s %s | source: %s", i + 1, issue.getSeverity().toString(), issue.getCode(),
+            issue.getSummary(), issue.getSourceClass());
+      }
+      sb.append("^^^^^=============================================^^^^^");
+      sb.toString();
+    }
+    catch(Exception e) {
+      log.warn("Failed to get issue summary", e);
+    }
+    return sb.toString();
+  }
+
   @Override
   protected void handleLaunchFinalization() {
     // NOTE: This code only makes sense when there is 1 source / workflow being launched per application for Temporal. This is a stop-gap
     // for achieving batch job behavior. Given the current constraints of yarn applications requiring a static proxy user
     // during application creation, it is not possible to have multiple workflows running in the same application.
     // and so it makes sense to just kill the job after this is completed
     log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId());
+    JobState jobState = this.jobContext.getJobState();
+    String issuesSummary = this.getIssuesSummary();
+    eventBus.post(new JobFailureEvent(jobState, issuesSummary));

Review Comment:
   This event seems more like a JobIssuesSummaryEvent than a JobFailureEvent. Alternatively, we could add the issues only in case of failure, since we already check for that while unregistering the AM.

##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.gobblin.cluster.event;
+

Review Comment:
   Please add the license header, as in the other files.

##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -198,6 +201,13 @@ class YarnService extends AbstractIdleService {
   private final AtomicLong allocationRequestIdGenerator = new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID);
   private final ConcurrentMap<Long, WorkerProfile> workerProfileByAllocationRequestId = new ConcurrentHashMap<>();

+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private JobState jobState;
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private String jobIssuesSummary;
+

Review Comment:
   I believe we can directly make them protected instead of private.

##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -380,6 +382,19 @@ public void launch() throws IOException, YarnException, InterruptedException {
     }, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES);

     addServices();
+
+    synchronized (this.applicationDone) {
+      while (!this.applicationCompleted) {
+        try {
+          this.applicationDone.wait();
+          if (this.applicationFailed) {
+            throw new RuntimeException("Gobblin Yarn application failed");
+          }
+        } catch (InterruptedException ie) {
+          LOGGER.error("Interrupted while waiting for the Gobblin Yarn application to finish", ie);
+        }
+      }
+    }

Review Comment:
   Can you please add a comment on the importance of this wait?

##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -353,7 +370,11 @@ protected void shutDown() throws IOException {
       }
     }

-    this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
+    if (this.jobState != null && !this.jobState.getState().isSuccess()) {
+      this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.jobIssuesSummary, null);
+    } else {
+      this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
+    }

Review Comment:
   Should we add issuesSummary in the success case as well?


Issue Time Tracking
-------------------

    Worklog Id:     (was: 955781)
    Time Spent: 40m  (was: 0.5h)

> Fail Azkaban job on when temporal job fails
> -------------------------------------------
>
>                 Key: GOBBLIN-2193
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2193
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Swapnil Palash
>            Assignee: Hung Tran
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Currently when the temporal job running on Yarn fails, we don't propagate the
> error back to the Azkaban job which launches the Yarn Application.
> The change here bubbles the issues encountered when the job fails up to the
> GobblinYarnAppLauncher run by the Azkaban job and fails with a
> RuntimeException after logging the issues summary.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)