Blazer-007 commented on code in PR #4096:
URL: https://github.com/apache/gobblin/pull/4096#discussion_r1944213593
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.gobblin.cluster.event;
+
+import lombok.Getter;
+import org.apache.gobblin.runtime.JobState;
+
+
+/**
+ * The `JobFailureEvent` class represents an event that is triggered when a job fails.
+ * It contains information about the job state and a summary of the issues that caused the failure.
+ */
+public class JobFailureEvent {
+  @Getter
+  private final JobState jobState;
+  @Getter
+  private final String issuesSummary;
+  public JobFailureEvent(JobState jobState, String issuesSummary) {
+    this.jobState = jobState;
+    this.issuesSummary = issuesSummary;
+  }

Review Comment:
   Can we use @AllArgsConstructor in place of this constructor?

##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobLauncher.java:
##########
@@ -107,13 +111,38 @@ protected Config applyJobLauncherOverrides(Config config) {
     return configOverrides.withFallback(config);
   }
+  private String getIssuesSummary() {
+    TextStringBuilder sb = new TextStringBuilder();
+    try {
+      List<Issue> issues = this.getIssueRepository().getAll();
+      sb.appendln("");
+      sb.appendln("vvvvv============= Issues (summary) =============vvvvv");
+
+      for (int i = 0; i < issues.size(); i++) {
+        Issue issue = issues.get(i);
+
+        sb.appendln("%s) %s %s %s | source: %s", i + 1, issue.getSeverity().toString(), issue.getCode(),
+            issue.getSummary(), issue.getSourceClass());
+      }
+      sb.append("^^^^^=============================================^^^^^");
+      sb.toString();
+    }
+    catch(Exception e) {
+      log.warn("Failed to get issue summary", e);
+    }
+    return sb.toString();
+  }
+
   @Override
   protected void handleLaunchFinalization() {
     // NOTE: This code only makes sense when there is 1 source / workflow being launched per application for Temporal. This is a stop-gap
     // for achieving batch job behavior. Given the current constraints of yarn applications requiring a static proxy user
     // during application creation, it is not possible to have multiple workflows running in the same application.
     // and so it makes sense to just kill the job after this is completed
     log.info("Requesting the AM to shutdown after the job {} completed", this.jobContext.getJobId());
+    JobState jobState = this.jobContext.getJobState();
+    String issuesSummary = this.getIssuesSummary();
+    eventBus.post(new JobFailureEvent(jobState, issuesSummary));

Review Comment:
   This event seems more like a JobIssuesSummaryEvent than a JobFailureEvent. Alternatively, we could attach the issues only in case of failure, since we already check for failure when unregistering the AM.

##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/event/JobFailureEvent.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.gobblin.cluster.event;
+

Review Comment:
   Please add the license header used in the other files.

##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -198,6 +201,13 @@ class YarnService extends AbstractIdleService {
   private final AtomicLong allocationRequestIdGenerator = new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID);
   private final ConcurrentMap<Long, WorkerProfile> workerProfileByAllocationRequestId = new ConcurrentHashMap<>();
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private JobState jobState;
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private String jobIssuesSummary;
+

Review Comment:
   I believe we can make these fields protected directly, instead of keeping them private with protected getters.

##########
gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java:
##########
@@ -380,6 +382,19 @@ public void launch() throws IOException, YarnException, InterruptedException {
     }, 0, this.appReportIntervalMinutes, TimeUnit.MINUTES);
     addServices();
+
+    synchronized (this.applicationDone) {
+      while (!this.applicationCompleted) {
+        try {
+          this.applicationDone.wait();
+          if (this.applicationFailed) {
+            throw new RuntimeException("Gobblin Yarn application failed");
+          }
+        } catch (InterruptedException ie) {
+          LOGGER.error("Interrupted while waiting for the Gobblin Yarn application to finish", ie);
+        }
+      }
+    }

Review Comment:
   Can you please add a comment explaining why this wait is important?

##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -353,7 +370,11 @@ protected void shutDown() throws IOException {
       }
     }
-    this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
+    if (this.jobState != null && !this.jobState.getState().isSuccess()) {
+      this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.FAILED, this.jobIssuesSummary, null);
+    } else {
+      this.amrmClientAsync.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null);
+    }

Review Comment:
   Should we pass the issuesSummary in the success case as well?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
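On the request to document the wait in GobblinYarnAppLauncher.launch(): the loop is a guarded wait on the applicationDone monitor. It presumably keeps launch() blocked until some other thread, not shown in the hunk, marks the YARN application as completed and calls notifyAll(), so that a failure can surface to the caller. The following is a minimal sketch of that handshake, assuming the notifier sets both flags while holding the same monitor. The class AppCompletionLatch and its methods markCompleted and awaitCompletion are hypothetical names, not Gobblin APIs.

```java
/**
 * Hypothetical, simplified model of the completion handshake around the
 * applicationDone monitor in GobblinYarnAppLauncher.launch(). Not Gobblin code.
 */
final class AppCompletionLatch {
  // Monitor object; plays the role of applicationDone in the hunk.
  private final Object applicationDone = new Object();
  // Both flags are read and written only while holding applicationDone.
  private boolean applicationCompleted = false;
  private boolean applicationFailed = false;

  /** Hypothetical notifier side: called once the final application status is known. */
  void markCompleted(boolean failed) {
    synchronized (applicationDone) {
      applicationFailed = failed;
      applicationCompleted = true;
      // Wake every thread blocked in awaitCompletion().
      applicationDone.notifyAll();
    }
  }

  /** Blocks until markCompleted() has run; throws if the application failed. */
  void awaitCompletion() throws InterruptedException {
    synchronized (applicationDone) {
      // Re-check the condition in a loop to tolerate spurious wakeups.
      while (!applicationCompleted) {
        applicationDone.wait();
      }
      // Checked after the loop, so a failure recorded before waiting began is still reported.
      if (applicationFailed) {
        throw new RuntimeException("Gobblin Yarn application failed");
      }
    }
  }
}
```

Two choices here differ from the hunk and are offered only as suggestions. First, the failure flag is checked after the loop. In the hunk, if the application has already completed before launch() reaches the loop, the body never runs and the failure is never reported. Second, InterruptedException propagates, which launch() already declares, instead of being caught and logged. Object.wait() clears the interrupt status when it throws, so catching the exception sends the thread straight back into wait() and effectively swallows the interrupt.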