Repository: incubator-gobblin Updated Branches: refs/heads/master 9795cd58d -> 8af87cb78
[GOBBLIN-270] Statestore Migration Script Closes #2122 from autumnust/statestoremigration Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8af87cb7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8af87cb7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8af87cb7 Branch: refs/heads/master Commit: 8af87cb7807749b14640e4cc1feaeec173552d55 Parents: 9795cd5 Author: Lei Sun <[email protected]> Authored: Wed Oct 4 09:24:43 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Wed Oct 4 09:24:43 2017 -0700 ---------------------------------------------------------------------- gobblin-core/build.gradle | 3 +- .../metastore/DatasetStoreDatasetFinder.java | 10 +- .../apache/gobblin/metastore/StateStore.java | 2 +- .../metadata/StateStoreEntryManager.java | 1 + .../src/main/resources/migrationConfig | 25 ++++ .../gobblin/runtime/StateStoreMigrationCli.java | 114 +++++++++++++++++++ 6 files changed, 151 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-core/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-core/build.gradle b/gobblin-core/build.gradle index ee7a77c..09eb38b 100644 --- a/gobblin-core/build.gradle +++ b/gobblin-core/build.gradle @@ -19,9 +19,10 @@ apply plugin: 'java' dependencies { compile project(":gobblin-api") - compile project(":gobblin-core-base") compile project(":gobblin-tunnel") compile project(":gobblin-utility") + compile project(":gobblin-metastore") + compile project(":gobblin-core-base") compile project(":gobblin-metrics-libs:gobblin-metrics") compile project(":gobblin-modules:gobblin-avro-json") compile project(":gobblin-modules:gobblin-metadata") 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java index 75d083d..51d03ca 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/DatasetStoreDatasetFinder.java @@ -29,6 +29,7 @@ import org.apache.gobblin.metastore.predicates.DatasetPredicate; import org.apache.gobblin.metastore.predicates.StateStorePredicate; import org.apache.gobblin.metastore.predicates.StoreNamePredicate; import org.apache.gobblin.util.ConfigUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -54,8 +55,12 @@ public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDat this.predicate = buildPredicate(); } + public DatasetStoreDatasetFinder(Properties props) throws IOException { + this(FileSystem.get(new Configuration()), props); + } + private StateStorePredicate buildPredicate() { - StateStorePredicate predicate= null; + StateStorePredicate predicate = null; String storeName = null; String datasetUrn; @@ -66,7 +71,8 @@ public class DatasetStoreDatasetFinder implements DatasetsFinder<DatasetStoreDat if (ConfigUtils.hasNonEmptyPath(this.config, DATASET_URN_FILTER)) { if (storeName == null) { - throw new IllegalArgumentException(DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined."); + throw new IllegalArgumentException( + DATASET_URN_FILTER + " requires " + STORE_NAME_FILTER + " to also be defined."); } datasetUrn = this.config.getString(DATASET_URN_FILTER); predicate = new 
DatasetPredicate(storeName, datasetUrn, x -> true); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java index 46c2aa8..425914a 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/StateStore.java @@ -202,7 +202,7 @@ public interface StateStore<T extends State> { throws IOException; /** - * Gets metadata for all tables matching the input + * Gets entry managers for all tables matching the input * @param predicate Predicate used to filter tables. To allow state stores to push down predicates, use native extensions * of {@link StateStorePredicate}. * @return A list of all {@link StateStoreEntryManager}s matching the predicate. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java index c4c7796..33d404b 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java @@ -28,6 +28,7 @@ import lombok.Data; /** * Contains metadata about an entry in a {@link StateStore}. + * Exposes access to the {@link StateStore} that contains the entry. 
* @param <T> type of {@link State} that can be read from this entry. */ @Data http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-metastore/src/main/resources/migrationConfig ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/resources/migrationConfig b/gobblin-metastore/src/main/resources/migrationConfig new file mode 100644 index 0000000..c238980 --- /dev/null +++ b/gobblin-metastore/src/main/resources/migrationConfig @@ -0,0 +1,25 @@ +# A simple template of configuration for migration of state store. + +source: { + state.store.type:fs + + #Path up to store name. Don't add table name. + state.store.dir: "" +} + + +destination: { + state.store.db.user : + state.store.db.table : + state.store.type : org.apache.gobblin.runtime.MysqlDatasetStateStoreFactory + state.store.db.url : "" + state.store.db.password : "" + + #Required + encrypt.key.loc: "" +} + + +keepOldState:true + +jobName:TestJob \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8af87cb7/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java new file mode 100644 index 0000000..d4cb91e --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StateStoreMigrationCli.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.metastore.DatasetStateStore; +import org.apache.gobblin.runtime.cli.CliApplication; +import org.apache.gobblin.runtime.cli.CliObjectFactory; +import org.apache.gobblin.runtime.cli.CliObjectSupport; +import org.apache.gobblin.runtime.cli.ConstructorAndPublicMethodsCliObjectFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.gobblin.configuration.ConfigurationKeys.*; + + +/** + * A script used for state store migration: + * In the case that users are willing to change the storage medium of job state due to some reasons. + * + * Current implementation doesn't support data awareness on either source or target side. + * And only migrate a single job state instead of migrating all history versions. 
+ */ +@Slf4j +@Alias(value = "stateMigration", description = "Command line tools for migrating state store") +public class StateStoreMigrationCli implements CliApplication { + private static final String SOURCE_KEY = "source"; + private static final String DESTINATION_KEY = "destination"; + private static final String JOB_NAME_KEY = "jobName"; + + /** + * Assume that there's no additional '/' in the end of state.store.dir's value + */ + private String extractStoreName(String dirPath) { + return dirPath.substring(dirPath.lastIndexOf('/') + 1); + } + + @Override + public void run(String[] args) throws Exception { + CliObjectFactory<Command> factory = new ConstructorAndPublicMethodsCliObjectFactory<>(Command.class); + Command command = factory.buildObject(args, 1, true, args[0]); + + FileSystem fs = FileSystem.get(new Configuration()); + FSDataInputStream inputStream = fs.open(command.path); + Config config = ConfigFactory.parseReader(new InputStreamReader(inputStream, Charset.defaultCharset())); + String storeName = this.extractStoreName(config.getConfig(SOURCE_KEY).getString(STATE_STORE_ROOT_DIR_KEY)); + + Preconditions.checkNotNull(config.getObject(SOURCE_KEY)); + Preconditions.checkNotNull(config.getObject(DESTINATION_KEY)); + Preconditions.checkNotNull(config.getString(JOB_NAME_KEY)); + + DatasetStateStore dstDatasetStateStore = + DatasetStateStore.buildDatasetStateStore(config.getConfig(DESTINATION_KEY)); + DatasetStateStore srcDatasetStateStore = DatasetStateStore.buildDatasetStateStore(config.getConfig(SOURCE_KEY)); + + Map<String, JobState.DatasetState> map = + srcDatasetStateStore.getLatestDatasetStatesByUrns(config.getString(JOB_NAME_KEY)); + for (Map.Entry<String, JobState.DatasetState> entry : map.entrySet()) { + dstDatasetStateStore.persistDatasetState(entry.getKey(), entry.getValue()); + } + if (command.deleteSourceStateStore) { + try { + srcDatasetStateStore.delete(storeName); + } catch (IOException ioe) { + log.warn("The source state store has been 
deleted.", ioe); + } + } + } + + /** + * This class has to be public static so that it can be accessed by + * {@link ConstructorAndPublicMethodsCliObjectFactory#inferConstructorOptions} + */ + public static class Command { + + private final Path path; + private boolean deleteSourceStateStore = false; + + @CliObjectSupport(argumentNames = "configPath") + public Command(String path) throws URISyntaxException, IOException { + this.path = new Path(path); + } + + public void deleteSourceStateStore() { + this.deleteSourceStateStore = true; + } + } +}
