[GOBBLIN-382] Support storing the job.state file in the MySQL state store for 
the standalone cluster

Closes #2262 from htran1/cluster_job_state_store


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f2f6e468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f2f6e468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f2f6e468

Branch: refs/heads/0.12.0
Commit: f2f6e468536bf3f0d79a3c126f620ac0741df65d
Parents: fd0c30a
Author: Hung Tran <[email protected]>
Authored: Tue Jan 23 20:23:20 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Jan 23 20:23:20 2018 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  3 ++
 .../GobblinClusterConfigurationKeys.java        |  3 ++
 .../gobblin/cluster/GobblinClusterUtils.java    | 34 +++++++++++++++-
 .../cluster/GobblinHelixJobLauncher.java        | 35 +++++++++++-----
 .../gobblin/cluster/GobblinHelixTask.java       |  7 +---
 .../cluster/GobblinHelixTaskFactory.java        |  5 ++-
 .../org/apache/gobblin/cluster/SingleTask.java  | 18 ++++++--
 .../gobblin/cluster/SingleTaskRunner.java       | 15 +++----
 .../cluster/GobblinHelixJobLauncherTest.java    |  6 +++
 .../gobblin/runtime/util/StateStores.java       | 43 +++++++++++++++++---
 .../util/test/TestStressTestingSource.java      | 11 +++--
 11 files changed, 138 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index ed360d9..267a17e 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -54,6 +54,9 @@ public class ConfigurationKeys {
   public static final String INTERMEDIATE_STATE_STORE_TYPE_KEY = 
INTERMEDIATE_STATE_STORE_PREFIX + ".state.store.type";
   public static final String DEFAULT_STATE_STORE_TYPE = "fs";
   public static final String STATE_STORE_TYPE_NOOP = "noop";
+  // are the job.state files stored using the state store?
+  public static final String JOB_STATE_IN_STATE_STORE = 
"state.store.jobStateInStateStore";
+  public static final boolean DEFAULT_JOB_STATE_IN_STATE_STORE = false;
 
   public static final String CONFIG_RUNTIME_PREFIX = "gobblin.config.runtime.";
   // Root directory where task state files are stored

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 5e25194..4e78078 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -63,6 +63,9 @@ public class GobblinClusterConfigurationKeys {
   public static final String JOB_CONF_PATH_KEY = GOBBLIN_CLUSTER_PREFIX + 
"job.conf.path";
   public static final String INPUT_WORK_UNIT_DIR_NAME = "_workunits";
   public static final String OUTPUT_TASK_STATE_DIR_NAME = "_taskstates";
+  // This is the directory to store job.state files when a state store is used.
+  // Note that a .job.state file is not the same thing as a .jst file.
+  public static final String JOB_STATE_DIR_NAME = "_jobstates";
   public static final String TAR_GZ_FILE_SUFFIX = ".tar.gz";
 
   // Other misc configuration properties.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index 3f53443..6b6ead8 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -19,14 +19,20 @@ package org.apache.gobblin.cluster;
 
 import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
 
-import com.typesafe.config.Config;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import lombok.extern.slf4j.Slf4j;
+
 @Alpha
+@Slf4j
 public class GobblinClusterUtils {
 
   /**
@@ -70,4 +76,30 @@ public class GobblinClusterUtils {
   public static String getAppWorkDirPath(String applicationName, String 
applicationId) {
     return applicationName + Path.SEPARATOR + applicationId;
   }
+
+  /**
+   * Generate the path to the job.state file
+   * @param usingStateStore is a state store being used to store the job.state 
content
+   * @param appWorkPath work directory
+   * @param jobId job id
+   * @return a {@link Path} referring to the job.state
+   */
+  public static Path getJobStateFilePath(boolean usingStateStore, Path 
appWorkPath, String jobId) {
+    final Path jobStateFilePath;
+
+    // the state store uses a path of the form 
workdir/_jobstates/job_id/job_id.job.state while the old method stores the file
+    // in the app work dir.
+    if (usingStateStore) {
+      jobStateFilePath = new Path(appWorkPath, 
GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME
+          + Path.SEPARATOR + jobId + Path.SEPARATOR + jobId + "."
+          + AbstractJobLauncher.JOB_STATE_FILE_NAME);
+
+    } else {
+      jobStateFilePath = new Path(appWorkPath, jobId + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
+    }
+
+    log.info("job state file path: " + jobStateFilePath);
+
+    return jobStateFilePath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index fc78053..1a39dfb 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -162,13 +162,14 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
     this.stateStores = new StateStores(stateStoreJobConfig, appWorkDir,
         GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, appWorkDir,
-        GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
+        GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir,
+        GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
 
     URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, 
ConfigurationKeys.LOCAL_FS_URI));
     this.fs = FileSystem.get(fsUri, new Configuration());
 
     this.taskStateCollectorService = new TaskStateCollectorService(jobProps, 
this.jobContext.getJobState(),
-        this.eventBus, this.stateStores.taskStateStore, outputTaskStateDir);
+        this.eventBus, this.stateStores.getTaskStateStore(), 
outputTaskStateDir);
 
     if 
(Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING))
 {
       // Fix-up Ideal State with a custom rebalancer that will re-balance 
long-running jobs
@@ -268,8 +269,17 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         addWorkUnit(workUnit, stateSerDeRunner, taskConfigMap);
       }
 
-      Path jobStateFilePath = new Path(this.appWorkDir, 
this.jobContext.getJobId() + "." + JOB_STATE_FILE_NAME);
-      SerializationUtils.serializeState(this.fs, jobStateFilePath, 
this.jobContext.getJobState());
+      Path jobStateFilePath;
+
+      // write the job.state using the state store if present, otherwise 
serialize directly to the file
+      if (this.stateStores.haveJobStateStore()) {
+        jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(true, 
this.appWorkDir, this.jobContext.getJobId());
+        
this.stateStores.getJobStateStore().put(jobStateFilePath.getParent().getName(), 
jobStateFilePath.getName(),
+            this.jobContext.getJobState());
+      } else {
+        jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false, 
this.appWorkDir, this.jobContext.getJobId());
+        SerializationUtils.serializeState(this.fs, jobStateFilePath, 
this.jobContext.getJobState());
+      }
 
       LOGGER.debug("GobblinHelixJobLauncher.createJob: jobStateFilePath {}, 
jobState {} jobProperties {}",
           jobStateFilePath, this.jobContext.getJobState().toString(), 
this.jobContext.getJobState().getProperties());
@@ -355,10 +365,10 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
     if (workUnit instanceof MultiWorkUnit) {
       workUnitFileName += MULTI_WORK_UNIT_FILE_EXTENSION;
-      stateStore = stateStores.mwuStateStore;
+      stateStore = stateStores.getMwuStateStore();
     } else {
       workUnitFileName += WORK_UNIT_FILE_EXTENSION;
-      stateStore = stateStores.wuStateStore;
+      stateStore = stateStores.getWuStateStore();
     }
 
     Path workUnitFile = new Path(workUnitFileDir, workUnitFileName);
@@ -396,14 +406,19 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
    */
   private void cleanupWorkingDirectory() throws IOException {
     LOGGER.info("Deleting persisted work units for job " + 
this.jobContext.getJobId());
-    stateStores.wuStateStore.delete(this.jobContext.getJobId());
+    stateStores.getWuStateStore().delete(this.jobContext.getJobId());
 
     // delete the directory that stores the task state files
-    stateStores.taskStateStore.delete(outputTaskStateDir.getName());
+    stateStores.getTaskStateStore().delete(outputTaskStateDir.getName());
 
     LOGGER.info("Deleting job state file for job " + 
this.jobContext.getJobId());
-    Path jobStateFilePath = new Path(this.appWorkDir, 
this.jobContext.getJobId() + "." + JOB_STATE_FILE_NAME);
-    this.fs.delete(jobStateFilePath, false);
+
+    if (this.stateStores.haveJobStateStore()) {
+      this.stateStores.getJobStateStore().delete(this.jobContext.getJobId());
+    } else {
+      Path jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false, 
this.appWorkDir, this.jobContext.getJobId());
+      this.fs.delete(jobStateFilePath, false);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 6a6e60d..c6b9514 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -78,17 +78,14 @@ public class GobblinHelixTask implements Task {
     this.taskConfig = taskCallbackContext.getTaskConfig();
     getInfoFromTaskConfig();
 
-    Path jobStateFilePath = constructJobStateFilePath(appWorkDir);
+    Path jobStateFilePath =
+        
GobblinClusterUtils.getJobStateFilePath(stateStores.haveJobStateStore(), 
appWorkDir, this.jobId);
 
     this.task =
         new SingleTask(this.jobId, workUnitFilePath, jobStateFilePath, fs, 
taskAttemptBuilder,
             stateStores);
   }
 
-  private Path constructJobStateFilePath(Path appWorkDir) {
-    return new Path(appWorkDir, this.jobId + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
-  }
-
   private void getInfoFromTaskConfig() {
     Map<String, String> configMap = this.taskConfig.getConfigMap();
     this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
index b8e55d8..2fa845c 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
@@ -79,14 +79,15 @@ public class GobblinHelixTaskFactory implements TaskFactory 
{
     this.fs = fs;
     this.appWorkDir = appWorkDir;
     this.stateStores = new StateStores(config, appWorkDir, 
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME,
-        appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
+        appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, 
appWorkDir,
+        GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
     this.taskAttemptBuilder = createTaskAttemptBuilder();
   }
 
   private TaskAttemptBuilder createTaskAttemptBuilder() {
     TaskAttemptBuilder builder = new TaskAttemptBuilder(this.taskStateTracker, 
this.taskExecutor);
     builder.setContainerId(this.helixManager.getInstanceName());
-    builder.setTaskStateStore(this.stateStores.taskStateStore);
+    builder.setTaskStateStore(this.stateStores.getTaskStateStore());
 
     return builder;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 3b69e0c..89f2bfa 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -97,8 +97,18 @@ public class SingleTask {
 
   private JobState getJobState()
       throws java.io.IOException {
-    JobState jobState = new JobState();
-    SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState);
+    JobState jobState;
+
+    // read the state from the state store if present, otherwise deserialize 
directly from the file
+    if (_stateStores.haveJobStateStore()) {
+      jobState = 
_stateStores.getJobStateStore().get(_jobStateFilePath.getParent().getName(),
+          _jobStateFilePath.getName(),
+          _jobStateFilePath.getParent().getName());
+    } else {
+      jobState = new JobState();
+      SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState);
+    }
+
     return jobState;
   }
 
@@ -109,9 +119,9 @@ public class SingleTask {
     WorkUnit workUnit;
 
     if 
(_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
-      workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0);
+      workUnit = _stateStores.getMwuStateStore().getAll(storeName, 
fileName).get(0);
     } else {
-      workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0);
+      workUnit = _stateStores.getWuStateStore().getAll(storeName, 
fileName).get(0);
     }
 
     // The list of individual WorkUnits (flattened) to run

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 9cc4733..6226cf1 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -106,11 +106,13 @@ class SingleTaskRunner {
 
   private void getSingleHelixTask()
       throws IOException {
-    final Path jobStateFilePath = getJobStateFilePath();
     final FileSystem fs = getFileSystem();
     final StateStores stateStores = new StateStores(this.clusterConfig, 
this.appWorkPath,
         GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, 
this.appWorkPath,
-        GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
+        GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, 
this.appWorkPath,
+        GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
+    final Path jobStateFilePath =
+        
GobblinClusterUtils.getJobStateFilePath(stateStores.haveJobStateStore(), 
this.appWorkPath, this.jobId);
 
     final TaskAttemptBuilder taskAttemptBuilder = 
getTaskAttemptBuilder(stateStores);
 
@@ -122,7 +124,7 @@ class SingleTaskRunner {
     final TaskAttemptBuilder taskAttemptBuilder =
         new TaskAttemptBuilder(this.taskStateTracker, this.taskExecutor);
     // No container id is set. Use the default.
-    taskAttemptBuilder.setTaskStateStore(stateStores.taskStateStore);
+    taskAttemptBuilder.setTaskStateStore(stateStores.getTaskStateStore());
     return taskAttemptBuilder;
   }
 
@@ -135,13 +137,6 @@ class SingleTaskRunner {
     this.serviceManager = new ServiceManager(services);
   }
 
-  private Path getJobStateFilePath() {
-    final String jobStateFileName = this.jobId + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME;
-    final Path jobStateFilePath = new Path(this.appWorkPath, jobStateFileName);
-    logger.info("job state file path: " + jobStateFilePath);
-    return jobStateFilePath;
-  }
-
   private FileSystem getFileSystem()
       throws IOException {
     final Configuration conf = HadoopUtils.newConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 77a33af..aaf5f05 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -121,6 +121,7 @@ public class GobblinHelixJobLauncherTest {
                    
ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
         .withValue(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
             ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
+        .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, 
ConfigValueFactory.fromAnyRef("true"))
         .resolve();
 
     String zkConnectingString = 
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
@@ -339,6 +340,11 @@ public class GobblinHelixJobLauncherTest {
 
     Assert.assertFalse(workunitsDir.exists());
     Assert.assertFalse(taskstatesDir.exists());
+
+    // check that job.state file is cleaned up
+    final File jobStateFile = new 
File(GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, 
jobIdKey).toString());
+
+    Assert.assertFalse(jobStateFile.exists());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
index 8d1c51f..cc892f8 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java
@@ -16,40 +16,52 @@
  */
 package org.apache.gobblin.runtime.util;
 
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValue;
 import com.typesafe.config.ConfigValueFactory;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.hadoop.fs.Path;
 
-import java.util.Map;
+import lombok.Getter;
 
 /**
  * state stores used for storing work units and task states
  */
 public class StateStores {
-  public final StateStore<TaskState> taskStateStore;
-  public final StateStore<WorkUnit> wuStateStore;
-  public final StateStore<MultiWorkUnit> mwuStateStore;
+  @Getter
+  private final StateStore<TaskState> taskStateStore;
+  @Getter
+  private final StateStore<WorkUnit> wuStateStore;
+  @Getter
+  private final StateStore<MultiWorkUnit> mwuStateStore;
+  // state store for job.state files. This should not be confused with the jst 
state store
+  @Getter
+  private final StateStore<JobState> jobStateStore;
 
   /**
    * Creates the state stores under storeBase
    * {@link WorkUnit}s will be stored under 
storeBase/_workunits/subdir/filename.(m)wu
    * {@link TaskState}s will be stored under 
storeBase/_taskstates/subdir/filename.tst
+   * {@link JobState}s will be stored under 
storeBase/_jobstates/subdir/filename.job.state
    * Some state stores such as the MysqlStateStore do not preserve the path 
prefix of storeRoot.
    * In those cases only the last three components of the path determine the 
key for the data.
    * @param config config properties
    * @param taskStoreBase the base directory that holds the store root for the 
task state store
    */
   public StateStores(Config config, Path taskStoreBase, String taskStoreTable, 
Path workUnitStoreBase,
-      String workUnitStoreTable) {
+      String workUnitStoreTable, Path jobStateStoreBase, String 
jobStateStoreTable) {
     String stateStoreType = ConfigUtils.getString(config, 
ConfigurationKeys.INTERMEDIATE_STATE_STORE_TYPE_KEY,
         ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_TYPE_KEY,
             ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
@@ -79,6 +91,25 @@ public class StateStores {
     taskStateStore = stateStoreFactory.createStateStore(taskStateStoreConfig, 
TaskState.class);
     wuStateStore = stateStoreFactory.createStateStore(wuStateStoreConfig, 
WorkUnit.class);
     mwuStateStore = stateStoreFactory.createStateStore(wuStateStoreConfig, 
MultiWorkUnit.class);
+
+    // create a state store to store job.state content if configured
+    if (ConfigUtils.getBoolean(config, 
ConfigurationKeys.JOB_STATE_IN_STATE_STORE,
+        ConfigurationKeys.DEFAULT_JOB_STATE_IN_STATE_STORE)) {
+      // Override properties to place the JobState StateStore at the 
appropriate location
+      Path jobStateOutputDir = new Path(jobStateStoreBase, jobStateStoreTable);
+      Config jobStateStoreConfig = getStateStoreConfig(config, 
jobStateOutputDir.toString(), jobStateStoreTable);
+
+      jobStateStore = stateStoreFactory.createStateStore(jobStateStoreConfig, 
JobState.class);
+    } else {
+      jobStateStore = null;
+    }
+  }
+
+  /**
+   * @return true if a state store is present for storing job.state content
+   */
+  public boolean haveJobStateStore() {
+    return this.jobStateStore != null;
   }
 
   private static Config getStateStoreConfig(Config config, String rootDir, 
String dbTableKey) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
index aad8a3a..040d69b 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java
@@ -92,8 +92,9 @@ public class TestStressTestingSource {
     long endTimeNano = System.nanoTime();
 
     long timeSpentMicro = (endTimeNano - startTimeNano)/(1000);
-    // check that there is less than 2 second difference between expected and 
actual time spent
-    Assert.assertTrue(Math.abs(timeSpentMicro - (COMPUTE_TIME_MICRO * 
NUM_RECORDS)) < (2000000));
+    // check that there is less than 5 second difference between expected and 
actual time spent
+    Assert.assertTrue(Math.abs(timeSpentMicro - (COMPUTE_TIME_MICRO * 
NUM_RECORDS)) < (5000000),
+        "Time spent " + timeSpentMicro);
   }
 
   @Test
@@ -127,7 +128,8 @@ public class TestStressTestingSource {
 
     long timeSpentMicro = (endTimeNano - startTimeNano)/(1000);
     // check that there is less than 2 second difference between expected and 
actual time spent
-    Assert.assertTrue(Math.abs(timeSpentMicro - (SLEEP_TIME_MICRO * 
NUM_RECORDS)) < (2000000));
+    Assert.assertTrue(Math.abs(timeSpentMicro - (SLEEP_TIME_MICRO * 
NUM_RECORDS)) < (2000000),
+        "Time spent " + timeSpentMicro);
   }
 
   @Test
@@ -163,6 +165,7 @@ public class TestStressTestingSource {
 
     long timeSpentMicro = (endTimeNano - startTimeNano)/(1000);
     // check that there is less than 1 second difference between expected and 
actual time spent
-    Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) 
< (1000000));
+    Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) 
< (1000000),
+        "Time spent " + timeSpentMicro);
   }
 }

Reply via email to