Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 804622251 -> 602cee793


[GOBBLIN-622] Avoid writing previous workunits in SourceState

Closes #2491 from yukuai518/p2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/602cee79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/602cee79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/602cee79

Branch: refs/heads/master
Commit: 602cee793711e63f72304835cd35762d9087770c
Parents: 8046222
Author: Kuai Yu <[email protected]>
Authored: Fri Oct 26 17:51:20 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Fri Oct 26 17:51:20 2018 -0700

----------------------------------------------------------------------
 .../apache/gobblin/configuration/SourceState.java   | 16 +++++++++++++---
 .../java/org/apache/gobblin/runtime/JobState.java   |  6 +++---
 .../gobblin/runtime/mapreduce/MRJobLauncher.java    |  5 ++++-
 3 files changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/602cee79/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
index 9c08a5d..44e2183 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
@@ -301,9 +301,19 @@ public class SourceState extends State {
   @Override
   public void write(DataOutput out)
       throws IOException {
-    out.writeInt(this.previousWorkUnitStates.size());
-    for (WorkUnitState state : this.previousWorkUnitStates) {
-      state.write(out);
+    write(out, true);
+  }
+
+  public void write(DataOutput out, boolean writePreviousWorkUnitStates)
+      throws IOException {
+
+    if (!writePreviousWorkUnitStates) {
+      out.writeInt(0);
+    } else {
+      out.writeInt(this.previousWorkUnitStates.size());
+      for (WorkUnitState state : this.previousWorkUnitStates) {
+        state.write(out);
+      }
     }
     super.write(out);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/602cee79/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index aa09d2c..c0f9271 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -523,10 +523,10 @@ public class JobState extends SourceState {
   @Override
   public void write(DataOutput out)
       throws IOException {
-    write(out, true);
+    write(out, true, true);
   }
 
-  public void write(DataOutput out, boolean writeTasks)
+  public void write(DataOutput out, boolean writeTasks, boolean writePreviousWorkUnitStates)
       throws IOException {
     Text text = new Text();
     text.set(this.jobName);
@@ -550,7 +550,7 @@ public class JobState extends SourceState {
     } else {
       out.writeInt(0);
     }
-    super.write(out);
+    super.write(out, writePreviousWorkUnitStates);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/602cee79/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 306b4ef..addb0f0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -119,6 +119,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
   static final String INPUT_DIR_NAME = "input";
   private static final String OUTPUT_DIR_NAME = "output";
   private static final String WORK_UNIT_LIST_FILE_EXTENSION = ".wulist";
+  private static final String SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY = "MRJobLauncher.serializePreviousWorkunitStates";
+  private static final boolean DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES = true;
 
   // Configuration that make uploading of jar files more reliable,
   // since multiple Gobblin Jobs are sharing the same jar directory.
@@ -384,7 +386,8 @@ public class MRJobLauncher extends AbstractJobLauncher {
     Path jobStateFilePath = new Path(mrJobDir, JOB_STATE_FILE_NAME);
     // Write the job state with an empty task set (work units are read by the mapper from a different file)
     try (DataOutputStream dataOutputStream = new DataOutputStream(fs.create(jobStateFilePath))) {
-      jobState.write(dataOutputStream, false);
+      jobState.write(dataOutputStream, false,
+          conf.getBoolean(SERIALIZE_PREVIOUS_WORKUNIT_STATES_KEY, DEFAULT_SERIALIZE_PREVIOUS_WORKUNIT_STATES));
     }
 
     job.getConfiguration().set(ConfigurationKeys.JOB_STATE_FILE_PATH_KEY, jobStateFilePath.toString());

Reply via email to