Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 804622251 -> 602cee793
[GOBBLIN-622] Avoid writing previous workunits in SourceState Closes #2491 from yukuai518/p2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/602cee79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/602cee79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/602cee79 Branch: refs/heads/master Commit: 602cee793711e63f72304835cd35762d9087770c Parents: 8046222 Author: Kuai Yu <[email protected]> Authored: Fri Oct 26 17:51:20 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Oct 26 17:51:20 2018 -0700 ---------------------------------------------------------------------- .../apache/gobblin/configuration/SourceState.java | 16 +++++++++++++--- .../java/org/apache/gobblin/runtime/JobState.java | 6 +++--- .../gobblin/runtime/mapreduce/MRJobLauncher.java | 5 ++++- 3 files changed, 20 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/602cee79/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java index 9c08a5d..44e2183 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java @@ -301,9 +301,19 @@ public class SourceState extends State { @Override public void write(DataOutput out) throws IOException { - out.writeInt(this.previousWorkUnitStates.size()); - for (WorkUnitState state : this.previousWorkUnitStates) { - state.write(out); + write(out, true); + } + + public void write(DataOutput out, boolean writePreviousWorkUnitStates) + throws IOException { + + 
if (!writePreviousWorkUnitStates) { + out.writeInt(0); + } else { + out.writeInt(this.previousWorkUnitStates.size()); + for (WorkUnitState state : this.previousWorkUnitStates) { + state.write(out); + } } super.write(out); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/602cee79/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java index aa09d2c..c0f9271 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java @@ -523,10 +523,10 @@ public class JobState extends SourceState { @Override public void write(DataOutput out) throws IOException { - write(out, true); + write(out, true, true); } - public void write(DataOutput out, boolean writeTasks) + public void write(DataOutput out, boolean writeTasks, boolean writePreviousWorkUnitStates) throws IOException { Text text = new Text(); text.set(this.jobName); @@ -550,7 +550,7 @@ public class JobState extends SourceState { } else { out.writeInt(0); } - super.write(out); + super.write(out, writePreviousWorkUnitStates); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/602cee79/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java index 306b4ef..addb0f0 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -119,6 +119,8 @@ public 
class MRJobLauncher extends AbstractJobLauncher { static final String INPUT_DIR_NAME = "input"; private static final String OUTPUT_DIR_NAME = "output"; private static final String WORK_UNIT_LIST_FILE_EXTENSION = ".wulist"; + private static final String SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY = "MRJobLauncher.serializePreviousWorkunitStates"; + private static final boolean DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES = true; // Configuration that make uploading of jar files more reliable, // since multiple Gobblin Jobs are sharing the same jar directory. @@ -384,7 +386,8 @@ public class MRJobLauncher extends AbstractJobLauncher { Path jobStateFilePath = new Path(mrJobDir, JOB_STATE_FILE_NAME); // Write the job state with an empty task set (work units are read by the mapper from a different file) try (DataOutputStream dataOutputStream = new DataOutputStream(fs.create(jobStateFilePath))) { - jobState.write(dataOutputStream, false); + jobState.write(dataOutputStream, false, + conf.getBoolean(SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY, DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES)); } job.getConfiguration().set(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY, jobStateFilePath.toString());
