This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6e17d0c [GOBBLIN-1049] Move workunit commit logic to the end of publish().
6e17d0c is described below
commit 6e17d0c7f1785004ffca5cb17a0e11c445703afe
Author: Kuai Yu <[email protected]>
AuthorDate: Thu Feb 13 15:18:50 2020 -0800
[GOBBLIN-1049] Move workunit commit logic to the end of publish().
Closes #2889 from yukuai518/commit
---
.../main/java/org/apache/gobblin/publisher/DataPublisher.java | 9 +++++++++
.../java/org/apache/gobblin/publisher/BaseDataPublisher.java | 7 -------
.../java/org/apache/gobblin/runtime/JobLauncherTestHelper.java | 3 +--
3 files changed, 10 insertions(+), 9 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java b/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
index 4a551df..e01760f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/publisher/DataPublisher.java
@@ -81,6 +81,15 @@ public abstract class DataPublisher implements Closeable, CapabilityAware {
publishData(states);
publishMetadata(states);
}
+ markCommit(states);
+ }
+
+ protected void markCommit(Collection<? extends WorkUnitState> states) {
+ for (WorkUnitState workUnitState : states) {
+ if (workUnitState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
+ workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+ }
+ }
}
public State getState() {
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 7e74b9e..7fb0c28 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -365,13 +365,6 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
}
this.parallelRunnerCloser.close();
-
- for (WorkUnitState workUnitState : states) {
- // Upon successfully committing the data to the final output directory, set states
- // of successful tasks to COMMITTED. leaving states of unsuccessful ones unchanged.
- // This makes sense to the COMMIT_ON_PARTIAL_SUCCESS policy.
- workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
- }
}
/**
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index e608e44..89249b3 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -345,8 +345,7 @@ public class JobLauncherTestHelper {
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
Assert.assertEquals(datasetState.getTaskCount(), 1);
TaskState taskState = datasetState.getTaskStates().get(0);
- // BaseDataPublisher will change the state to COMMITTED
- Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
+ Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.FAILED);
} else {
// Task 0 should have failed
Assert.assertTrue(this.datasetStateStore.getAll(jobName, "Dataset0-current.jst").isEmpty());