This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e17d0c  [GOBBLIN-1049] Move workunit commit logic to the end of publish().
6e17d0c is described below

commit 6e17d0c7f1785004ffca5cb17a0e11c445703afe
Author: Kuai Yu <[email protected]>
AuthorDate: Thu Feb 13 15:18:50 2020 -0800

    [GOBBLIN-1049] Move workunit commit logic to the end of publish().
    
    Closes #2889 from yukuai518/commit
---
 .../main/java/org/apache/gobblin/publisher/DataPublisher.java    | 9 +++++++++
 .../java/org/apache/gobblin/publisher/BaseDataPublisher.java     | 7 -------
 .../java/org/apache/gobblin/runtime/JobLauncherTestHelper.java   | 3 +--
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java b/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
index 4a551df..e01760f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
@@ -81,6 +81,15 @@ public abstract class DataPublisher implements Closeable, CapabilityAware {
       publishData(states);
       publishMetadata(states);
     }
+    markCommit(states);
+  }
+
+  protected void markCommit(Collection<? extends WorkUnitState> states) {
+    for (WorkUnitState workUnitState : states) {
+      if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
+        workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+      }
+    }
   }
 
   public State getState() {
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 7e74b9e..7fb0c28 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -365,13 +365,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
     }
 
     this.parallelRunnerCloser.close();
-
-    for (WorkUnitState workUnitState : states) {
-      // Upon successfully committing the data to the final output directory, set states
-      // of successful tasks to COMMITTED. leaving states of unsuccessful ones unchanged.
-      // This makes sense to the COMMIT_ON_PARTIAL_SUCCESS policy.
-      workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
-    }
   }
 
   /**
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index e608e44..89249b3 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -345,8 +345,7 @@ public class JobLauncherTestHelper {
       Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
       Assert.assertEquals(datasetState.getTaskCount(), 1);
       TaskState taskState = datasetState.getTaskStates().get(0);
-      // BaseDataPublisher will change the state to COMMITTED
-      Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
+      Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.FAILED);
     } else {
       // Task 0 should have failed
       Assert.assertTrue(this.datasetStateStore.getAll(jobName, "Dataset0-current.jst").isEmpty());

Reply via email to