Repository: incubator-gobblin Updated Branches: refs/heads/master eda77bcb6 -> 7f55214e2
[GOBBLIN-446] Add support for migrating state for all jobs in a job store Closes #2321 from htran1/state_store_mig_all_jobs Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/7f55214e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/7f55214e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/7f55214e Branch: refs/heads/master Commit: 7f55214e20610a2b99652e003af9a7c684da7e49 Parents: eda77bc Author: Hung Tran <[email protected]> Authored: Thu Mar 29 11:27:20 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Mar 29 11:27:20 2018 -0700 ---------------------------------------------------------------------- .../apache/gobblin/metastore/FsStateStore.java | 25 ++++++++ .../gobblin/metastore/MysqlStateStore.java | 59 +++++++++++++++---- .../apache/gobblin/metastore/StateStore.java | 17 +++++- .../gobblin/metastore/FsStateStoreTest.java | 26 ++++++-- .../apache/gobblin/metastore/ZkStateStore.java | 26 ++++++++ .../gobblin/runtime/ZkDatasetStateStore.java | 15 +++-- .../runtime/ZkDatasetStateStoreTest.java | 61 +++++++++++++++---- .../gobblin/runtime/MysqlDatasetStateStore.java | 16 +++-- .../gobblin/runtime/StateStoreMigrationCli.java | 62 +++++++++++++------- .../runtime/MysqlDatasetStateStoreTest.java | 54 ++++++++++++++--- 10 files changed, 291 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java index 54dbdd7..4dd100d 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java @@ -317,6 +317,31 @@ public class FsStateStore<T extends State> implements StateStore<T> { return names; } + /** + * Get store names in the state store + * + * @param predicate only returns names matching predicate + * @return (possibly empty) list of store names from the given store + * @throws IOException + */ + public List<String> getStoreNames(Predicate<String> predicate) + throws IOException { + List<String> names = Lists.newArrayList(); + + Path storeRootPath = new Path(this.storeRootDir); + if (!this.fs.exists(storeRootPath)) { + return names; + } + + for (FileStatus status : this.fs.listStatus(storeRootPath)) { + if (predicate.apply(status.getPath().getName())) { + names.add(status.getPath().getName()); + } + } + + return names; + } + @Override public void createAlias(String storeName, String original, String alias) throws IOException { Path originalTablePath = new Path(new Path(this.storeRootDir, storeName), original); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java index d5ae6ff..169a5e1 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java @@ -17,15 +17,6 @@ package org.apache.gobblin.metastore; -import com.google.common.base.Predicate; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.typesafe.config.Config; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.password.PasswordManager; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.io.StreamUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -45,10 +36,23 @@ import java.util.Collections; import java.util.List; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import javax.sql.DataSource; + import org.apache.commons.dbcp.BasicDataSource; import org.apache.hadoop.io.Text; +import com.google.common.base.Predicate; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.password.PasswordManager; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.io.StreamUtils; + +import javax.sql.DataSource; + /** * An implementation of {@link StateStore} backed by MySQL. * @@ -87,6 +91,9 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { private static final String SELECT_JOB_STATE_NAMES_TEMPLATE = "SELECT table_name FROM $TABLE$ WHERE store_name = ?"; + private static final String SELECT_STORE_NAMES_TEMPLATE = + "SELECT distinct store_name FROM $TABLE$"; + private static final String DELETE_JOB_STORE_TEMPLATE = "DELETE FROM $TABLE$ WHERE store_name = ?"; @@ -114,6 +121,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { private final String DELETE_JOB_STORE_SQL; private final String DELETE_JOB_STATE_SQL; private final String CLONE_JOB_STATE_SQL; + private final String SELECT_STORE_NAMES_SQL; /** * Manages the persistence and retrieval of {@link State} in a MySQL database @@ -137,6 +145,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { DELETE_JOB_STORE_SQL = DELETE_JOB_STORE_TEMPLATE.replace("$TABLE$", stateStoreTableName); DELETE_JOB_STATE_SQL = DELETE_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName); CLONE_JOB_STATE_SQL = CLONE_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName); + SELECT_STORE_NAMES_SQL = SELECT_STORE_NAMES_TEMPLATE.replace("$TABLE$", stateStoreTableName); // create table if it does not exist String createJobTable = CREATE_JOB_STATE_TABLE_TEMPLATE.replace("$TABLE$", stateStoreTableName); @@ -379,6 +388,36 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { return names; } + /** + * Get store names in the state store + * + * @param predicate only returns names matching predicate + * @return (possibly empty) list of store names from the given store + * @throws IOException + */ + public List<String> getStoreNames(Predicate<String> predicate) + throws IOException { + List<String> names = Lists.newArrayList(); + + try (Connection connection = dataSource.getConnection(); + PreparedStatement queryStatement = connection.prepareStatement(SELECT_STORE_NAMES_SQL)) { + + try (ResultSet rs = queryStatement.executeQuery()) { + while (rs.next()) { + String name = rs.getString(1); + if (predicate.apply(name)) { + names.add(name); + } + } + } + } catch (SQLException e) { + throw new IOException(String.format("Could not query store names"), e); + } + + return names; + } + + @Override public void createAlias(String storeName, String original, String alias) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java index 425914a..0a4cf43 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java @@ -17,12 +17,13 @@ package org.apache.gobblin.metastore; -import com.google.common.base.Predicate; -import com.typesafe.config.Config; import java.io.IOException; import java.util.Collection; import java.util.List; +import com.google.common.base.Predicate; +import com.typesafe.config.Config; + import org.apache.gobblin.configuration.State; import org.apache.gobblin.metastore.metadata.StateStoreEntryManager; import org.apache.gobblin.metastore.predicates.StateStorePredicate; @@ -172,6 +173,18 @@ public interface StateStore<T extends State> { throws IOException; /** + * Get store names in the state store + * + * @param predicate only returns names matching predicate + * @return (possibly empty) list of store names from the given store + * @throws IOException + */ + public default List<String> getStoreNames(Predicate<String> predicate) + throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } + + /** * Create an alias for an existing table. * * @param storeName store name http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java index 21c81ec..70160c8 100644 --- a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java +++ b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java @@ -17,13 +17,9 @@ package org.apache.gobblin.metastore; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.util.ClassAliasResolver; import java.io.IOException; import java.net.URL; +import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -34,9 +30,15 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.base.Predicates; import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.util.ClassAliasResolver; /** @@ -65,6 +67,7 @@ public class FsStateStoreTest { // cleanup in case files left behind by a prior run this.stateStore.delete("testStore"); + this.stateStore.delete("testStore2"); } @Test @@ -89,6 +92,9 @@ public class FsStateStoreTest { Assert.assertFalse(this.stateStore.exists("testStore", "testTable")); this.stateStore.putAll("testStore", "testTable", states); Assert.assertTrue(this.stateStore.exists("testStore", "testTable")); + + // for testing of getStoreNames + this.stateStore.putAll("testStore2", "testTable", states); } @Test(dependsOnMethods = { "testPut" }) @@ -117,6 +123,16 @@ public class FsStateStoreTest { Assert.assertEquals(states.get(2).getProp("k3"), "v3"); } + @Test(dependsOnMethods = { "testGetAlias" }) + public void testGetStoreNames() throws IOException { + List<String> storeNames = this.stateStore.getStoreNames(Predicates.alwaysTrue()); + Collections.sort(storeNames); + + Assert.assertTrue(storeNames.size() == 2); + Assert.assertEquals(storeNames.get(0), "testStore"); + Assert.assertEquals(storeNames.get(1), "testStore2"); + } + // Disable backwards compatibility change, since we are doing a major version upgrade // .. and this is related to previous migration. @Test http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java index 59d0e4f..c09c42a 100644 --- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java +++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java @@ -252,6 +252,32 @@ public class ZkStateStore<T extends State> implements StateStore<T> { return names; } + + /** + * Get store names in the state store + * + * @param predicate only returns names matching predicate + * @return (possibly empty) list of store names from the given store + * @throws IOException + */ + public List<String> getStoreNames(Predicate<String> predicate) + throws IOException { + List<String> names = Lists.newArrayList(); + String path = formPath(""); + + List<String> children = propStore.getChildNames(path, 0); + + if (children != null) { + for (String c : children) { + if (predicate.apply(c)) { + names.add(c); + } + } + } + + return names; + } + @Override public void createAlias(String storeName, String original, String alias) throws IOException { String pathOriginal = formPath(storeName, original); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java index e9ecf35..db882c0 100644 --- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java +++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java @@ -17,19 +17,22 @@ package org.apache.gobblin.runtime; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.CharMatcher; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.collect.Maps; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.DatasetStateStore; import org.apache.gobblin.metastore.ZkStateStore; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java index 1091cf7..2ee5f1d 100644 --- a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java +++ b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java @@ -17,31 +17,39 @@ package org.apache.gobblin.runtime; -import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.WorkUnitState; -import org.apache.gobblin.metastore.DatasetStateStore; -import org.apache.gobblin.metastore.StateStore; -import org.apache.gobblin.metastore.ZkStateStore; -import org.apache.gobblin.metastore.ZkStateStoreConfigurationKeys; -import org.apache.gobblin.util.ClassAliasResolver; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Properties; + import org.apache.curator.test.TestingServer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.base.Predicates; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metastore.DatasetStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.metastore.ZkStateStore; +import org.apache.gobblin.metastore.ZkStateStoreConfigurationKeys; +import org.apache.gobblin.util.ClassAliasResolver; + /** * Unit tests for {@link ZkDatasetStateStore}. **/ @Test(groups = { "gobblin.runtime" }) public class ZkDatasetStateStoreTest { - private static final String TEST_JOB_NAME = "TestJob"; - private static final String TEST_JOB_ID = "TestJob1"; + private static final String TEST_JOB_NAME = "TestJobName1"; + private static final String TEST_JOB_NAME2 = "TestJobName2"; + + private static final String TEST_JOB_ID = "TestJobId1"; private static final String TEST_TASK_ID_PREFIX = "TestTask-"; private static final String TEST_DATASET_URN = "TestDataset"; private static final String TEST_DATASET_URN2 = "TestDataset2"; @@ -73,6 +81,8 @@ public class ZkDatasetStateStoreTest { // clear data that may have been left behind by a prior test run zkJobStateStore.delete(TEST_JOB_NAME); zkDatasetStateStore.delete(TEST_JOB_NAME); + zkJobStateStore.delete(TEST_JOB_NAME2); + zkDatasetStateStore.delete(TEST_JOB_NAME2); } @Test @@ -97,6 +107,12 @@ public class ZkDatasetStateStoreTest { zkJobStateStore.put(TEST_JOB_NAME, ZkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + ZkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, jobState); + + // second job name for testing getting store names in a later test case + jobState.setJobName(TEST_JOB_NAME2); + zkJobStateStore.put(TEST_JOB_NAME2, + ZkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + ZkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, + jobState); } @Test(dependsOnMethods = "testPersistJobState") @@ -150,6 +166,10 @@ public class ZkDatasetStateStoreTest { datasetState.setDuration(2000); zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState); + + // second job name for testing getting store names in a later test case + datasetState.setJobName(TEST_JOB_NAME2); + zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState); } @Test(dependsOnMethods = "testPersistDatasetState") @@ -175,7 +195,24 @@ public class ZkDatasetStateStoreTest { } } - @Test(dependsOnMethods = "testGetDatasetState") + @Test(dependsOnMethods = { "testGetDatasetState" }) + public void testGetStoreNames() throws IOException { + List<String> storeNames = this.zkJobStateStore.getStoreNames(Predicates.alwaysTrue()); + Collections.sort(storeNames); + + Assert.assertTrue(storeNames.size() == 2); + Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME); + Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2); + + storeNames = this.zkDatasetStateStore.getStoreNames(Predicates.alwaysTrue()); + Collections.sort(storeNames); + + Assert.assertTrue(storeNames.size() == 2); + Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME); + Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2); + } + + @Test(dependsOnMethods = "testGetStoreNames") public void testGetPreviousDatasetStatesByUrns() throws IOException { Map<String, JobState.DatasetState> datasetStatesByUrns = zkDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME); @@ -240,6 +277,8 @@ public class ZkDatasetStateStoreTest { public void tearDown() throws IOException { zkJobStateStore.delete(TEST_JOB_NAME); zkDatasetStateStore.delete(TEST_JOB_NAME); + zkJobStateStore.delete(TEST_JOB_NAME2); + zkDatasetStateStore.delete(TEST_JOB_NAME2); if (testingServer != null) { testingServer.close(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java index 741ac07..6e3e57a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java @@ -17,19 +17,23 @@ package org.apache.gobblin.runtime; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.CharMatcher; import com.google.common.base.Strings; import com.google.common.collect.Maps; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.DatasetStateStore; import org.apache.gobblin.metastore.MysqlStateStore; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; + import javax.sql.DataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java index d4cb91e..2c57da9 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java @@ -17,26 +17,33 @@ package org.apache.gobblin.runtime; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.io.InputStreamReader; import java.net.URISyntaxException; import java.nio.charset.Charset; +import java.util.List; import java.util.Map; -import lombok.extern.slf4j.Slf4j; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + import org.apache.gobblin.annotation.Alias; import org.apache.gobblin.metastore.DatasetStateStore; import org.apache.gobblin.runtime.cli.CliApplication; import org.apache.gobblin.runtime.cli.CliObjectFactory; +import org.apache.gobblin.runtime.cli.CliObjectOption; import org.apache.gobblin.runtime.cli.CliObjectSupport; import org.apache.gobblin.runtime.cli.ConstructorAndPublicMethodsCliObjectFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.gobblin.util.ConfigUtils; +import lombok.extern.slf4j.Slf4j; import static org.apache.gobblin.configuration.ConfigurationKeys.*; @@ -53,13 +60,8 @@ public class StateStoreMigrationCli implements CliApplication { private static final String SOURCE_KEY = "source"; private static final String DESTINATION_KEY = "destination"; private static final String JOB_NAME_KEY = "jobName"; - - /** - * Assume that there's no additional '/' in the end of state.store.dir's value - */ - private String extractStoreName(String dirPath) { - return dirPath.substring(dirPath.lastIndexOf('/') + 1); - } + private static final String MIGRATE_ALL_JOBS = "migrateAllJobs"; + private static final String DEFAULT_MIGRATE_ALL_JOBS = "false"; @Override public void run(String[] args) throws Exception { @@ -69,26 +71,41 @@ public class StateStoreMigrationCli implements CliApplication { FileSystem fs = FileSystem.get(new Configuration()); FSDataInputStream inputStream = fs.open(command.path); Config config = ConfigFactory.parseReader(new InputStreamReader(inputStream, Charset.defaultCharset())); - String storeName = this.extractStoreName(config.getConfig(SOURCE_KEY).getString(STATE_STORE_ROOT_DIR_KEY)); Preconditions.checkNotNull(config.getObject(SOURCE_KEY)); Preconditions.checkNotNull(config.getObject(DESTINATION_KEY)); - Preconditions.checkNotNull(config.getString(JOB_NAME_KEY)); DatasetStateStore dstDatasetStateStore = DatasetStateStore.buildDatasetStateStore(config.getConfig(DESTINATION_KEY)); DatasetStateStore srcDatasetStateStore = DatasetStateStore.buildDatasetStateStore(config.getConfig(SOURCE_KEY)); + Map<String, JobState.DatasetState> map; + + // if migrating state for all jobs then list the store names (job names) and copy the current jst files + if (ConfigUtils.getBoolean(config, MIGRATE_ALL_JOBS, Boolean.valueOf(DEFAULT_MIGRATE_ALL_JOBS))) { + List<String> jobNames = srcDatasetStateStore.getStoreNames(Predicates.alwaysTrue()); - Map<String, JobState.DatasetState> map = - srcDatasetStateStore.getLatestDatasetStatesByUrns(config.getString(JOB_NAME_KEY)); + for (String jobName : jobNames) { + migrateStateForJob(srcDatasetStateStore, dstDatasetStateStore, jobName, command.deleteSourceStateStore); + } + } else { + Preconditions.checkNotNull(config.getString(JOB_NAME_KEY)); + migrateStateForJob(srcDatasetStateStore, dstDatasetStateStore, config.getString(JOB_NAME_KEY), + command.deleteSourceStateStore); + } + } + + private static void migrateStateForJob(DatasetStateStore srcDatasetStateStore, DatasetStateStore dstDatasetStateStore, + String jobName, boolean deleteFromSource) throws IOException { + Map<String, JobState.DatasetState> map = srcDatasetStateStore.getLatestDatasetStatesByUrns(jobName); for (Map.Entry<String, JobState.DatasetState> entry : map.entrySet()) { dstDatasetStateStore.persistDatasetState(entry.getKey(), entry.getValue()); } - if (command.deleteSourceStateStore) { + + if (deleteFromSource) { try { - srcDatasetStateStore.delete(storeName); + srcDatasetStateStore.delete(jobName); } catch (IOException ioe) { - log.warn("The source state store has been deleted.", ioe); + log.warn("The source state store has been deleted", ioe); } } } @@ -107,6 +124,7 @@ public class StateStoreMigrationCli implements CliApplication { this.path = new Path(path); } + @CliObjectOption public void deleteSourceStateStore() { this.deleteSourceStateStore = true; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/7f55214e/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java index 45bb44d..e8ea281 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java @@ -17,6 +17,19 @@ package org.apache.gobblin.runtime; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.commons.dbcp.BasicDataSource; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Predicates; + import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; @@ -26,13 +39,6 @@ import org.apache.gobblin.metastore.StateStore; import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; import org.apache.gobblin.util.ClassAliasResolver; -import java.io.IOException; -import java.util.Map; -import org.apache.commons.dbcp.BasicDataSource; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; /** @@ -44,6 +50,7 @@ public class MysqlDatasetStateStoreTest { private static final String TEST_STATE_STORE = "TestStateStore"; private static final String TEST_JOB_NAME = "TestJob"; private static final String TEST_JOB_NAME_LOWER = "testjob"; + private static final String TEST_JOB_NAME2 = "TestJob2"; private static final String TEST_JOB_ID = "TestJob1"; private static final String TEST_TASK_ID_PREFIX = "TestTask-"; private static final String TEST_DATASET_URN = "TestDataset"; @@ -87,6 +94,8 @@ public class MysqlDatasetStateStoreTest { // clear data that may have been left behind by a prior test run dbJobStateStore.delete(TEST_JOB_NAME); dbDatasetStateStore.delete(TEST_JOB_NAME); + dbJobStateStore.delete(TEST_JOB_NAME2); + dbDatasetStateStore.delete(TEST_JOB_NAME2); } @Test @@ -118,6 +127,12 @@ public class MysqlDatasetStateStoreTest { dbJobStateStore.put(TEST_JOB_NAME_LOWER, MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, jobState); + + // second job name for testing getting store names in a later test case + jobState.setJobName(TEST_JOB_NAME2); + dbJobStateStore.put(TEST_JOB_NAME2, + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, + jobState); } @Test(dependsOnMethods = "testPersistJobState") @@ -189,6 +204,10 @@ public class MysqlDatasetStateStoreTest { datasetState.setDuration(3000); dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN_LOWER, datasetState); + + // second job name for testing getting store names in a later test case + datasetState.setJobName(TEST_JOB_NAME2); + dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState); } @Test(dependsOnMethods = "testPersistDatasetState") @@ -214,7 +233,26 @@ public class MysqlDatasetStateStoreTest { } } - @Test(dependsOnMethods = "testGetDatasetState") + @Test(dependsOnMethods = { "testGetDatasetState" }) + public void testGetStoreNames() throws IOException { + List<String> storeNames = this.dbJobStateStore.getStoreNames(Predicates.alwaysTrue()); + Collections.sort(storeNames); + + Assert.assertTrue(storeNames.size() == 3); + Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME); + Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2); + Assert.assertEquals(storeNames.get(2), TEST_JOB_NAME_LOWER); + + + storeNames = this.dbDatasetStateStore.getStoreNames(Predicates.alwaysTrue()); + Collections.sort(storeNames); + + Assert.assertTrue(storeNames.size() == 2); + Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME); + Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2); + } + + @Test(dependsOnMethods = "testGetStoreNames") public void testGetPreviousDatasetStatesByUrns() throws IOException { Map<String, JobState.DatasetState> datasetStatesByUrns = dbDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
