Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a7a85e150 -> b4597e988


[GOBBLIN-378] Publish tasks in successful state only

Closes #2253 from yukuai518/zero


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b4597e98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b4597e98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b4597e98

Branch: refs/heads/master
Commit: b4597e988c9beae18f1b4896bf0b7f36d2ea5c1f
Parents: a7a85e1
Author: Kuai Yu <k...@linkedin.com>
Authored: Wed Feb 21 17:04:54 2018 -0800
Committer: Hung Tran <hut...@linkedin.com>
Committed: Wed Feb 21 17:04:54 2018 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/gobblin/runtime/Task.java    | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b4597e98/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 3265ab8..c3c1b99 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -869,7 +869,9 @@ public class Task implements TaskIFace {
       if (failedForkIds.size() == 0) {
         // Set the task state to SUCCESSFUL. The state is not set to COMMITTED
         // as the data publisher will do that upon successful data publishing.
-        this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
+        if (this.taskState.getWorkingState() != WorkUnitState.WorkingState.FAILED) {
+          this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
+        }
       } else {
         failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId));
       }
@@ -903,8 +905,10 @@ public class Task implements TaskIFace {
         if (shouldPublishDataInTask()) {
           // If data should be published by the task, publish the data and set the task state to COMMITTED.
           // Task data can only be published after all forks have been closed by closer.close().
-          publishTaskData();
-          this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+          if (this.taskState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
+            publishTaskData();
+            this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+          }
         }
       } catch (IOException ioe) {
         failTask(ioe);

Reply via email to