Repository: incubator-gobblin Updated Branches: refs/heads/master a7a85e150 -> b4597e988
[GOBBLIN-378] Publish tasks in successful state only

Closes #2253 from yukuai518/zero

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b4597e98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b4597e98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b4597e98

Branch: refs/heads/master
Commit: b4597e988c9beae18f1b4896bf0b7f36d2ea5c1f
Parents: a7a85e1
Author: Kuai Yu <k...@linkedin.com>
Authored: Wed Feb 21 17:04:54 2018 -0800
Committer: Hung Tran <hut...@linkedin.com>
Committed: Wed Feb 21 17:04:54 2018 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/gobblin/runtime/Task.java | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b4597e98/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 3265ab8..c3c1b99 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -869,7 +869,9 @@ public class Task implements TaskIFace {
       if (failedForkIds.size() == 0) {
         // Set the task state to SUCCESSFUL. The state is not set to COMMITTED
         // as the data publisher will do that upon successful data publishing.
-        this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
+        if (this.taskState.getWorkingState() != WorkUnitState.WorkingState.FAILED) {
+          this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
+        }
       } else {
         failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId));
       }
@@ -903,8 +905,10 @@ public class Task implements TaskIFace {
       if (shouldPublishDataInTask()) {
         // If data should be published by the task, publish the data and set the task state to COMMITTED.
         // Task data can only be published after all forks have been closed by closer.close().
-        publishTaskData();
-        this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+        if (this.taskState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
+          publishTaskData();
+          this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+        }
       }
     } catch (IOException ioe) {
       failTask(ioe);