This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fa3553  [GOBBLIN-1362] Fix bug where state is set twice when no 
workunits created
2fa3553 is described below

commit 2fa3553c75775d2d2a3a98f51c92b5c758c7abc9
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Jan 19 11:10:04 2021 -0800

    [GOBBLIN-1362] Fix bug where state is set twice when no workunits created
    
    Closes #3203 from jack-moseley/empty-workunit-bug
---
 .../gobblin/runtime/AbstractJobLauncher.java       | 58 +++++++++++-----------
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 2ad588f..7edd645 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -558,42 +558,42 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
                 jobListener.onJobFailure(jobContext);
               }
             });
-          }
-
-          for (JobState.DatasetState datasetState : 
this.jobContext.getDatasetStatesByUrns().values()) {
-            // Set the overall job state to FAILED if the job failed to 
process any dataset
-            if (datasetState.getState() == JobState.RunningState.FAILED) {
-              jobState.setState(JobState.RunningState.FAILED);
-              LOG.warn("At least one dataset state is FAILED. Setting job 
state to FAILED.");
-              break;
-            }
-          }
-
-          notifyListeners(this.jobContext, jobListener, 
TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
-            @Override
-            public void apply(JobListener jobListener, JobContext jobContext)
-                throws Exception {
-              jobListener.onJobCompletion(jobContext);
+          } else {
+            for (JobState.DatasetState datasetState : 
this.jobContext.getDatasetStatesByUrns().values()) {
+              // Set the overall job state to FAILED if the job failed to 
process any dataset
+              if (datasetState.getState() == JobState.RunningState.FAILED) {
+                jobState.setState(JobState.RunningState.FAILED);
+                LOG.warn("At least one dataset state is FAILED. Setting job 
state to FAILED.");
+                break;
+              }
             }
-          });
 
-          if (jobState.getState() == JobState.RunningState.FAILED) {
-            notifyListeners(this.jobContext, jobListener, 
TimingEvent.LauncherTimings.JOB_FAILED, new JobListenerAction() {
-              @Override
-              public void apply(JobListener jobListener, JobContext jobContext)
-                  throws Exception {
-                jobListener.onJobFailure(jobContext);
-              }
-            });
-            throw new JobException(String.format("Job %s failed", jobId));
-          } else {
-            notifyListeners(this.jobContext, jobListener, 
TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+            notifyListeners(this.jobContext, jobListener, 
TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
               @Override
               public void apply(JobListener jobListener, JobContext jobContext)
                   throws Exception {
-                jobListener.onJobFailure(jobContext);
+                jobListener.onJobCompletion(jobContext);
               }
             });
+
+            if (jobState.getState() == JobState.RunningState.FAILED) {
+              notifyListeners(this.jobContext, jobListener, 
TimingEvent.LauncherTimings.JOB_FAILED, new JobListenerAction() {
+                @Override
+                public void apply(JobListener jobListener, JobContext 
jobContext)
+                    throws Exception {
+                  jobListener.onJobFailure(jobContext);
+                }
+              });
+              throw new JobException(String.format("Job %s failed", jobId));
+            } else {
+              notifyListeners(this.jobContext, jobListener, 
TimingEvent.LauncherTimings.JOB_SUCCEEDED, new JobListenerAction() {
+                @Override
+                public void apply(JobListener jobListener, JobContext 
jobContext)
+                    throws Exception {
+                  jobListener.onJobFailure(jobContext);
+                }
+              });
+            }
           }
         }
       }

Reply via email to