This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dce9e2520 [GOBBLIN-1919] Simplify a few elements of MR-related job 
exec before reusing code in Temporal-based execution (#3784)
dce9e2520 is described below

commit dce9e2520a144f37c63293776a5b02a8349be299
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Oct 26 17:12:39 2023 -0700

    [GOBBLIN-1919] Simplify a few elements of MR-related job exec before 
reusing code in Temporal-based execution (#3784)
    
    * Simplify a few elements of MR-related job exec before reusing code in 
Temporal-based execution
    
    * Add JSON-ification to several foundational config-state representations, 
plus encapsulated convenience method `JobState.getJobIdFromProps`
    
    * Update javadoc comments
    
    * Encapsulate check for whether a path has the extension of a 
multi-work-unit
---
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +-
 .../gobblin/source/workunit/MultiWorkUnit.java     |   5 +
 .../apache/gobblin/source/workunit/WorkUnit.java   |  58 +++++++++
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  20 +--
 .../org/apache/gobblin/cluster/SingleTask.java     |   3 +-
 .../gobblin/data/management/copy/CopyableFile.java |  50 +++++++-
 .../gobblin/runtime/AbstractJobLauncher.java       |   3 -
 .../org/apache/gobblin/runtime/JobContext.java     |   6 +-
 .../java/org/apache/gobblin/runtime/JobState.java  | 142 +++++----------------
 .../java/org/apache/gobblin/runtime/TaskState.java | 138 +++++---------------
 .../runtime/mapreduce/GobblinOutputCommitter.java  |  34 ++---
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   |  68 +++++-----
 .../org/apache/gobblin/util/JobLauncherUtils.java  |  33 ++++-
 13 files changed, 263 insertions(+), 299 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 6d36d9ff3..a50ba8c75 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -725,7 +725,7 @@ public class ConfigurationKeys {
   public static final int DEFAULT_MR_JOB_MAX_MAPPERS = 100;
   public static final boolean DEFAULT_MR_JOB_MAPPER_FAILURE_IS_FATAL = false;
   public static final boolean DEFAULT_MR_PERSIST_WORK_UNITS_THEN_CANCEL = 
false;
-  public static final String DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = "false";
+  public static final boolean DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION = false;
 
   /**
    * Configuration properties used by the distributed job launcher.
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
index d254de065..1392fa97f 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
@@ -55,6 +55,11 @@ public class MultiWorkUnit extends WorkUnit {
     super();
   }
 
+  @Override
+  public boolean isMultiWorkUnit() {
+    return true;
+  }
+
   /**
    * Get an immutable list of {@link WorkUnit}s wrapped by this {@link 
MultiWorkUnit}.
    *
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java 
b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
index bf38c3525..139e60c01 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
@@ -24,10 +24,12 @@ import org.apache.gobblin.configuration.State;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.StringWriter;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonWriter;
 
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.Watermark;
@@ -134,6 +136,11 @@ public class WorkUnit extends State {
     this.extract = other.getExtract();
   }
 
+  /** @return whether a multi-work-unit (or else a singular one) */
+  public boolean isMultiWorkUnit() {
+    return false; // more efficient than `this instanceof MultiWorkUnit` plus 
no circular dependency
+  }
+
   /**
    * Factory method.
    *
@@ -365,6 +372,57 @@ public class WorkUnit extends State {
     return result;
   }
 
+  /** @return pretty-printed JSON, including all properties */
+  public String toJsonString() {
+    StringWriter stringWriter = new StringWriter();
+    try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
+      jsonWriter.setIndent("\t");
+      this.toJson(jsonWriter);
+    } catch (IOException ioe) {
+      // Ignored
+    }
+    return stringWriter.toString();
+  }
+
+  public void toJson(JsonWriter jsonWriter) throws IOException {
+    jsonWriter.beginObject();
+
+    jsonWriter.name("id").value(this.getId());
+    jsonWriter.name("properties");
+    jsonWriter.beginObject();
+    for (String key : this.getPropertyNames()) {
+      jsonWriter.name(key).value(this.getProp(key));
+    }
+    jsonWriter.endObject();
+
+    jsonWriter.name("extract");
+    jsonWriter.beginObject();
+    jsonWriter.name("extractId").value(this.getExtract().getId());
+    jsonWriter.name("extractProperties");
+    jsonWriter.beginObject();
+    for (String key : this.getExtract().getPropertyNames()) {
+      jsonWriter.name(key).value(this.getExtract().getProp(key));
+    }
+    jsonWriter.endObject();
+
+    State prevTableState = this.getExtract().getPreviousTableState();
+    if (prevTableState != null) {
+      jsonWriter.name("extractPrevTableState");
+      jsonWriter.beginObject();
+      jsonWriter.name("prevStateId").value(prevTableState.getId());
+      jsonWriter.name("prevStateProperties");
+      jsonWriter.beginObject();
+      for (String key : prevTableState.getPropertyNames()) {
+        jsonWriter.name(key).value(prevTableState.getProp(key));
+      }
+      jsonWriter.endObject();
+      jsonWriter.endObject();
+    }
+    jsonWriter.endObject();
+
+    jsonWriter.endObject();
+  }
+
   public String getOutputFilePath() {
     // Search for the properties in the workunit.
     // This search for the property first in State and then in the Extract of 
this workunit.
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index cf324b443..f0fcc258f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -71,7 +71,6 @@ import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateCollectorService;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JobLauncherUtils;
@@ -110,8 +109,6 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinHelixJobLauncher.class);
 
-  private static final String WORK_UNIT_FILE_EXTENSION = ".wu";
-
   private final HelixManager helixManager;
   private final TaskDriver helixTaskDriver;
   private final String helixWorkFlowName;
@@ -345,7 +342,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     try (ParallelRunner stateSerDeRunner = new 
ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
       int multiTaskIdSequence = 0;
       for (WorkUnit workUnit : workUnits) {
-        if (workUnit instanceof MultiWorkUnit) {
+        if (workUnit.isMultiWorkUnit()) {
           
workUnit.setId(JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), 
multiTaskIdSequence++));
         }
         addWorkUnit(workUnit, stateSerDeRunner, taskConfigMap);
@@ -535,15 +532,12 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
   private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner 
stateSerDeRunner) {
     String workUnitFilePath =
         
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
-    final StateStore stateStore;
     Path workUnitFile = new Path(workUnitFilePath);
     final String fileName = workUnitFile.getName();
     final String storeName = workUnitFile.getParent().getName();
-    if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
-      stateStore = stateStores.getMwuStateStore();
-    } else {
-      stateStore = stateStores.getWuStateStore();
-    }
+    final StateStore stateStore = 
JobLauncherUtils.hasMultiWorkUnitExtension(workUnitFile)
+        ? stateStores.getMwuStateStore()
+        : stateStores.getWuStateStore();
     stateSerDeRunner.submitCallable(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
@@ -561,11 +555,11 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     final StateStore stateStore;
     String workUnitFileName = workUnit.getId();
 
-    if (workUnit instanceof MultiWorkUnit) {
-      workUnitFileName += MULTI_WORK_UNIT_FILE_EXTENSION;
+    if (workUnit.isMultiWorkUnit()) {
+      workUnitFileName += JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
       stateStore = stateStores.getMwuStateStore();
     } else {
-      workUnitFileName += WORK_UNIT_FILE_EXTENSION;
+      workUnitFileName += JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
       stateStore = stateStores.getWuStateStore();
     }
 
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 93caab41c..5d67c9f51 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -39,7 +39,6 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.util.StateStores;
@@ -184,7 +183,7 @@ public class SingleTask {
     WorkUnit workUnit;
 
     try {
-      if 
(_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
+      if (JobLauncherUtils.hasMultiWorkUnitExtension(_workUnitFilePath)) {
         workUnit = _stateStores.getMwuStateStore().getAll(storeName, 
fileName).get(0);
       } else {
         workUnit = _stateStores.getWuStateStore().getAll(storeName, 
fileName).get(0);
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 85fa80f0f..9e9af05e3 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -17,8 +17,8 @@
 
 package org.apache.gobblin.data.management.copy;
 
-import com.google.common.cache.Cache;
 import java.io.IOException;
+import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
 
@@ -34,8 +34,10 @@ import org.apache.hadoop.fs.permission.FsPermission;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.stream.JsonWriter;
 
 import lombok.AccessLevel;
 import lombok.EqualsAndHashCode;
@@ -132,6 +134,52 @@ public class CopyableFile extends CopyEntity implements 
File {
     this.datasetOutputPath = datasetOutputPath;
   }
 
+  /** @return pretty-printed JSON, including all metadata */
+  public String toJsonString() {
+    return toJsonString(true);
+  }
+
+  /** @return pretty-printed JSON, optionally including metadata */
+  public String toJsonString(boolean includeMetadata) {
+    StringWriter stringWriter = new StringWriter();
+    try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
+      jsonWriter.setIndent("\t");
+      this.toJson(jsonWriter, includeMetadata);
+    } catch (IOException ioe) {
+      // Ignored
+    }
+    return stringWriter.toString();
+  }
+
+  public void toJson(JsonWriter jsonWriter, boolean includeMetadata) throws 
IOException {
+    jsonWriter.beginObject();
+
+    jsonWriter
+        .name("file set").value(this.getFileSet())
+        .name("origin").value(this.getOrigin().toString())
+        .name("destination").value(this.getDestination().toString())
+        
.name("destinationOwnerAndPermission").value(this.getDestinationOwnerAndPermission().toString())
+        // TODO:
+        // this.ancestorsOwnerAndPermission
+        // this.checksum
+        // this.preserve
+        // this.dataFileVersionStrategy
+        // this.originTimestamp
+        // this.upstreamTimestamp
+        
.name("datasetOutputPath").value(this.getDatasetOutputPath().toString());
+
+    if (includeMetadata && this.getAdditionalMetadata() != null) {
+      jsonWriter.name("metadata");
+      jsonWriter.beginObject();
+      for (Map.Entry<String, String> entry : 
this.getAdditionalMetadata().entrySet()) {
+        jsonWriter.name(entry.getKey()).value(entry.getValue());
+      }
+      jsonWriter.endObject();
+    }
+
+    jsonWriter.endObject();
+  }
+
   /**
    * Set file system based source and destination dataset for this {@link 
CopyableFile}
    *
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 3f2570c68..bf86e628e 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -129,9 +129,6 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
 
   public static final String JOB_STATE_FILE_NAME = "job.state";
 
-  public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
-  public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
-
   public static final String GOBBLIN_JOB_TEMPLATE_KEY = "gobblin.template.uri";
 
   public static final String NUM_WORKUNITS = "numWorkUnits";
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 5c6eb3eae..658b308b7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -74,7 +74,6 @@ import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.Id;
-import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
 
@@ -140,10 +139,9 @@ public class JobContext implements Closeable {
         "A job must have a job name specified by job.name");
 
     this.jobName = JobState.getJobNameFromProps(jobProps);
-    this.jobId = jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY) ? 
jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY)
-        : JobLauncherUtils.newJobId(this.jobName);
+    this.jobId = JobState.getJobIdFromProps(jobProps);
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId); // in case 
not yet directly defined as such
     this.jobSequence = Long.toString(Id.Job.parse(this.jobId).getSequence());
-    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId);
 
     this.jobBroker = instanceBroker.newSubscopedBuilder(new 
JobScopeInstance(this.jobName, this.jobId))
         
.withOverridingConfig(ConfigUtils.propertiesToConfig(jobProps)).build();
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 02cf3c9df..5f8783fcc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -27,6 +27,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import lombok.Getter;
+import lombok.Setter;
+
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.runtime.job.JobProgress;
 
@@ -64,6 +67,7 @@ import org.apache.gobblin.runtime.util.MetricGroup;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ImmutableProperties;
+import org.apache.gobblin.util.JobLauncherUtils;
 
 
 /**
@@ -127,12 +131,22 @@ public class JobState extends SourceState implements 
JobProgress {
     }
   }
 
+  @Getter @Setter
   private String jobName;
+  @Getter @Setter
   private String jobId;
+  /** job start time in milliseconds */
+  @Getter @Setter
   private long startTime = 0;
+  /** job end time in milliseconds */
+  @Getter @Setter
   private long endTime = 0;
+  /** job duration in milliseconds */
+  @Getter @Setter
   private long duration = 0;
   private RunningState state = RunningState.PENDING;
+  /** the number of tasks this job consists of */
+  @Getter @Setter
   private int taskCount = 0;
   private final Map<String, TaskState> taskStates = Maps.newLinkedHashMap();
   // Skipped task states shouldn't be exposed to publisher, but they need to 
be in JobState and DatasetState so that they can be written to StateStore.
@@ -149,7 +163,7 @@ public class JobState extends SourceState implements 
JobProgress {
     this.setId(jobId);
   }
 
-  public JobState(State properties,String jobName, String jobId) {
+  public JobState(State properties, String jobName, String jobId) {
     super(properties);
     this.jobName = jobName;
     this.jobId = jobId;
@@ -172,6 +186,11 @@ public class JobState extends SourceState implements 
JobProgress {
     return props.getProperty(ConfigurationKeys.JOB_NAME_KEY);
   }
 
+  public static String getJobIdFromProps(Properties props) {
+    return props.containsKey(ConfigurationKeys.JOB_ID_KEY) ? 
props.getProperty(ConfigurationKeys.JOB_ID_KEY)
+        : JobLauncherUtils.newJobId(JobState.getJobNameFromProps(props));
+  }
+
   public static String getJobGroupFromState(State state) {
     return state.getProp(ConfigurationKeys.JOB_GROUP_KEY);
   }
@@ -188,69 +207,6 @@ public class JobState extends SourceState implements 
JobProgress {
     return props.getProperty(ConfigurationKeys.JOB_DESCRIPTION_KEY);
   }
 
-  /**
-   * Get job name.
-   *
-   * @return job name
-   */
-  public String getJobName() {
-    return this.jobName;
-  }
-
-  /**
-   * Set job name.
-   *
-   * @param jobName job name
-   */
-  public void setJobName(String jobName) {
-    this.jobName = jobName;
-  }
-
-  /**
-   * Get job ID.
-   *
-   * @return job ID
-   */
-  public String getJobId() {
-    return this.jobId;
-  }
-
-  /**
-   * Set job ID.
-   *
-   * @param jobId job ID
-   */
-  public void setJobId(String jobId) {
-    this.jobId = jobId;
-  }
-
-  /**
-   * Get job start time.
-   *
-   * @return job start time
-   */
-  public long getStartTime() {
-    return this.startTime;
-  }
-
-  /**
-   * Set job start time.
-   *
-   * @param startTime job start time
-   */
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  /**
-   * Get job end time.
-   *
-   * @return job end time
-   */
-  public long getEndTime() {
-    return this.endTime;
-  }
-
   /**
    * Get the currently elapsed time for this job.
    * @return
@@ -265,33 +221,6 @@ public class JobState extends SourceState implements 
JobProgress {
     return 0;
   }
 
-  /**
-   * Set job end time.
-   *
-   * @param endTime job end time
-   */
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
-  }
-
-  /**
-   * Get job duration in milliseconds.
-   *
-   * @return job duration in milliseconds
-   */
-  public long getDuration() {
-    return this.duration;
-  }
-
-  /**
-   * Set job duration in milliseconds.
-   *
-   * @param duration job duration in milliseconds
-   */
-  public void setDuration(long duration) {
-    this.duration = duration;
-  }
-
   /**
    * Get job running state of type {@link RunningState}.
    *
@@ -310,24 +239,6 @@ public class JobState extends SourceState implements 
JobProgress {
     this.state = state;
   }
 
-  /**
-   * Get the number of tasks this job consists of.
-   *
-   * @return number of tasks this job consists of
-   */
-  public int getTaskCount() {
-    return this.taskCount;
-  }
-
-  /**
-   * Set the number of tasks this job consists of.
-   *
-   * @param taskCount number of tasks this job consists of
-   */
-  public void setTaskCount(int taskCount) {
-    this.taskCount = taskCount;
-  }
-
   /**
    * If not already present, set the {@link 
ConfigurationKeys#JOB_FAILURE_EXCEPTION_KEY} to a {@link String}
    * representation of the given {@link Throwable}.
@@ -682,12 +593,23 @@ public class JobState extends SourceState implements 
JobProgress {
     return result;
   }
 
+  /** @return pretty-printed JSON, without including properties */
   @Override
   public String toString() {
+    return toJsonString(false);
+  }
+
+  /** @return pretty-printed JSON, including all properties */
+  public String toJsonString() {
+    return toJsonString(true);
+  }
+
+  /** @return pretty-printed JSON, optionally including properties */
+  public String toJsonString(boolean includeProperties) {
     StringWriter stringWriter = new StringWriter();
     try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
       jsonWriter.setIndent("\t");
-      this.toJson(jsonWriter, false);
+      this.toJson(jsonWriter, includeProperties);
     } catch (IOException ioe) {
       // Ignored
     }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index 903d94b18..60c475203 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.runtime;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.StringWriter;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,6 +41,7 @@ import com.linkedin.data.template.StringMap;
 
 import javax.annotation.Nullable;
 import lombok.Getter;
+import lombok.Setter;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -89,15 +91,27 @@ public class TaskState extends WorkUnitState implements 
TaskProgress {
    */
   private static final String BYTES_PER_SECOND = "bytesPerSec";
 
+  /** ID of the job this {@link TaskState} is for */
+  @Getter @Setter
   private String jobId;
+  /** ID of the task this {@link TaskState} is for */
+  @Getter @Setter
   private String taskId;
+  /** sequence number of the task this {@link TaskState} is for */
+  @Getter
   private String taskKey;
   @Getter
   private Optional<String> taskAttemptId;
 
+  /** task start time in milliseconds */
+  @Getter @Setter
   private long startTime = 0;
+  /** task end time in milliseconds */
+  @Getter @Setter
   private long endTime = 0;
-  private long duration;
+  /** task duration in milliseconds */
+  @Getter @Setter
+  private long taskDuration;
 
   // Needed for serialization/deserialization
   public TaskState() {}
@@ -123,105 +137,6 @@ public class TaskState extends WorkUnitState implements 
TaskProgress {
     this.setId(this.taskId);
   }
 
-  /**
-   * Get the ID of the job this {@link TaskState} is for.
-   *
-   * @return ID of the job this {@link TaskState} is for
-   */
-  public String getJobId() {
-    return this.jobId;
-  }
-
-  /**
-   * Set the ID of the job this {@link TaskState} is for.
-   *
-   * @param jobId ID of the job this {@link TaskState} is for
-   */
-  public void setJobId(String jobId) {
-    this.jobId = jobId;
-  }
-
-  /**
-   * Get the sequence number of the task this {@link TaskState} is for.
-   *
-   * @return Sequence number of the task this {@link TaskState} is for
-   */
-  public String getTaskKey() {
-    return this.taskKey;
-  }
-
-  /**
-   * Get the ID of the task this {@link TaskState} is for.
-   *
-   * @return ID of the task this {@link TaskState} is for
-   */
-  public String getTaskId() {
-    return this.taskId;
-  }
-
-  /**
-   * Set the ID of the task this {@link TaskState} is for.
-   *
-   * @param taskId ID of the task this {@link TaskState} is for
-   */
-  public void setTaskId(String taskId) {
-    this.taskId = taskId;
-  }
-
-  /**
-   * Get task start time in milliseconds.
-   *
-   * @return task start time in milliseconds
-   */
-  public long getStartTime() {
-    return this.startTime;
-  }
-
-  /**
-   * Set task start time in milliseconds.
-   *
-   * @param startTime task start time in milliseconds
-   */
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  /**
-   * Get task end time in milliseconds.
-   *
-   * @return task end time in milliseconds
-   */
-  public long getEndTime() {
-    return this.endTime;
-  }
-
-  /**
-   * set task end time in milliseconds.
-   *
-   * @param endTime task end time in milliseconds
-   */
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
-  }
-
-  /**
-   * Get task duration in milliseconds.
-   *
-   * @return task duration in milliseconds
-   */
-  public long getTaskDuration() {
-    return this.duration;
-  }
-
-  /**
-   * Set task duration in milliseconds.
-   *
-   * @param duration task duration in milliseconds
-   */
-  public void setTaskDuration(long duration) {
-    this.duration = duration;
-  }
-
   /**
    * Get the {@link ConfigurationKeys#TASK_FAILURE_EXCEPTION_KEY} if it 
exists, else return {@link Optional#absent()}.
    */
@@ -348,7 +263,7 @@ public class TaskState extends WorkUnitState implements 
TaskProgress {
     this.setId(this.taskId);
     this.startTime = in.readLong();
     this.endTime = in.readLong();
-    this.duration = in.readLong();
+    this.taskDuration = in.readLong();
     super.readFields(in);
   }
 
@@ -361,7 +276,7 @@ public class TaskState extends WorkUnitState implements 
TaskProgress {
     text.write(out);
     out.writeLong(this.startTime);
     out.writeLong(this.endTime);
-    out.writeLong(this.duration);
+    out.writeLong(this.taskDuration);
     super.write(out);
   }
 
@@ -384,6 +299,23 @@ public class TaskState extends WorkUnitState implements 
TaskProgress {
     return result;
   }
 
+  /** @return pretty-printed JSON, including all properties */
+  public String toJsonString() {
+    return toJsonString(true);
+  }
+
+  /** @return pretty-printed JSON, optionally including properties */
+  public String toJsonString(boolean includeProperties) {
+    StringWriter stringWriter = new StringWriter();
+    try (JsonWriter jsonWriter = new JsonWriter(stringWriter)) {
+      jsonWriter.setIndent("\t");
+      this.toJson(jsonWriter, includeProperties);
+    } catch (IOException ioe) {
+      // Ignored
+    }
+    return stringWriter.toString();
+  }
+
   /**
    * Convert this {@link TaskState} to a json document.
    *
@@ -432,7 +364,7 @@ public class TaskState extends WorkUnitState implements 
TaskProgress {
     if (this.endTime > 0) {
       taskExecutionInfo.setEndTime(this.endTime);
     }
-    taskExecutionInfo.setDuration(this.duration);
+    taskExecutionInfo.setDuration(this.taskDuration);
     
taskExecutionInfo.setState(TaskStateEnum.valueOf(getWorkingState().name()));
     if (this.contains(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY)) {
       
taskExecutionInfo.setFailureException(this.getProp(ConfigurationKeys.TASK_FAILURE_EXCEPTION_KEY));
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java
index 65401c03d..57ba0936d 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/GobblinOutputCommitter.java
@@ -40,7 +40,6 @@ import com.google.common.io.Closer;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -89,28 +88,18 @@ public class GobblinOutputCommitter extends OutputCommitter 
{
 
         Closer workUnitFileCloser = Closer.create();
 
-        // If the file ends with ".wu" de-serialize it into a WorkUnit
-        if 
(status.getPath().getName().endsWith(AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION))
 {
-          WorkUnit wu = WorkUnit.createEmpty();
-          try {
-            wu.readFields(workUnitFileCloser.register(new 
DataInputStream(fs.open(status.getPath()))));
-          } finally {
-            workUnitFileCloser.close();
-          }
-          JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG);
+        WorkUnit wu = 
JobLauncherUtils.createEmptyWorkUnitPerExtension(status.getPath());
+        try {
+          wu.readFields(workUnitFileCloser.register(new 
DataInputStream(fs.open(status.getPath()))));
+        } finally {
+          workUnitFileCloser.close();
         }
-
-        // If the file ends with ".mwu" de-serialize it into a MultiWorkUnit
-        if 
(status.getPath().getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
-          MultiWorkUnit mwu = MultiWorkUnit.createEmpty();
-          try {
-            mwu.readFields(workUnitFileCloser.register(new 
DataInputStream(fs.open(status.getPath()))));
-          } finally {
-            workUnitFileCloser.close();
-          }
-          for (WorkUnit wu : mwu.getWorkUnits()) {
-            JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG);
+        if (wu instanceof MultiWorkUnit) {
+          for (WorkUnit eachWU : ((MultiWorkUnit) wu).getWorkUnits()) {
+            JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(eachWU), 
LOG);
           }
+        } else {
+          JobLauncherUtils.cleanTaskStagingData(new WorkUnitState(wu), LOG);
         }
       }
     } finally {
@@ -173,8 +162,7 @@ public class GobblinOutputCommitter extends OutputCommitter 
{
   private static class WorkUnitFilter implements PathFilter {
     @Override
     public boolean accept(Path path) {
-      return 
path.getName().endsWith(AbstractJobLauncher.WORK_UNIT_FILE_EXTENSION)
-          || 
path.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION);
+      return JobLauncherUtils.hasAnyWorkUnitExtension(path);
     }
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index cd34fe1b5..1893cebd7 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -25,6 +25,7 @@ import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -56,7 +57,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -523,19 +523,18 @@ public class MRJobLauncher extends AbstractJobLauncher {
     mrJobSetupTimer.stop();
   }
 
-  static boolean isSpeculativeExecutionEnabled(Properties props) {
-    return Boolean.valueOf(
-        props.getProperty(JobContext.MAP_SPECULATIVE, 
ConfigurationKeys.DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION));
+  static boolean isBooleanPropEnabled(Properties props, String propKey, 
Optional<Boolean> optDefault) {
+    return (props.containsKey(propKey) && 
Boolean.parseBoolean(props.getProperty(propKey)))
+        || (optDefault.isPresent() && optDefault.get());
   }
 
-  static boolean isCustomizedProgressReportEnabled(Properties properties) {
-    return properties.containsKey(ENABLED_CUSTOMIZED_PROGRESS)
-        && 
Boolean.parseBoolean(properties.getProperty(ENABLED_CUSTOMIZED_PROGRESS));
+  static boolean isSpeculativeExecutionEnabled(Properties props) {
+    return isBooleanPropEnabled(props, JobContext.MAP_SPECULATIVE,
+        
Optional.of(ConfigurationKeys.DEFAULT_ENABLE_MR_SPECULATIVE_EXECUTION));
   }
 
-  static boolean isBooleanPropEnabled(Properties props, String propKey, 
Optional<Boolean> optDefault) {
-    return (props.containsKey(propKey) && 
Boolean.parseBoolean(props.getProperty(propKey)))
-        || (optDefault.isPresent() && optDefault.get());
+  static boolean isCustomizedProgressReportEnabled(Properties properties) {
+    return isBooleanPropEnabled(properties, ENABLED_CUSTOMIZED_PROGRESS, 
Optional.empty());
   }
 
   static boolean isMapperFailureFatalEnabled(Properties props) {
@@ -688,11 +687,11 @@ public class MRJobLauncher extends AbstractJobLauncher {
       for (WorkUnit workUnit : workUnits) {
 
         String workUnitFileName;
-        if (workUnit instanceof MultiWorkUnit) {
+        if (workUnit.isMultiWorkUnit()) {
           workUnitFileName = 
JobLauncherUtils.newMultiTaskId(this.jobContext.getJobId(), 
multiTaskIdSequence++)
-              + MULTI_WORK_UNIT_FILE_EXTENSION;
+              + JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION;
         } else {
-          workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + 
WORK_UNIT_FILE_EXTENSION;
+          workUnitFileName = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY) + 
JobLauncherUtils.WORK_UNIT_FILE_EXTENSION;
         }
         Path workUnitFile = new Path(this.jobInputPath, workUnitFileName);
         LOG.debug("Writing work unit file " + workUnitFileName);
@@ -731,7 +730,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
    */
   @VisibleForTesting
   void countersToMetrics(GobblinMetrics metrics) throws IOException {
-    Optional<Counters> counters = 
Optional.fromNullable(this.job.getCounters());
+    Optional<Counters> counters = Optional.ofNullable(this.job.getCounters());
 
     if (counters.isPresent()) {
       // Write job-level counters
@@ -772,7 +771,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
     private TaskExecutor taskExecutor;
     private TaskStateTracker taskStateTracker;
     private ServiceManager serviceManager;
-    private Optional<JobMetrics> jobMetrics = Optional.absent();
+    private Optional<JobMetrics> jobMetrics = Optional.empty();
     private boolean isSpeculativeEnabled;
     private boolean customizedProgressEnabled;
     private final JobState jobState = new JobState();
@@ -809,25 +808,18 @@ public class MRJobLauncher extends AbstractJobLauncher {
         this.fs = FileSystem.get(context.getConfiguration());
         this.taskStateStore =
             new FsStateStore<>(this.fs, 
FileOutputFormat.getOutputPath(context).toUri().getPath(), TaskState.class);
-
         String jobStateFileName = 
context.getConfiguration().get(ConfigurationKeys.JOB_STATE_DISTRIBUTED_CACHE_NAME);
-        boolean foundStateFile = false;
-        for (Path dcPath : 
DistributedCache.getLocalCacheFiles(context.getConfiguration())) {
-          if (dcPath.getName().equals(jobStateFileName)) {
-            SerializationUtils.deserializeStateFromInputStream(
-                closer.register(new 
FileInputStream(dcPath.toUri().getPath())), this.jobState);
-            foundStateFile = true;
-            break;
-          }
-        }
-        if (!foundStateFile) {
-          throw new IOException("Job state file not found.");
+        Optional<URI> jobStateFileUri = 
getStateFileUriForJob(context.getConfiguration(), jobStateFileName);
+        if (jobStateFileUri.isPresent()) {
+          SerializationUtils.deserializeStateFromInputStream(
+                closer.register(new 
FileInputStream(jobStateFileUri.get().getPath())), this.jobState);
+        } else {
+          throw new IOException("Job state file not found: '" + 
jobStateFileName + "'.");
         }
       } catch (IOException | ReflectiveOperationException e) {
         throw new RuntimeException("Failed to setup the mapper task", e);
       }
 
-
       // load dynamic configuration to add to the job configuration
       Configuration configuration = context.getConfiguration();
       Config jobStateAsConfig = 
ConfigUtils.propertiesToConfig(this.jobState.getProperties());
@@ -989,17 +981,17 @@ public class MRJobLauncher extends AbstractJobLauncher {
 
     @Override
     public void map(LongWritable key, Text value, Context context) throws 
IOException, InterruptedException {
-      WorkUnit workUnit = 
(value.toString().endsWith(MULTI_WORK_UNIT_FILE_EXTENSION) ? 
MultiWorkUnit.createEmpty()
-          : WorkUnit.createEmpty());
-      SerializationUtils.deserializeState(this.fs, new Path(value.toString()), 
workUnit);
-
-      if (workUnit instanceof MultiWorkUnit) {
-        List<WorkUnit> flattenedWorkUnits =
-            JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) 
workUnit).getWorkUnits());
-        this.workUnits.addAll(flattenedWorkUnits);
-      } else {
-        this.workUnits.add(workUnit);
+      this.workUnits.addAll(JobLauncherUtils.loadFlattenedWorkUnits(this.fs, 
new Path(value.toString())));
+    }
+
+    /** @return {@link URI} if a distributed cache file matches 
`jobStateFileName` */
+    protected Optional<URI> getStateFileUriForJob(Configuration conf, String 
jobStateFileName) throws IOException {
+      for (Path dcPath : DistributedCache.getLocalCacheFiles(conf)) {
+        if (dcPath.getName().equals(jobStateFileName)) {
+          return Optional.of(dcPath.toUri());
+        }
       }
+      return Optional.empty();
     }
 
     /**
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 42fae521e..1f20b1b73 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -55,6 +55,9 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 @Slf4j
 public class JobLauncherUtils {
 
+  public static final String WORK_UNIT_FILE_EXTENSION = ".wu";
+  public static final String MULTI_WORK_UNIT_FILE_EXTENSION = ".mwu";
+
   // A cache for proxied FileSystems by owners
   private static Cache<String, FileSystem> fileSystemCacheByOwners = 
CacheBuilder.newBuilder().build();
 
@@ -114,7 +117,7 @@ public class JobLauncherUtils {
   public static List<WorkUnit> flattenWorkUnits(Collection<WorkUnit> 
workUnits) {
     List<WorkUnit> flattenedWorkUnits = Lists.newArrayList();
     for (WorkUnit workUnit : workUnits) {
-      if (workUnit instanceof MultiWorkUnit) {
+      if (workUnit.isMultiWorkUnit()) {
         flattenedWorkUnits.addAll(flattenWorkUnits(((MultiWorkUnit) 
workUnit).getWorkUnits()));
       } else {
         flattenedWorkUnits.add(workUnit);
@@ -123,6 +126,34 @@ public class JobLauncherUtils {
     return flattenedWorkUnits;
   }
 
+  /** @return flattened list of {@link WorkUnit}s loaded from `path`, which 
may possibly hold a multi-work unit */
+  public static List<WorkUnit> loadFlattenedWorkUnits(FileSystem fs, Path 
path) throws IOException {
+    WorkUnit workUnit = JobLauncherUtils.createEmptyWorkUnitPerExtension(path);
+    SerializationUtils.deserializeState(fs, path, workUnit);
+
+    if (workUnit.isMultiWorkUnit()) {
+      return JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) 
workUnit).getWorkUnits());
+    } else {
+      return Lists.newArrayList(workUnit);
+    }
+  }
+
+  /** @return an empty {@link WorkUnit}, potentially an empty {@link 
MultiWorkUnit}, based on the {@link Path} extension */
+  public static WorkUnit createEmptyWorkUnitPerExtension(Path p) {
+    return JobLauncherUtils.hasMultiWorkUnitExtension(p) ? 
MultiWorkUnit.createEmpty() : WorkUnit.createEmpty();
+  }
+
+  /** @return whether {@link Path} ends with {@link 
JobLauncherUtils#MULTI_WORK_UNIT_FILE_EXTENSION} */
+  public static boolean hasMultiWorkUnitExtension(Path p) {
+    return 
p.getName().endsWith(JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION);
+  }
+
+  /** @return whether {@link Path} ends with {@link 
JobLauncherUtils#MULTI_WORK_UNIT_FILE_EXTENSION} or {@link 
JobLauncherUtils#WORK_UNIT_FILE_EXTENSION} */
+  public static boolean hasAnyWorkUnitExtension(Path p) {
+    return 
p.getName().endsWith(JobLauncherUtils.MULTI_WORK_UNIT_FILE_EXTENSION)
+        || p.getName().endsWith(JobLauncherUtils.WORK_UNIT_FILE_EXTENSION);
+  }
+
   /**
    * Cleanup the staging data for a list of Gobblin tasks. This method calls 
the
    * {@link #cleanTaskStagingData(State, Logger)} method.


Reply via email to