This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4de10b8 [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes
4de10b8 is described below
commit 4de10b8ee7c625cfc079f5ddaefb170f8b762a52
Author: William Lo <[email protected]>
AuthorDate: Mon Jan 11 10:21:56 2021 -0800
[GOBBLIN-1350] Adds a CLI for handling state store reads and deletes
Closes #3189 from Will-Lo/extend-statestore-cli-to-delete
---
bin/statestore-checker.sh | 23 ---
.../gobblin/runtime/cli/JobStateStoreCLI.java | 211 +++++++++++++++++++++
.../runtime/util/JobStateToJsonConverter.java | 87 ++-------
.../gobblin/runtime/cli/JobStateStoreCliTest.java | 175 +++++++++++++++++
4 files changed, 403 insertions(+), 93 deletions(-)
diff --git a/bin/statestore-checker.sh b/bin/statestore-checker.sh
deleted file mode 100755
index 60c0279..0000000
--- a/bin/statestore-checker.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# @deprecated: This script is kept for backward compatibility only and will be removed in future. Use gobblin.sh
-
-CURRENT_DIR="$(cd `dirname $0`/..; pwd)"
-$CURRENT_DIR/bin/gobblin cli job-state-to-json $@
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
new file mode 100644
index 0000000..308b081
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.cli;
+
+import com.typesafe.config.Config;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.FsStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobStateToJsonConverter;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Slf4j
+@Alias(value = "job-state-store", description = "View or delete JobState in state store")
+public class JobStateStoreCLI implements CliApplication {
+
+ Option sysConfigOption = Option.builder("sc").argName("system configuration file")
+ .desc("Gobblin system configuration file (required if no state store URL specified)").longOpt("sysconfig").hasArg().build();
+ Option storeUrlOption = Option.builder("u").argName("gobblin state store URL")
+ .desc("Gobblin state store root path URL (required if no sysconfig specified)").longOpt("storeurl").hasArg().build();
+ Option jobNameOption = Option.builder("n").argName("gobblin job name").desc("Gobblin job name").longOpt("name")
+ .hasArg().build();
+ Option jobIdOption =
+ Option.builder("i").argName("gobblin job id").desc("Gobblin job id").longOpt("id").hasArg().build();
+ Option helpOption =
+ Option.builder("h").argName("help").desc("Usage").longOpt("help").hasArg().build();
+ Option deleteOption =
+ Option.builder("d").argName("delete state").desc("Deletes a state from the state store with a job id")
+ .longOpt("delete").build();
+ Option bulkDeleteOption =
+ Option.builder("bd").argName("bulk delete")
+ .desc("Deletes states from the state store based on a file with job ids to delete, separated by newline")
+ .longOpt("bulkDelete").hasArg().build();
+
+ // For reading state store in json format
+ Option getAsJsonOption =
+ Option.builder("r").argName("read job state").desc("Converts a job state to json").longOpt("read-job-state").build();
+ Option convertAllOption =
+ Option.builder("a").desc("Whether to convert all past job states of the given job when viewing as json").longOpt("all").build();
+ Option keepConfigOption =
+ Option.builder("kc").desc("Whether to keep all configuration properties when viewing as json").longOpt("keepConfig").build();
+ Option outputToFile =
+ Option.builder("t").argName("output file name").desc("Output file name when viewing as json").longOpt("toFile").hasArg().build();
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobStateStoreCLI.class);
+ private StateStore<? extends JobState> jobStateStore;
+
+ CommandLine initializeOptions(String[] args) {
+ Options options = new Options();
+ options.addOption(sysConfigOption);
+ options.addOption(storeUrlOption);
+ options.addOption(jobNameOption);
+ options.addOption(jobIdOption);
+ options.addOption(deleteOption);
+ options.addOption(getAsJsonOption);
+ options.addOption(convertAllOption);
+ options.addOption(keepConfigOption);
+ options.addOption(outputToFile);
+ options.addOption(bulkDeleteOption);
+
+ CommandLine cmd = null;
+
+ try {
+ CommandLineParser parser = new DefaultParser();
+ cmd = parser.parse(options, Arrays.copyOfRange(args, 1, args.length));
+ } catch (ParseException pe) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("JobStateStoreCLI", options);
+ throw new RuntimeException(pe);
+ }
+
+ if (!cmd.hasOption(sysConfigOption.getLongOpt()) && !cmd.hasOption(storeUrlOption.getLongOpt()) ){
+ System.out.println("State store configuration or state store url options missing");
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("JobStateStoreCLI", options);
+ return null;
+ }
+
+ if (cmd.hasOption(getAsJsonOption.getOpt()) && !cmd.hasOption(jobNameOption.getOpt())) {
+ System.out.println("Job name option missing for reading job states as json");
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("JobStateStoreCLI", options);
+ return null;
+ }
+
+ if (cmd.hasOption(deleteOption.getOpt()) && !cmd.hasOption(jobNameOption.getOpt())) {
+ System.out.println("Job name option missing for delete job id");
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("JobStateStoreCLI", options);
+ return null;
+ }
+
+ if (cmd.hasOption(helpOption.getOpt())) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("JobStateStoreCLI", options);
+ return null;
+ }
+
+ return cmd;
+ }
+
+
+ @Override
+ public void run(String[] args) throws Exception {
+ CommandLine cmd = initializeOptions(args);
+ if (cmd == null) {
+ return; // incorrect args were called
+ }
+
+ Properties props = new Properties();
+
+ if (cmd.hasOption(sysConfigOption.getOpt())) {
+ props = JobConfigurationUtils.fileToProperties(cmd.getOptionValue(sysConfigOption.getOpt()));
+ }
+
+ String storeUrl = cmd.getOptionValue(storeUrlOption.getLongOpt());
+ if (StringUtils.isNotBlank(storeUrl)) {
+ props.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, storeUrl);
+ }
+
+ Config stateStoreConfig = ConfigUtils.propertiesToConfig(props);
+ ClassAliasResolver<StateStore.Factory> resolver =
+ new ClassAliasResolver<>(StateStore.Factory.class);
+ StateStore.Factory stateStoreFactory;
+
+ try {
+ stateStoreFactory = resolver.resolveClass(ConfigUtils.getString(stateStoreConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY,
+ FsStateStoreFactory.class.getName())).newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ this.jobStateStore = stateStoreFactory.createStateStore(stateStoreConfig, JobState.class);
+
+ if (cmd.hasOption(getAsJsonOption.getOpt())) {
+ this.viewStateAsJson(cmd);
+ } else if (cmd.hasOption(bulkDeleteOption.getOpt())) {
+ this.deleteJobBulk(cmd.getOptionValue(bulkDeleteOption.getOpt()));
+ } else if (cmd.hasOption(deleteOption.getOpt())) {
+ this.deleteJob(cmd.getOptionValue(jobNameOption.getOpt()));
+ }
+ }
+
+ private void deleteJobBulk(String path) throws IOException {
+ Path filePath = new Path(path);
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(
+ new FileInputStream(filePath.toString()), Charset.forName("UTF-8")))) {
+ String jobName;
+ while ((jobName = br.readLine()) != null) {
+ System.out.println("Deleting " + jobName);
+ try {
+ this.jobStateStore.delete(jobName);
+ } catch (IOException e) {
+ System.out.println("Could not delete job name: " + jobName + " due to " + e.getMessage());
+ }
+ }
+ }
+ }
+
+ private void deleteJob(String jobName) {
+ System.out.println("Deleting " + jobName);
+ try {
+ this.jobStateStore.delete(jobName);
+ } catch (IOException e) {
+ System.out.println("Could not delete job name: " + jobName + " due to " + e.getMessage());
+ }
+ }
+
+ private void viewStateAsJson(CommandLine cmd) throws IOException {
+ JobStateToJsonConverter converter = new JobStateToJsonConverter(this.jobStateStore, cmd.hasOption("kc"));
+ converter.outputToJson(cmd);
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobStateToJsonConverter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobStateToJsonConverter.java
index 8288923..e0dc490 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobStateToJsonConverter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobStateToJsonConverter.java
@@ -24,19 +24,14 @@ import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.util.List;
-import java.util.Properties;
+import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.runtime.cli.CliApplication;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +40,8 @@ import com.google.common.io.Closer;
import com.google.gson.stream.JsonWriter;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JobConfigurationUtils;
/**
@@ -58,8 +50,7 @@ import org.apache.gobblin.util.JobConfigurationUtils;
* @author Yinan Li
*/
@Slf4j
-@Alias(value = "job-state-to-json", description = "To convert Job state to JSON")
-public class JobStateToJsonConverter implements CliApplication {
+public class JobStateToJsonConverter {
private static final Logger LOGGER =
LoggerFactory.getLogger(JobStateToJsonConverter.class);
@@ -68,6 +59,12 @@ public class JobStateToJsonConverter implements CliApplication {
private final StateStore<? extends JobState> jobStateStore;
private final boolean keepConfig;
+ public JobStateToJsonConverter(StateStore stateStore, boolean keepConfig) {
+ this.keepConfig = keepConfig;
+ this.jobStateStore = stateStore;
+ }
+
+ // Constructor for backwards compatibility
public JobStateToJsonConverter(Properties props, String storeUrl, boolean
keepConfig) throws IOException {
Configuration conf = new Configuration();
JobConfigurationUtils.putPropertiesIntoConfiguration(props, conf);
@@ -76,8 +73,8 @@ public class JobStateToJsonConverter implements CliApplication {
props.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, storeUrl);
}
- this.keepConfig = keepConfig;
this.jobStateStore = (StateStore)
DatasetStateStore.buildDatasetStateStore(ConfigUtils.propertiesToConfig(props));
+ this.keepConfig = keepConfig;
}
/**
@@ -158,64 +155,15 @@ public class JobStateToJsonConverter implements CliApplication {
jsonWriter.endArray();
}
- @SuppressWarnings("all")
- @Override
- public void run(String[] args) throws Exception {
- Option sysConfigOption = Option.builder("sc").argName("system configuration file")
- .desc("Gobblin system configuration file (required if no state store URL specified)").longOpt("sysconfig").hasArg().build();
- Option storeUrlOption = Option.builder("u").argName("gobblin state store URL")
- .desc("Gobblin state store root path URL (required if no sysconfig specified)").longOpt("storeurl").hasArg().build();
- Option jobNameOption = Option.builder("n").argName("gobblin job name").desc("Gobblin job name (required)").longOpt("name")
- .hasArg().required().build();
- Option jobIdOption =
- Option.builder("i").argName("gobblin job id").desc("Gobblin job id").longOpt("id").hasArg().build();
- Option convertAllOption =
- Option.builder("a").desc("Whether to convert all past job states of the given job").longOpt("all").build();
- Option keepConfigOption =
- Option.builder("kc").desc("Whether to keep all configuration properties").longOpt("keepConfig").build();
- Option outputToFile =
- Option.builder("t").argName("output file name").desc("Output file name").longOpt("toFile").hasArg().build();
-
- Options options = new Options();
- options.addOption(sysConfigOption);
- options.addOption(storeUrlOption);
- options.addOption(jobNameOption);
- options.addOption(jobIdOption);
- options.addOption(convertAllOption);
- options.addOption(keepConfigOption);
- options.addOption(outputToFile);
-
- CommandLine cmd = null;
- try {
- CommandLineParser parser = new DefaultParser();
- cmd = parser.parse(options, args);
- } catch (ParseException pe) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("JobStateToJsonConverter", options);
- return;
- }
-
- if (!cmd.hasOption(sysConfigOption.getLongOpt()) && !cmd.hasOption(storeUrlOption.getLongOpt()) ){
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("JobStateToJsonConverter", options);
- return;
- }
-
- Properties sysConfig = new Properties();
- if (cmd.hasOption(sysConfigOption.getLongOpt())) {
- sysConfig = JobConfigurationUtils.fileToProperties(cmd.getOptionValue(sysConfigOption.getLongOpt()));
- }
-
- JobStateToJsonConverter converter =
- new JobStateToJsonConverter(sysConfig, cmd.getOptionValue('u'), cmd.hasOption("kc"));
+ public void outputToJson(CommandLine cmd) throws IOException {
StringWriter stringWriter = new StringWriter();
if (cmd.hasOption('i')) {
- converter.convert(cmd.getOptionValue('n'), cmd.getOptionValue('i'), stringWriter);
+ this.convert(cmd.getOptionValue('n'), cmd.getOptionValue('i'), stringWriter);
} else {
if (cmd.hasOption('a')) {
- converter.convertAll(cmd.getOptionValue('n'), stringWriter);
+ this.convertAll(cmd.getOptionValue('n'), stringWriter);
} else {
- converter.convert(cmd.getOptionValue('n'), stringWriter);
+ this.convert(cmd.getOptionValue('n'), stringWriter);
}
}
@@ -223,8 +171,7 @@ public class JobStateToJsonConverter implements CliApplication {
Closer closer = Closer.create();
try {
FileOutputStream fileOutputStream = closer.register(new
FileOutputStream(cmd.getOptionValue('t')));
- OutputStreamWriter outputStreamWriter =
- closer.register(new OutputStreamWriter(fileOutputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+ OutputStreamWriter outputStreamWriter = closer.register(new OutputStreamWriter(fileOutputStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
BufferedWriter bufferedWriter = closer.register(new
BufferedWriter(outputStreamWriter));
bufferedWriter.write(stringWriter.toString());
} catch (Throwable t) {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
new file mode 100644
index 0000000..075d056
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.cli;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.MysqlDatasetStateStore;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+@Test(groups = { "gobblin.runtime" })
+public class JobStateStoreCliTest {
+
+ private static final String TEST_STATE_STORE = "TestStateStore";
+ private static final String TEST_JOB_NAME = "TestJob";
+ private static final String TEST_JOB_NAME2 = "TestJob2";
+ private static final String TEST_JOB_NAME3 = "TestJob3";
+ private static final String TEST_JOB_ID = "TestJob1";
+ private static final String TEST_TASK_ID_PREFIX = "TestTask-";
+
+ private StateStore<JobState> dbJobStateStore;
+ private DatasetStateStore<JobState.DatasetState> dbDatasetStateStore;
+ private long startTime = System.currentTimeMillis();
+ private String configPath;
+
+ private ITestMetastoreDatabase testMetastoreDatabase;
+ private static final String TEST_USER = "testUser";
+ private static final String TEST_PASSWORD = "testPassword";
+ File deleteFile;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+ String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
+ ConfigBuilder configBuilder = ConfigBuilder.create();
+ BasicDataSource mySqlDs = new BasicDataSource();
+
+ mySqlDs.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
+ mySqlDs.setDefaultAutoCommit(false);
+ mySqlDs.setUrl(jdbcUrl);
+ mySqlDs.setUsername(TEST_USER);
+ mySqlDs.setPassword(TEST_PASSWORD);
+
+ dbJobStateStore = new MysqlStateStore<>(mySqlDs, TEST_STATE_STORE, false, JobState.class);
+
+ configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
+ configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
+ configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
+ configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_TYPE_KEY, "mysql");
+ configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_STATE_STORE);
+
+ // store the config into a temp file to be read by cli
+ configPath = File.createTempFile("config.properties", null).getPath();
+ try (OutputStream output = new FileOutputStream(configPath)) {
+ ConfigUtils.configToProperties(configBuilder.build()).store(output, "");
+ }
+
+ ClassAliasResolver<DatasetStateStore.Factory> resolver =
+ new ClassAliasResolver<>(DatasetStateStore.Factory.class);
+ DatasetStateStore.Factory stateStoreFactory =
+ resolver.resolveClass("mysql").newInstance();
+ dbDatasetStateStore = stateStoreFactory.createStateStore(configBuilder.build());
+
+ // clear data that may have been left behind by a prior test run
+ dbJobStateStore.delete(TEST_JOB_NAME);
+ dbJobStateStore.delete(TEST_JOB_NAME2);
+
+ JobState jobState = new JobState(TEST_JOB_NAME, TEST_JOB_ID);
+ jobState.setId(TEST_JOB_ID);
+ jobState.setProp("foo", "bar");
+ jobState.setState(JobState.RunningState.COMMITTED);
+ jobState.setStartTime(this.startTime);
+ jobState.setEndTime(this.startTime + 1000);
+ jobState.setDuration(1000);
+
+ for (int i = 0; i < 3; i++) {
+ TaskState taskState = new TaskState();
+ taskState.setJobId(TEST_JOB_ID);
+ taskState.setTaskId(TEST_TASK_ID_PREFIX + i);
+ taskState.setId(TEST_TASK_ID_PREFIX + i);
+ taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+ jobState.addTaskState(taskState);
+ }
+
+ dbJobStateStore.put(TEST_JOB_NAME,
+ MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ jobState);
+
+ jobState.setJobName(TEST_JOB_NAME2);
+ dbJobStateStore.put(TEST_JOB_NAME2,
+ MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ jobState);
+
+ jobState.setJobName(TEST_JOB_NAME3);
+ dbJobStateStore.put(TEST_JOB_NAME3,
+ MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ jobState);
+ }
+
+ @Test
+ public void testClBulkDelete() throws Exception {
+ String deleteFileText = TEST_JOB_NAME +"\n" + TEST_JOB_NAME2;
+ deleteFile = File.createTempFile("deleteFile.txt", null);
+ FileOutputStream outputStream = new FileOutputStream(deleteFile.getPath());
+ byte[] strToBytes = deleteFileText.getBytes();
+ outputStream.write(strToBytes);
+
+ outputStream.close();
+
+ JobStateStoreCLI cli = new JobStateStoreCLI();
+ String[] args = {"job-state-store", "-sc", configPath, "-bd", deleteFile.getPath()};
+ cli.run(args);
+
+ JobState jobState = dbJobStateStore.get(TEST_JOB_NAME,
+ dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ TEST_JOB_ID);
+
+ JobState jobState2 = dbJobStateStore.get(TEST_JOB_NAME2,
+ dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ TEST_JOB_ID);
+
+ JobState jobState3 = dbJobStateStore.get(TEST_JOB_NAME3,
+ dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ TEST_JOB_ID);
+
+ Assert.assertNull(jobState);
+ Assert.assertNull(jobState2);
+ Assert.assertNotNull(jobState3);
+ }
+
+ @Test(dependsOnMethods = "testClBulkDelete")
+ public void testCliDeleteSingle() throws Exception {
+ JobStateStoreCLI cli = new JobStateStoreCLI();
+ String[] args = {"job-state-store", "-sc", configPath, "-d", "-n", TEST_JOB_NAME3};
+ cli.run(args);
+
+ JobState jobState = dbJobStateStore.get(TEST_JOB_NAME3,
+ dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+ TEST_JOB_ID);
+
+ Assert.assertNull(jobState);
+ }
+
+}