[GOBBLIN-382] Support storing job.state file in mysql state store for standalone cluster
Closes #2262 from htran1/cluster_job_state_store Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f2f6e468 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f2f6e468 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f2f6e468 Branch: refs/heads/0.12.0 Commit: f2f6e468536bf3f0d79a3c126f620ac0741df65d Parents: fd0c30a Author: Hung Tran <[email protected]> Authored: Tue Jan 23 20:23:20 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Jan 23 20:23:20 2018 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 3 ++ .../GobblinClusterConfigurationKeys.java | 3 ++ .../gobblin/cluster/GobblinClusterUtils.java | 34 +++++++++++++++- .../cluster/GobblinHelixJobLauncher.java | 35 +++++++++++----- .../gobblin/cluster/GobblinHelixTask.java | 7 +--- .../cluster/GobblinHelixTaskFactory.java | 5 ++- .../org/apache/gobblin/cluster/SingleTask.java | 18 ++++++-- .../gobblin/cluster/SingleTaskRunner.java | 15 +++---- .../cluster/GobblinHelixJobLauncherTest.java | 6 +++ .../gobblin/runtime/util/StateStores.java | 43 +++++++++++++++++--- .../util/test/TestStressTestingSource.java | 11 +++-- 11 files changed, 138 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index ed360d9..267a17e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -54,6 +54,9 @@ public class ConfigurationKeys { public static final String INTERMEDIATE_STATE_STORE_TYPE_KEY = INTERMEDIATE_STATE_STORE_PREFIX + ".state.store.type"; public static final String DEFAULT_STATE_STORE_TYPE = "fs"; public static final String STATE_STORE_TYPE_NOOP = "noop"; + // are the job.state files stored using the state store? + public static final String JOB_STATE_IN_STATE_STORE = "state.store.jobStateInStateStore"; + public static final boolean DEFAULT_JOB_STATE_IN_STATE_STORE = false; public static final String CONFIG_RUNTIME_PREFIX = "gobblin.config.runtime."; // Root directory where task state files are stored http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 5e25194..4e78078 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -63,6 +63,9 @@ public class GobblinClusterConfigurationKeys { public static final String JOB_CONF_PATH_KEY = GOBBLIN_CLUSTER_PREFIX + "job.conf.path"; public static final String INPUT_WORK_UNIT_DIR_NAME = "_workunits"; public static final String OUTPUT_TASK_STATE_DIR_NAME = "_taskstates"; + // This is the directory to store job.state files when a state store is used. + // Note that a .job.state file is not the same thing as a .jst file. 
+ public static final String JOB_STATE_DIR_NAME = "_jobstates"; public static final String TAR_GZ_FILE_SUFFIX = ".tar.gz"; // Other misc configuration properties. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java index 3f53443..6b6ead8 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java @@ -19,14 +19,20 @@ package org.apache.gobblin.cluster; import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; -import com.typesafe.config.Config; import java.net.InetAddress; import java.net.UnknownHostException; + +import com.typesafe.config.Config; + import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import lombok.extern.slf4j.Slf4j; + @Alpha +@Slf4j public class GobblinClusterUtils { /** @@ -70,4 +76,30 @@ public class GobblinClusterUtils { public static String getAppWorkDirPath(String applicationName, String applicationId) { return applicationName + Path.SEPARATOR + applicationId; } + + /** + * Generate the path to the job.state file + * @param usingStateStore is a state store being used to store the job.state content + * @param appWorkPath work directory + * @param jobId job id + * @return a {@link Path} referring to the job.state + */ + public static Path getJobStateFilePath(boolean usingStateStore, Path appWorkPath, String jobId) { + final Path jobStateFilePath; + + // the state store uses a path of the form 
workdir/_jobstates/job_id/job_id.job.state while the old method stores the file + // in the app work dir. + if (usingStateStore) { + jobStateFilePath = new Path(appWorkPath, GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME + + Path.SEPARATOR + jobId + Path.SEPARATOR + jobId + "." + + AbstractJobLauncher.JOB_STATE_FILE_NAME); + + } else { + jobStateFilePath = new Path(appWorkPath, jobId + "." + AbstractJobLauncher.JOB_STATE_FILE_NAME); + } + + log.info("job state file path: " + jobStateFilePath); + + return jobStateFilePath; + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index fc78053..1a39dfb 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -162,13 +162,14 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.stateStores = new StateStores(stateStoreJobConfig, appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, appWorkDir, - GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME); + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir, + GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME); URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI)); this.fs = FileSystem.get(fsUri, new Configuration()); this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), - this.eventBus, this.stateStores.taskStateStore, outputTaskStateDir); + this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir); if 
(Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) { // Fix-up Ideal State with a custom rebalancer that will re-balance long-running jobs @@ -268,8 +269,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { addWorkUnit(workUnit, stateSerDeRunner, taskConfigMap); } - Path jobStateFilePath = new Path(this.appWorkDir, this.jobContext.getJobId() + "." + JOB_STATE_FILE_NAME); - SerializationUtils.serializeState(this.fs, jobStateFilePath, this.jobContext.getJobState()); + Path jobStateFilePath; + + // write the job.state using the state store if present, otherwise serialize directly to the file + if (this.stateStores.haveJobStateStore()) { + jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, this.jobContext.getJobId()); + this.stateStores.getJobStateStore().put(jobStateFilePath.getParent().getName(), jobStateFilePath.getName(), + this.jobContext.getJobState()); + } else { + jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir, this.jobContext.getJobId()); + SerializationUtils.serializeState(this.fs, jobStateFilePath, this.jobContext.getJobState()); + } LOGGER.debug("GobblinHelixJobLauncher.createJob: jobStateFilePath {}, jobState {} jobProperties {}", jobStateFilePath, this.jobContext.getJobState().toString(), this.jobContext.getJobState().getProperties()); @@ -355,10 +365,10 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { if (workUnit instanceof MultiWorkUnit) { workUnitFileName += MULTI_WORK_UNIT_FILE_EXTENSION; - stateStore = stateStores.mwuStateStore; + stateStore = stateStores.getMwuStateStore(); } else { workUnitFileName += WORK_UNIT_FILE_EXTENSION; - stateStore = stateStores.wuStateStore; + stateStore = stateStores.getWuStateStore(); } Path workUnitFile = new Path(workUnitFileDir, workUnitFileName); @@ -396,14 +406,19 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { */ private void 
cleanupWorkingDirectory() throws IOException { LOGGER.info("Deleting persisted work units for job " + this.jobContext.getJobId()); - stateStores.wuStateStore.delete(this.jobContext.getJobId()); + stateStores.getWuStateStore().delete(this.jobContext.getJobId()); // delete the directory that stores the task state files - stateStores.taskStateStore.delete(outputTaskStateDir.getName()); + stateStores.getTaskStateStore().delete(outputTaskStateDir.getName()); LOGGER.info("Deleting job state file for job " + this.jobContext.getJobId()); - Path jobStateFilePath = new Path(this.appWorkDir, this.jobContext.getJobId() + "." + JOB_STATE_FILE_NAME); - this.fs.delete(jobStateFilePath, false); + + if (this.stateStores.haveJobStateStore()) { + this.stateStores.getJobStateStore().delete(this.jobContext.getJobId()); + } else { + Path jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir, this.jobContext.getJobId()); + this.fs.delete(jobStateFilePath, false); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index 6a6e60d..c6b9514 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -78,17 +78,14 @@ public class GobblinHelixTask implements Task { this.taskConfig = taskCallbackContext.getTaskConfig(); getInfoFromTaskConfig(); - Path jobStateFilePath = constructJobStateFilePath(appWorkDir); + Path jobStateFilePath = + GobblinClusterUtils.getJobStateFilePath(stateStores.haveJobStateStore(), appWorkDir, this.jobId); this.task = new SingleTask(this.jobId, workUnitFilePath, 
jobStateFilePath, fs, taskAttemptBuilder, stateStores); } - private Path constructJobStateFilePath(Path appWorkDir) { - return new Path(appWorkDir, this.jobId + "." + AbstractJobLauncher.JOB_STATE_FILE_NAME); - } - private void getInfoFromTaskConfig() { Map<String, String> configMap = this.taskConfig.getConfigMap(); this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java index b8e55d8..2fa845c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java @@ -79,14 +79,15 @@ public class GobblinHelixTaskFactory implements TaskFactory { this.fs = fs; this.appWorkDir = appWorkDir; this.stateStores = new StateStores(config, appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, - appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME); + appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir, + GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME); this.taskAttemptBuilder = createTaskAttemptBuilder(); } private TaskAttemptBuilder createTaskAttemptBuilder() { TaskAttemptBuilder builder = new TaskAttemptBuilder(this.taskStateTracker, this.taskExecutor); builder.setContainerId(this.helixManager.getInstanceName()); - builder.setTaskStateStore(this.stateStores.taskStateStore); + builder.setTaskStateStore(this.stateStores.getTaskStateStore()); return builder; } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java index 3b69e0c..89f2bfa 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java @@ -97,8 +97,18 @@ public class SingleTask { private JobState getJobState() throws java.io.IOException { - JobState jobState = new JobState(); - SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState); + JobState jobState; + + // read the state from the state store if present, otherwise deserialize directly from the file + if (_stateStores.haveJobStateStore()) { + jobState = _stateStores.getJobStateStore().get(_jobStateFilePath.getParent().getName(), + _jobStateFilePath.getName(), + _jobStateFilePath.getParent().getName()); + } else { + jobState = new JobState(); + SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState); + } + return jobState; } @@ -109,9 +119,9 @@ public class SingleTask { WorkUnit workUnit; if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) { - workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0); + workUnit = _stateStores.getMwuStateStore().getAll(storeName, fileName).get(0); } else { - workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0); + workUnit = _stateStores.getWuStateStore().getAll(storeName, fileName).get(0); } // The list of individual WorkUnits (flattened) to run http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java 
---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java index 9cc4733..6226cf1 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java @@ -106,11 +106,13 @@ class SingleTaskRunner { private void getSingleHelixTask() throws IOException { - final Path jobStateFilePath = getJobStateFilePath(); final FileSystem fs = getFileSystem(); final StateStores stateStores = new StateStores(this.clusterConfig, this.appWorkPath, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, this.appWorkPath, - GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME); + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, this.appWorkPath, + GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME); + final Path jobStateFilePath = + GobblinClusterUtils.getJobStateFilePath(stateStores.haveJobStateStore(), this.appWorkPath, this.jobId); final TaskAttemptBuilder taskAttemptBuilder = getTaskAttemptBuilder(stateStores); @@ -122,7 +124,7 @@ class SingleTaskRunner { final TaskAttemptBuilder taskAttemptBuilder = new TaskAttemptBuilder(this.taskStateTracker, this.taskExecutor); // No container id is set. Use the default. - taskAttemptBuilder.setTaskStateStore(stateStores.taskStateStore); + taskAttemptBuilder.setTaskStateStore(stateStores.getTaskStateStore()); return taskAttemptBuilder; } @@ -135,13 +137,6 @@ class SingleTaskRunner { this.serviceManager = new ServiceManager(services); } - private Path getJobStateFilePath() { - final String jobStateFileName = this.jobId + "." 
+ AbstractJobLauncher.JOB_STATE_FILE_NAME; - final Path jobStateFilePath = new Path(this.appWorkPath, jobStateFileName); - logger.info("job state file path: " + jobStateFilePath); - return jobStateFilePath; - } - private FileSystem getFileSystem() throws IOException { final Configuration conf = HadoopUtils.newConfiguration(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index 77a33af..aaf5f05 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -121,6 +121,7 @@ public class GobblinHelixJobLauncherTest { ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString())) .withValue(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath())) + .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, ConfigValueFactory.fromAnyRef("true")) .resolve(); String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); @@ -339,6 +340,11 @@ public class GobblinHelixJobLauncherTest { Assert.assertFalse(workunitsDir.exists()); Assert.assertFalse(taskstatesDir.exists()); + + // check that job.state file is cleaned up + final File jobStateFile = new File(GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, jobIdKey).toString()); + + Assert.assertFalse(jobStateFile.exists()); } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java 
---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java index 8d1c51f..cc892f8 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/StateStores.java @@ -16,40 +16,52 @@ */ package org.apache.gobblin.runtime.util; +import java.util.Map; + +import org.apache.hadoop.fs.Path; + import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValue; import com.typesafe.config.ConfigValueFactory; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; -import org.apache.hadoop.fs.Path; -import java.util.Map; +import lombok.Getter; /** * state stores used for storing work units and task states */ public class StateStores { - public final StateStore<TaskState> taskStateStore; - public final StateStore<WorkUnit> wuStateStore; - public final StateStore<MultiWorkUnit> mwuStateStore; + @Getter + private final StateStore<TaskState> taskStateStore; + @Getter + private final StateStore<WorkUnit> wuStateStore; + @Getter + private final StateStore<MultiWorkUnit> mwuStateStore; + // state store for job.state files. 
This should not be confused with the jst state store + @Getter + private final StateStore<JobState> jobStateStore; /** * Creates the state stores under storeBase * {@link WorkUnit}s will be stored under storeBase/_workunits/subdir/filename.(m)wu * {@link TaskState}s will be stored under storeBase/_taskstates/subdir/filename.tst + * {@link JobState}s will be stored under storeBase/_jobstates/subdir/filename.job.state * Some state stores such as the MysqlStateStore do not preserve the path prefix of storeRoot. * In those cases only the last three components of the path determine the key for the data. * @param config config properties * @param taskStoreBase the base directory that holds the store root for the task state store */ public StateStores(Config config, Path taskStoreBase, String taskStoreTable, Path workUnitStoreBase, - String workUnitStoreTable) { + String workUnitStoreTable, Path jobStateStoreBase, String jobStateStoreTable) { String stateStoreType = ConfigUtils.getString(config, ConfigurationKeys.INTERMEDIATE_STATE_STORE_TYPE_KEY, ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_TYPE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)); @@ -79,6 +91,25 @@ public class StateStores { taskStateStore = stateStoreFactory.createStateStore(taskStateStoreConfig, TaskState.class); wuStateStore = stateStoreFactory.createStateStore(wuStateStoreConfig, WorkUnit.class); mwuStateStore = stateStoreFactory.createStateStore(wuStateStoreConfig, MultiWorkUnit.class); + + // create a state store to store job.state content if configured + if (ConfigUtils.getBoolean(config, ConfigurationKeys.JOB_STATE_IN_STATE_STORE, + ConfigurationKeys.DEFAULT_JOB_STATE_IN_STATE_STORE)) { + // Override properties to place the JobState StateStore at the appropriate location + Path jobStateOutputDir = new Path(jobStateStoreBase, jobStateStoreTable); + Config jobStateStoreConfig = getStateStoreConfig(config, jobStateOutputDir.toString(), jobStateStoreTable); + + jobStateStore = 
stateStoreFactory.createStateStore(jobStateStoreConfig, JobState.class); + } else { + jobStateStore = null; + } + } + + /** + * @return true if a state store is present for storing job.state content + */ + public boolean haveJobStateStore() { + return this.jobStateStore != null; } private static Config getStateStoreConfig(Config config, String rootDir, String dbTableKey) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f2f6e468/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java index aad8a3a..040d69b 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestStressTestingSource.java @@ -92,8 +92,9 @@ public class TestStressTestingSource { long endTimeNano = System.nanoTime(); long timeSpentMicro = (endTimeNano - startTimeNano)/(1000); - // check that there is less than 2 second difference between expected and actual time spent - Assert.assertTrue(Math.abs(timeSpentMicro - (COMPUTE_TIME_MICRO * NUM_RECORDS)) < (2000000)); + // check that there is less than 5 second difference between expected and actual time spent + Assert.assertTrue(Math.abs(timeSpentMicro - (COMPUTE_TIME_MICRO * NUM_RECORDS)) < (5000000), + "Time spent " + timeSpentMicro); } @Test @@ -127,7 +128,8 @@ public class TestStressTestingSource { long timeSpentMicro = (endTimeNano - startTimeNano)/(1000); // check that there is less than 2 second difference between expected and actual time spent - Assert.assertTrue(Math.abs(timeSpentMicro - (SLEEP_TIME_MICRO * NUM_RECORDS)) < (2000000)); + Assert.assertTrue(Math.abs(timeSpentMicro - (SLEEP_TIME_MICRO * NUM_RECORDS)) < (2000000), + "Time 
spent " + timeSpentMicro); } @Test @@ -163,6 +165,7 @@ public class TestStressTestingSource { long timeSpentMicro = (endTimeNano - startTimeNano)/(1000); // check that there is less than 1 second difference between expected and actual time spent - Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) < (1000000)); + Assert.assertTrue(Math.abs(timeSpentMicro - (RUN_DURATION_SECS * 1000000)) < (1000000), + "Time spent " + timeSpentMicro); } }
