This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2fa3553 [GOBBLIN-1362] Fix bug where state is set twice when no
workunits created
2fa3553 is described below
commit 2fa3553c75775d2d2a3a98f51c92b5c758c7abc9
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Jan 19 11:10:04 2021 -0800
[GOBBLIN-1362] Fix bug where state is set twice when no workunits created
Closes #3203 from jack-moseley/empty-workunit-bug
---
.../gobblin/runtime/AbstractJobLauncher.java | 58 +++++++++++-----------
1 file changed, 29 insertions(+), 29 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 2ad588f..7edd645 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -558,42 +558,42 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
jobListener.onJobFailure(jobContext);
}
});
- }
-
- for (JobState.DatasetState datasetState :
this.jobContext.getDatasetStatesByUrns().values()) {
- // Set the overall job state to FAILED if the job failed to
process any dataset
- if (datasetState.getState() == JobState.RunningState.FAILED) {
- jobState.setState(JobState.RunningState.FAILED);
- LOG.warn("At least one dataset state is FAILED. Setting job
state to FAILED.");
- break;
- }
- }
-
- notifyListeners(this.jobContext, jobListener,
TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
- @Override
- public void apply(JobListener jobListener, JobContext jobContext)
- throws Exception {
- jobListener.onJobCompletion(jobContext);
+ } else {
+ for (JobState.DatasetState datasetState :
this.jobContext.getDatasetStatesByUrns().values()) {
+ // Set the overall job state to FAILED if the job failed to
process any dataset
+ if (datasetState.getState() == JobState.RunningState.FAILED) {
+ jobState.setState(JobState.RunningState.FAILED);
+ LOG.warn("At least one dataset state is FAILED. Setting job
state to FAILED.");
+ break;
+ }
}
- });
- if (jobState.getState() == JobState.RunningState.FAILED) {
- notifyListeners(this.jobContext, jobListener,
TimingEvent.LauncherTimings.JOB_FAILED, new JobListenerAction() {
- @Override
- public void apply(JobListener jobListener, JobContext jobContext)
- throws Exception {
- jobListener.onJobFailure(jobContext);
- }
- });
- throw new JobException(String.format("Job %s failed", jobId));
- } else {
- notifyListeners(this.jobContext, jobListener,
TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+ notifyListeners(this.jobContext, jobListener,
TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
@Override
public void apply(JobListener jobListener, JobContext jobContext)
throws Exception {
- jobListener.onJobFailure(jobContext);
+ jobListener.onJobCompletion(jobContext);
}
});
+
+ if (jobState.getState() == JobState.RunningState.FAILED) {
+ notifyListeners(this.jobContext, jobListener,
TimingEvent.LauncherTimings.JOB_FAILED, new JobListenerAction() {
+ @Override
+ public void apply(JobListener jobListener, JobContext
jobContext)
+ throws Exception {
+ jobListener.onJobFailure(jobContext);
+ }
+ });
+ throw new JobException(String.format("Job %s failed", jobId));
+ } else {
+ notifyListeners(this.jobContext, jobListener,
TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+ @Override
+ public void apply(JobListener jobListener, JobContext
jobContext)
+ throws Exception {
+ jobListener.onJobFailure(jobContext);
+ }
+ });
+ }
}
}
}