This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4de10b8  [GOBBLIN-1350] Adds a CLI for handling state store reads and 
deletes
4de10b8 is described below

commit 4de10b8ee7c625cfc079f5ddaefb170f8b762a52
Author: William Lo <[email protected]>
AuthorDate: Mon Jan 11 10:21:56 2021 -0800

    [GOBBLIN-1350] Adds a CLI for handling state store reads and deletes
    
    Closes #3189 from Will-Lo/extend-statestore-cli-
    to-delete
---
 bin/statestore-checker.sh                          |  23 ---
 .../gobblin/runtime/cli/JobStateStoreCLI.java      | 211 +++++++++++++++++++++
 .../runtime/util/JobStateToJsonConverter.java      |  87 ++-------
 .../gobblin/runtime/cli/JobStateStoreCliTest.java  | 175 +++++++++++++++++
 4 files changed, 403 insertions(+), 93 deletions(-)

diff --git a/bin/statestore-checker.sh b/bin/statestore-checker.sh
deleted file mode 100755
index 60c0279..0000000
--- a/bin/statestore-checker.sh
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# @deprecated: This script is kept for backward compatibility only and will be 
removed in future. Use gobblin.sh
-
-CURRENT_DIR="$(cd `dirname $0`/..; pwd)"
-$CURRENT_DIR/bin/gobblin cli job-state-to-json $@
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
new file mode 100644
index 0000000..308b081
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/cli/JobStateStoreCLI.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.cli;
+
+import com.typesafe.config.Config;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.FsStateStoreFactory;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.JobStateToJsonConverter;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Slf4j
+@Alias(value = "job-state-store", description = "View or delete JobState in 
state store")
+public class JobStateStoreCLI implements CliApplication {
+
+  Option sysConfigOption = Option.builder("sc").argName("system configuration 
file")
+      .desc("Gobblin system configuration file (required if no state store URL 
specified)").longOpt("sysconfig").hasArg().build();
+  Option storeUrlOption = Option.builder("u").argName("gobblin state store 
URL")
+      .desc("Gobblin state store root path URL (required if no sysconfig 
specified)").longOpt("storeurl").hasArg().build();
+  Option jobNameOption = Option.builder("n").argName("gobblin job 
name").desc("Gobblin job name").longOpt("name")
+      .hasArg().build();
+  Option jobIdOption =
+      Option.builder("i").argName("gobblin job id").desc("Gobblin job 
id").longOpt("id").hasArg().build();
+  Option helpOption =
+      
Option.builder("h").argName("help").desc("Usage").longOpt("help").hasArg().build();
+  Option deleteOption =
+      Option.builder("d").argName("delete state").desc("Deletes a state from 
the state store with a job id")
+          .longOpt("delete").build();
+  Option bulkDeleteOption =
+      Option.builder("bd").argName("bulk delete")
+          .desc("Deletes states from the state store based on a file with job 
ids to delete, separated by newline")
+          .longOpt("bulkDelete").hasArg().build();
+
+  // For reading state store in json format
+  Option getAsJsonOption =
+      Option.builder("r").argName("read job state").desc("Converts a job state 
to json").longOpt("read-job-state").build();
+  Option convertAllOption =
+      Option.builder("a").desc("Whether to convert all past job states of the 
given job when viewing as json").longOpt("all").build();
+  Option keepConfigOption =
+      Option.builder("kc").desc("Whether to keep all configuration properties 
when viewing as json").longOpt("keepConfig").build();
+  Option outputToFile =
+      Option.builder("t").argName("output file name").desc("Output file name 
when viewing as json").longOpt("toFile").hasArg().build();
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(JobStateStoreCLI.class);
+  private StateStore<? extends JobState> jobStateStore;
+
+  CommandLine initializeOptions(String[] args) {
+    Options options = new Options();
+    options.addOption(sysConfigOption);
+    options.addOption(storeUrlOption);
+    options.addOption(jobNameOption);
+    options.addOption(jobIdOption);
+    options.addOption(deleteOption);
+    options.addOption(getAsJsonOption);
+    options.addOption(convertAllOption);
+    options.addOption(keepConfigOption);
+    options.addOption(outputToFile);
+    options.addOption(bulkDeleteOption);
+
+    CommandLine cmd = null;
+
+    try {
+      CommandLineParser parser = new DefaultParser();
+      cmd = parser.parse(options, Arrays.copyOfRange(args, 1, args.length));
+    } catch (ParseException pe) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      throw new RuntimeException(pe);
+    }
+
+    if (!cmd.hasOption(sysConfigOption.getLongOpt()) && 
!cmd.hasOption(storeUrlOption.getLongOpt()) ){
+      System.out.println("State store configuration or state store url options 
missing");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    if (cmd.hasOption(getAsJsonOption.getOpt()) && 
!cmd.hasOption(jobNameOption.getOpt())) {
+      System.out.println("Job name option missing for reading job states as 
json");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    if (cmd.hasOption(deleteOption.getOpt()) && 
!cmd.hasOption(jobNameOption.getOpt())) {
+      System.out.println("Job name option missing for delete job id");
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    if (cmd.hasOption(helpOption.getOpt())) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("JobStateStoreCLI", options);
+      return null;
+    }
+
+    return cmd;
+  }
+
+
+  @Override
+  public void run(String[] args) throws Exception {
+    CommandLine cmd = initializeOptions(args);
+    if (cmd == null) {
+      return; // incorrect args were called
+    }
+
+    Properties props = new Properties();
+
+    if (cmd.hasOption(sysConfigOption.getOpt())) {
+      props = 
JobConfigurationUtils.fileToProperties(cmd.getOptionValue(sysConfigOption.getOpt()));
+    }
+
+    String storeUrl = cmd.getOptionValue(storeUrlOption.getLongOpt());
+    if (StringUtils.isNotBlank(storeUrl)) {
+      props.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, storeUrl);
+    }
+
+    Config stateStoreConfig = ConfigUtils.propertiesToConfig(props);
+    ClassAliasResolver<StateStore.Factory> resolver =
+        new ClassAliasResolver<>(StateStore.Factory.class);
+    StateStore.Factory stateStoreFactory;
+
+    try {
+      stateStoreFactory = 
resolver.resolveClass(ConfigUtils.getString(stateStoreConfig, 
ConfigurationKeys.STATE_STORE_TYPE_KEY,
+          FsStateStoreFactory.class.getName())).newInstance();
+    } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+    this.jobStateStore = stateStoreFactory.createStateStore(stateStoreConfig, 
JobState.class);
+
+    if (cmd.hasOption(getAsJsonOption.getOpt())) {
+      this.viewStateAsJson(cmd);
+    } else if (cmd.hasOption(bulkDeleteOption.getOpt())) {
+      this.deleteJobBulk(cmd.getOptionValue(bulkDeleteOption.getOpt()));
+    } else if (cmd.hasOption(deleteOption.getOpt())) {
+      this.deleteJob(cmd.getOptionValue(jobNameOption.getOpt()));
+    }
+  }
+
+  private void deleteJobBulk(String path) throws IOException {
+    Path filePath = new Path(path);
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(
+        new FileInputStream(filePath.toString()), Charset.forName("UTF-8")))) {
+      String jobName;
+      while ((jobName = br.readLine()) != null) {
+        System.out.println("Deleting " + jobName);
+        try {
+          this.jobStateStore.delete(jobName);
+        } catch (IOException e) {
+          System.out.println("Could not delete job name: " + jobName + " due 
to " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  private void deleteJob(String jobName) {
+    System.out.println("Deleting " + jobName);
+    try {
+      this.jobStateStore.delete(jobName);
+    } catch (IOException e) {
+      System.out.println("Could not delete job name: " + jobName + " due to " 
+ e.getMessage());
+    }
+  }
+
+  private void viewStateAsJson(CommandLine cmd) throws IOException {
+    JobStateToJsonConverter converter = new 
JobStateToJsonConverter(this.jobStateStore, cmd.hasOption("kc"));
+    converter.outputToJson(cmd);
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobStateToJsonConverter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobStateToJsonConverter.java
index 8288923..e0dc490 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobStateToJsonConverter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobStateToJsonConverter.java
@@ -24,19 +24,14 @@ import java.io.OutputStreamWriter;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.util.List;
-import java.util.Properties;
 
+import java.util.Properties;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.runtime.cli.CliApplication;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,11 +40,8 @@ import com.google.common.io.Closer;
 import com.google.gson.stream.JsonWriter;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JobConfigurationUtils;
 
 
 /**
@@ -58,8 +50,7 @@ import org.apache.gobblin.util.JobConfigurationUtils;
  * @author Yinan Li
  */
 @Slf4j
-@Alias(value = "job-state-to-json", description = "To convert Job state to 
JSON")
-public class JobStateToJsonConverter implements CliApplication {
+public class JobStateToJsonConverter {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(JobStateToJsonConverter.class);
 
@@ -68,6 +59,12 @@ public class JobStateToJsonConverter implements 
CliApplication {
   private final StateStore<? extends JobState> jobStateStore;
   private final boolean keepConfig;
 
+  public JobStateToJsonConverter(StateStore stateStore, boolean keepConfig) {
+    this.keepConfig = keepConfig;
+    this.jobStateStore = stateStore;
+  }
+
+  // Constructor for backwards compatibility
   public JobStateToJsonConverter(Properties props, String storeUrl, boolean 
keepConfig) throws IOException {
     Configuration conf = new Configuration();
     JobConfigurationUtils.putPropertiesIntoConfiguration(props, conf);
@@ -76,8 +73,8 @@ public class JobStateToJsonConverter implements 
CliApplication {
       props.setProperty(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, storeUrl);
     }
 
-    this.keepConfig = keepConfig;
     this.jobStateStore = (StateStore) 
DatasetStateStore.buildDatasetStateStore(ConfigUtils.propertiesToConfig(props));
+    this.keepConfig = keepConfig;
   }
 
   /**
@@ -158,64 +155,15 @@ public class JobStateToJsonConverter implements 
CliApplication {
     jsonWriter.endArray();
   }
 
-  @SuppressWarnings("all")
-  @Override
-  public void run(String[] args) throws Exception {
-    Option sysConfigOption = Option.builder("sc").argName("system 
configuration file")
-        .desc("Gobblin system configuration file (required if no state store 
URL specified)").longOpt("sysconfig").hasArg().build();
-    Option storeUrlOption = Option.builder("u").argName("gobblin state store 
URL")
-        .desc("Gobblin state store root path URL (required if no sysconfig 
specified)").longOpt("storeurl").hasArg().build();
-    Option jobNameOption = Option.builder("n").argName("gobblin job 
name").desc("Gobblin job name (required)").longOpt("name")
-        .hasArg().required().build();
-    Option jobIdOption =
-        Option.builder("i").argName("gobblin job id").desc("Gobblin job 
id").longOpt("id").hasArg().build();
-    Option convertAllOption =
-        Option.builder("a").desc("Whether to convert all past job states of 
the given job").longOpt("all").build();
-    Option keepConfigOption =
-        Option.builder("kc").desc("Whether to keep all configuration 
properties").longOpt("keepConfig").build();
-    Option outputToFile =
-        Option.builder("t").argName("output file name").desc("Output file 
name").longOpt("toFile").hasArg().build();
-
-    Options options = new Options();
-    options.addOption(sysConfigOption);
-    options.addOption(storeUrlOption);
-    options.addOption(jobNameOption);
-    options.addOption(jobIdOption);
-    options.addOption(convertAllOption);
-    options.addOption(keepConfigOption);
-    options.addOption(outputToFile);
-
-    CommandLine cmd = null;
-    try {
-      CommandLineParser parser = new DefaultParser();
-      cmd = parser.parse(options, args);
-    } catch (ParseException pe) {
-      HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("JobStateToJsonConverter", options);
-      return;
-    }
-
-    if (!cmd.hasOption(sysConfigOption.getLongOpt()) && 
!cmd.hasOption(storeUrlOption.getLongOpt()) ){
-      HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("JobStateToJsonConverter", options);
-      return;
-    }
-
-    Properties sysConfig = new Properties();
-    if (cmd.hasOption(sysConfigOption.getLongOpt())) {
-      sysConfig = 
JobConfigurationUtils.fileToProperties(cmd.getOptionValue(sysConfigOption.getLongOpt()));
-    }
-
-    JobStateToJsonConverter converter =
-        new JobStateToJsonConverter(sysConfig, cmd.getOptionValue('u'), 
cmd.hasOption("kc"));
+  public void outputToJson(CommandLine cmd) throws IOException {
     StringWriter stringWriter = new StringWriter();
     if (cmd.hasOption('i')) {
-      converter.convert(cmd.getOptionValue('n'), cmd.getOptionValue('i'), 
stringWriter);
+      this.convert(cmd.getOptionValue('n'), cmd.getOptionValue('i'), 
stringWriter);
     } else {
       if (cmd.hasOption('a')) {
-        converter.convertAll(cmd.getOptionValue('n'), stringWriter);
+        this.convertAll(cmd.getOptionValue('n'), stringWriter);
       } else {
-        converter.convert(cmd.getOptionValue('n'), stringWriter);
+        this.convert(cmd.getOptionValue('n'), stringWriter);
       }
     }
 
@@ -223,8 +171,7 @@ public class JobStateToJsonConverter implements 
CliApplication {
       Closer closer = Closer.create();
       try {
         FileOutputStream fileOutputStream = closer.register(new 
FileOutputStream(cmd.getOptionValue('t')));
-        OutputStreamWriter outputStreamWriter =
-            closer.register(new OutputStreamWriter(fileOutputStream, 
ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+        OutputStreamWriter outputStreamWriter = closer.register(new 
OutputStreamWriter(fileOutputStream, 
ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
         BufferedWriter bufferedWriter = closer.register(new 
BufferedWriter(outputStreamWriter));
         bufferedWriter.write(stringWriter.toString());
       } catch (Throwable t) {
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
new file mode 100644
index 0000000..075d056
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/cli/JobStateStoreCliTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.cli;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.MysqlDatasetStateStore;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+@Test(groups = { "gobblin.runtime" })
+public class JobStateStoreCliTest {
+
+  private static final String TEST_STATE_STORE = "TestStateStore";
+  private static final String TEST_JOB_NAME = "TestJob";
+  private static final String TEST_JOB_NAME2 = "TestJob2";
+  private static final String TEST_JOB_NAME3 = "TestJob3";
+  private static final String TEST_JOB_ID = "TestJob1";
+  private static final String TEST_TASK_ID_PREFIX = "TestTask-";
+
+  private StateStore<JobState> dbJobStateStore;
+  private DatasetStateStore<JobState.DatasetState> dbDatasetStateStore;
+  private long startTime = System.currentTimeMillis();
+  private String configPath;
+
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+  File deleteFile;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    BasicDataSource mySqlDs = new BasicDataSource();
+
+    
mySqlDs.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
+    mySqlDs.setDefaultAutoCommit(false);
+    mySqlDs.setUrl(jdbcUrl);
+    mySqlDs.setUsername(TEST_USER);
+    mySqlDs.setPassword(TEST_PASSWORD);
+
+    dbJobStateStore = new MysqlStateStore<>(mySqlDs, TEST_STATE_STORE, false, 
JobState.class);
+
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
jdbcUrl);
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, 
TEST_USER);
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, 
TEST_PASSWORD);
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_TYPE_KEY, 
"mysql");
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, 
TEST_STATE_STORE);
+
+    // store the config into a temp file to be read by cli
+    configPath = File.createTempFile("config.properties", null).getPath();
+    try (OutputStream output = new FileOutputStream(configPath)) {
+      ConfigUtils.configToProperties(configBuilder.build()).store(output, "");
+    }
+
+    ClassAliasResolver<DatasetStateStore.Factory> resolver =
+        new ClassAliasResolver<>(DatasetStateStore.Factory.class);
+    DatasetStateStore.Factory stateStoreFactory =
+        resolver.resolveClass("mysql").newInstance();
+    dbDatasetStateStore = 
stateStoreFactory.createStateStore(configBuilder.build());
+
+    // clear data that may have been left behind by a prior test run
+    dbJobStateStore.delete(TEST_JOB_NAME);
+    dbJobStateStore.delete(TEST_JOB_NAME2);
+
+    JobState jobState = new JobState(TEST_JOB_NAME, TEST_JOB_ID);
+    jobState.setId(TEST_JOB_ID);
+    jobState.setProp("foo", "bar");
+    jobState.setState(JobState.RunningState.COMMITTED);
+    jobState.setStartTime(this.startTime);
+    jobState.setEndTime(this.startTime + 1000);
+    jobState.setDuration(1000);
+
+    for (int i = 0; i < 3; i++) {
+      TaskState taskState = new TaskState();
+      taskState.setJobId(TEST_JOB_ID);
+      taskState.setTaskId(TEST_TASK_ID_PREFIX + i);
+      taskState.setId(TEST_TASK_ID_PREFIX + i);
+      taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+      jobState.addTaskState(taskState);
+    }
+
+    dbJobStateStore.put(TEST_JOB_NAME,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        jobState);
+
+    jobState.setJobName(TEST_JOB_NAME2);
+    dbJobStateStore.put(TEST_JOB_NAME2,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        jobState);
+
+    jobState.setJobName(TEST_JOB_NAME3);
+    dbJobStateStore.put(TEST_JOB_NAME3,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        jobState);
+  }
+
+  @Test
+  public void testClBulkDelete() throws Exception {
+    String deleteFileText = TEST_JOB_NAME +"\n" + TEST_JOB_NAME2;
+    deleteFile = File.createTempFile("deleteFile.txt", null);
+    FileOutputStream outputStream = new FileOutputStream(deleteFile.getPath());
+    byte[] strToBytes = deleteFileText.getBytes();
+    outputStream.write(strToBytes);
+
+    outputStream.close();
+
+    JobStateStoreCLI cli = new JobStateStoreCLI();
+    String[] args = {"job-state-store", "-sc", configPath, "-bd", 
deleteFile.getPath()};
+    cli.run(args);
+
+    JobState jobState = dbJobStateStore.get(TEST_JOB_NAME,
+        dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        TEST_JOB_ID);
+
+    JobState jobState2 = dbJobStateStore.get(TEST_JOB_NAME2,
+        dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        TEST_JOB_ID);
+
+    JobState jobState3 = dbJobStateStore.get(TEST_JOB_NAME3,
+        dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        TEST_JOB_ID);
+
+    Assert.assertNull(jobState);
+    Assert.assertNull(jobState2);
+    Assert.assertNotNull(jobState3);
+  }
+
+  @Test(dependsOnMethods = "testClBulkDelete")
+  public void testCliDeleteSingle() throws Exception {
+    JobStateStoreCLI cli = new JobStateStoreCLI();
+    String[] args = {"job-state-store", "-sc", configPath, "-d", "-n", 
TEST_JOB_NAME3};
+    cli.run(args);
+
+    JobState jobState = dbJobStateStore.get(TEST_JOB_NAME3,
+        dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        TEST_JOB_ID);
+
+    Assert.assertNull(jobState);
+  }
+
+}

Reply via email to