Repository: samza Updated Branches: refs/heads/master 4f7bbb054 -> 81542ecf4
SAMZA-625: added a tool to consume changelog and materialize a state store Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/81542ecf Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/81542ecf Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/81542ecf Branch: refs/heads/master Commit: 81542ecf484eccab2bfbb8f6ce1443c2f57eb6e4 Parents: 4f7bbb0 Author: Yan Fang <[email protected]> Authored: Wed Jul 15 22:47:53 2015 -0700 Committer: Yan Fang <[email protected]> Committed: Wed Jul 15 22:47:53 2015 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 7 + .../versioned/container/state-management.md | 10 + .../apache/samza/config/JavaStorageConfig.java | 63 +++++ .../apache/samza/config/JavaSystemConfig.java | 63 +++++ .../apache/samza/storage/StateStorageTool.java | 61 +++++ .../apache/samza/storage/StorageRecovery.java | 256 +++++++++++++++++++ .../samza/storage/TaskStorageManager.scala | 9 +- .../main/scala/org/apache/samza/util/Util.scala | 12 +- .../samza/config/TestJavaStorageConfig.java | 46 ++++ .../samza/config/TestJavaSystemConfig.java | 41 +++ .../apache/samza/storage/MockStorageEngine.java | 60 +++++ .../samza/storage/MockStorageEngineFactory.java | 37 +++ .../samza/storage/MockSystemConsumer.java | 59 +++++ .../apache/samza/storage/MockSystemFactory.java | 45 ++++ .../samza/storage/TestStorageRecovery.java | 111 ++++++++ .../apache/samza/config/Log4jSystemConfig.java | 47 +--- samza-shell/src/main/bash/state-storage-tool.sh | 21 ++ 17 files changed, 905 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index de835c7..eef3370 100644 --- 
a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -142,6 +142,13 @@ <allow pkg="org.apache.samza.serializers" /> <allow pkg="org.apache.samza.system" /> <allow pkg="org.apache.samza.task" /> + <allow pkg="org.apache.samza.util" /> + <allow pkg="org.apache.samza.job" /> + <allow pkg="org.apache.samza.config" /> + <allow pkg="joptsimple" /> + + <allow class="org.apache.samza.SamzaException" /> + <allow class="org.apache.samza.Partition" /> </subpackage> <subpackage name="logging"> http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/docs/learn/documentation/versioned/container/state-management.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/state-management.md b/docs/learn/documentation/versioned/container/state-management.md index 79067bb..50d4b65 100644 --- a/docs/learn/documentation/versioned/container/state-management.md +++ b/docs/learn/documentation/versioned/container/state-management.md @@ -184,6 +184,16 @@ public interface KeyValueStore<K, V> { Additional configuration properties for the key-value store are documented in the [configuration reference](../jobs/configuration-table.html#keyvalue-rocksdb). +#### Debugging Key-value storage + +Samza provides a state storage tool that restores a job's state stores from their changelog streams into a user-specified directory, where they can be reused or inspected for debugging. The stores are written to the `state` subdirectory of the directory given with `--path`. + +{% highlight bash %} +samza-example/target/bin/state-storage-tool.sh \ + --config-path=file:///path/to/job/config.properties \ + --path=directory/to/put/state/stores +{% endhighlight %} + #### Known Issues RocksDB has several rough edges. It's recommended that you read the RocksDB [tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide).
Some other notes to be aware of are: http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java new file mode 100644 index 0000000..af7d4ca --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.config; + +import java.util.ArrayList; +import java.util.List; + +/** + * a java version of the storage config + */ +public class JavaStorageConfig extends MapConfig { + + private static final String FACTORY_SUFFIX = ".factory"; + private static final String STORE_PREFIX = "stores."; + + public JavaStorageConfig(Config config) { + super(config); + } + + public List<String> getStoreNames() { + Config subConfig = subset(STORE_PREFIX, true); + List<String> storeNames = new ArrayList<String>(); + for (String key : subConfig.keySet()) { + if (key.endsWith(FACTORY_SUFFIX)) { + storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length())); + } + } + return storeNames; + } + + public String getChangelogStream(String storeName) { + return get(String.format(StorageConfig.CHANGELOG_STREAM(), storeName), null); + } + + public String getStorageFactoryClassName(String storeName) { + return get(String.format(StorageConfig.FACTORY(), storeName), null); + } + + public String getStorageKeySerde(String storeName) { + return get(String.format(StorageConfig.KEY_SERDE(), storeName), null); + } + + public String getStorageMsgSerde(String storeName) { + return get(String.format(StorageConfig.MSG_SERDE(), storeName), null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java new file mode 100644 index 0000000..cf8d640 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * a java version of the system config + */ +public class JavaSystemConfig extends MapConfig { + private static final String SYSTEM_PREFIX = "systems."; + private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory"; + private static final String SYSTEM_FACTORY = "systems.%s.samza.factory"; + private static final String EMPTY = ""; + + public JavaSystemConfig(Config config) { + super(config); + } + + public String getSystemFactory(String name) { + if (name == null) { + return null; + } + String systemFactory = String.format(SYSTEM_FACTORY, name); + return get(systemFactory, null); + } + + /** + * Get a list of system names. 
+ * + * @return A list system names + */ + public List<String> getSystemNames() { + Config subConf = subset(SYSTEM_PREFIX, true); + ArrayList<String> systemNames = new ArrayList<String>(); + for (Map.Entry<String, String> entry : subConf.entrySet()) { + String key = entry.getKey(); + if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) { + systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY)); + } + } + return systemNames; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java new file mode 100644 index 0000000..beba35c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.storage; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSet; + +import org.apache.samza.config.MapConfig; +import org.apache.samza.util.CommandLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Commandline tool to recover the state storage to a specified directory + */ +public class StateStorageTool extends CommandLine { + private ArgumentAcceptingOptionSpec<String> newPathArgu = parser().accepts("path", "path of the new state storage").withRequiredArg().ofType(String.class).describedAs("path"); + private String newPath = ""; + private Logger log = LoggerFactory.getLogger(StateStorageTool.class); + + @Override + public MapConfig loadConfig(OptionSet options) { + MapConfig config = super.loadConfig(options); + if (options.has(newPathArgu)) { + newPath = options.valueOf(newPathArgu); + log.info("new state storage is " + newPath); + } + return config; + } + + public String getPath() { + return newPath; + } + + public static void main(String[] args) { + StateStorageTool tool = new StateStorageTool(); + OptionSet options = tool.parser().parse(args); + MapConfig config = tool.loadConfig(options); + String path = tool.getPath(); + + StorageRecovery storageRecovery = new StorageRecovery(config, path); + storageRecovery.run(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java new file mode 100644 index 0000000..c564964 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaStorageConfig; +import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.serializers.ByteSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.CommandLine; +import org.apache.samza.util.SystemClock; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Recovers the state storages 
from the changelog streams and store the storages + * in the directory provided by the users. The changelog streams are derived + * from the job's config file. + */ +public class StorageRecovery extends CommandLine { + + private Config jobConfig; + private int maxPartitionNumber = 0; + private File storeBaseDir = null; + private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<String, SystemStream>(); + private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>(); + private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>(); + private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>(); + private Map<Integer, ContainerModel> containers = new HashMap<Integer, ContainerModel>(); + private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>(); + private Logger log = LoggerFactory.getLogger(StorageRecovery.class); + + /** + * Construct the StorageRecovery + * + * @param config + * the job config + * @param path + * the directory path where we put the stores + */ + StorageRecovery(Config config, String path) { + jobConfig = config; + storeBaseDir = new File(path, "state"); + } + + /** + * setup phase which assigns required values to the variables used for all + * tasks. 
+ */ + private void setup() { + log.info("setting up the recovery..."); + + getContainerModels(); + getSystemFactoriesAndAdmins(); + getChangeLogSystemStreamsAndStorageFactories(); + getChangeLogMaxPartitionNumber(); + getTaskStorageManagers(); + } + + /** + * run the setup phase and restore all the task storages + */ + public void run() { + setup(); + + log.info("start recovering..."); + + for (TaskStorageManager taskStorageManager : taskStorageManagers) { + taskStorageManager.init(); + taskStorageManager.stopStores(); + log.debug("restored " + taskStorageManager.toString()); + } + + log.info("successfully recovered in " + storeBaseDir.toString()); + } + + /** + * build the ContainerModels from job config file and put the results in the + * map + */ + private void getContainerModels() { + JobModel jobModel = JobCoordinator.apply(jobConfig).jobModel(); + containers = jobModel.getContainers(); + } + + /** + * get the SystemFactories and SystemAdmins specified in the config file and + * put them into the maps + */ + private void getSystemFactoriesAndAdmins() { + JavaSystemConfig systemConfig = new JavaSystemConfig(jobConfig); + List<String> systems = systemConfig.getSystemNames(); + + for (String system : systems) { + String systemFactory = systemConfig.getSystemFactory(system); + if (systemFactory == null) { + throw new SamzaException("A stream uses system " + system + " which is missing from the configuration."); + } + systemFactories.put(system, Util.<SystemFactory>getObj(systemFactory)); + systemAdmins.put(system, Util.<SystemFactory>getObj(systemFactory).getAdmin(system, jobConfig)); + } + + log.info("Got system factories: " + systemFactories.keySet().toString()); + log.info("Got system admins: " + systemAdmins.keySet().toString()); + } + + /** + * get the changelog streams and the storage factories from the config file + * and put them into the maps + */ + private void getChangeLogSystemStreamsAndStorageFactories() { + JavaStorageConfig config = new 
JavaStorageConfig(jobConfig); + List<String> storeNames = config.getStoreNames(); + + log.info("Got store names: " + storeNames.toString()); + + for (String storeName : storeNames) { + String streamName = config.getChangelogStream(storeName); + + log.info("stream name for " + storeName + " is " + streamName); + + if (streamName != null) { + changeLogSystemStreams.put(storeName, Util.getSystemStreamFromNames(streamName)); + } + + String factoryClass = config.getStorageFactoryClassName(storeName); + if (factoryClass != null) { + storageEngineFactories.put(storeName, Util.<StorageEngineFactory<Object, Object>>getObj(factoryClass)); + } else { + throw new SamzaException("Missing storage factory for " + storeName + "."); + } + } + } + + /** + * get the SystemConsumers for the stores + */ + private HashMap<String, SystemConsumer> getStoreConsumers() { + HashMap<String, SystemConsumer> storeConsumers = new HashMap<String, SystemConsumer>(); + + for (Entry<String, SystemStream> entry : changeLogSystemStreams.entrySet()) { + String storeSystem = entry.getValue().getSystem(); + if (!systemFactories.containsKey(storeSystem)) { + throw new SamzaException("Changelog system " + storeSystem + " for store " + entry.getKey() + " does not exist in the config."); + } + storeConsumers.put(entry.getKey(), systemFactories.get(storeSystem).getConsumer(storeSystem, jobConfig, new MetricsRegistryMap())); + } + + return storeConsumers; + } + + /** + * get the max partition number of the changelog stream + */ + private void getChangeLogMaxPartitionNumber() { + int maxPartitionId = 0; + for (ContainerModel containerModel : containers.values()) { + for (TaskModel taskModel : containerModel.getTasks().values()) { + maxPartitionId = Math.max(maxPartitionId, taskModel.getChangelogPartition().getPartitionId()); + } + } + maxPartitionNumber = maxPartitionId + 1; + } + + /** + * create one TaskStorageManager for each task. 
Add all of them to the + * List<TaskStorageManager> + */ + private void getTaskStorageManagers() { + StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); + + for (ContainerModel containerModel : containers.values()) { + HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>(); + SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getContainerId(), jobConfig, containerModel.getTasks() + .keySet()); + + for (TaskModel taskModel : containerModel.getTasks().values()) { + HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers(); + + for (Entry<String, StorageEngineFactory<?, ?>> entry : storageEngineFactories.entrySet()) { + String storeName = entry.getKey(); + + if (changeLogSystemStreams.containsKey(storeName)) { + SystemStreamPartition changeLogSystemStreamPartition = new SystemStreamPartition(changeLogSystemStreams.get(storeName), + taskModel.getChangelogPartition()); + File storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskModel.getTaskName()); + + log.info("Got storage engine directory: " + storePartitionDir); + + StorageEngine storageEngine = (entry.getValue()).getStorageEngine( + storeName, + storePartitionDir, + (Serde) new ByteSerde(), + (Serde) new ByteSerde(), + null, + new MetricsRegistryMap(), + changeLogSystemStreamPartition, + containerContext); + taskStores.put(storeName, storageEngine); + } + } + + TaskStorageManager taskStorageManager = new TaskStorageManager( + taskModel.getTaskName(), + Util.javaMapAsScalaMap(taskStores), + Util.javaMapAsScalaMap(storeConsumers), + Util.javaMapAsScalaMap(changeLogSystemStreams), + maxPartitionNumber, + streamMetadataCache, + storeBaseDir, + storeBaseDir, taskModel.getChangelogPartition(), + Util.javaMapAsScalaMap(systemAdmins)); + + taskStorageManagers.add(taskStorageManager); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index aeba61a..c39cdc7 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -36,7 +36,7 @@ object TaskStorageManager { def getStorePartitionDir(storeBaseDir: File, storeName: String, taskName: TaskName) = { // TODO: Sanitize, check and clean taskName string as a valid value for a file - new File(storeBaseDir, storeName + File.separator + taskName) + new File(storeBaseDir, (storeName + File.separator + taskName.toString).replace(' ', '_')) /* NOTE(review): spaces are now replaced in both the store name and the task name, so a store whose directory name contained spaces before this change is no longer found at the new path -- confirm existing on-disk stores are rebuilt or migrated. */ } } @@ -188,9 +188,13 @@ class TaskStorageManager( taskStores.values.foreach(_.flush) } - def stop() { + def stopStores() { /* Stops every task store only; stop() calls this first and then runs its "Persisting logged key value stores" step. */ debug("Stopping stores.") taskStores.values.foreach(_.stop) + } + + def stop() { + stopStores() debug("Persisting logged key value stores") changeLogSystemStreams.foreach { case (store, systemStream) => { @@ -208,7 +212,6 @@ class TaskStorageManager( }} } - /** * Builds a map from SystemStreamPartition to oldest offset for changelogs.
*/ http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 2feb65b..419452c 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -34,6 +34,10 @@ import org.apache.samza.config.ConfigException import org.apache.samza.config.MapConfig import scala.collection.JavaConversions._ import org.apache.samza.config.JobConfig +import org.apache.samza.job.model.JobModel /* NOTE(review): appears unused by the code this commit adds */ +import java.io.InputStreamReader /* NOTE(review): appears unused by the code this commit adds */ +import scala.collection.JavaConverters._ /* NOTE(review): appears unused; javaMap.toMap below goes through the JavaConversions implicits */ +import scala.collection.immutable.Map object Util extends Logging { val random = new Random @@ -167,7 +171,6 @@ object Util extends Logging { body } - /** * Generates a coordinator stream name based off of the job name and job id * for the jobd. The format is of the stream name will be @@ -294,4 +297,11 @@ object Util extends Logging { fis.close() } } + + /** + * Convert a Java map to an immutable Scala map. The result is a copy, so later changes to javaMap are not reflected in it. + */ + def javaMapAsScalaMap[T, K](javaMap: java.util.Map[T, K]): Map[T, K] = { + javaMap.toMap + } } http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java new file mode 100644 index 0000000..6c93697 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +import static org.junit.Assert.*; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +public class TestJavaStorageConfig { + + @Test + public void testStorageConfig() { + Map<String, String> map = new HashMap<String, String>(); + map.put("stores.test.factory", "testFactory"); + map.put("stores.test.changelog", "testChangelog"); + map.put("stores.test.key.serde", "string"); + map.put("stores.test.msg.serde", "integer"); + JavaStorageConfig config = new JavaStorageConfig(new MapConfig(map)); + + assertEquals("testFactory", config.getStorageFactoryClassName("test")); + assertEquals("testChangelog", config.getChangelogStream("test")); + assertEquals("string", config.getStorageKeySerde("test")); + assertEquals("integer", config.getStorageMsgSerde("test")); + assertEquals("test", config.getStoreNames().get(0)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java new file mode 100644 index 
0000000..9b39ec8 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +import static org.junit.Assert.*; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +public class TestJavaSystemConfig { + + @Test + public void testGetSystemNames() { + Map<String, String> map = new HashMap<String, String>(); + map.put("systems.system1.samza.factory", "1"); + map.put("systems.system2.samza.factory", "2"); + JavaSystemConfig systemConfig = new JavaSystemConfig( + new MapConfig(map)); + + assertEquals(2, systemConfig.getSystemNames().size()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java new file mode 100644 index 0000000..b90ea87 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java @@ -0,0 
+1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; + +/** + * A mock StorageEngine that stores what it receives from the StorageEngine. + * Those variables/values can be retrieved directly by variable names. 
+ */ +public class MockStorageEngine implements StorageEngine { + + public static String storeName; + public static File storeDir; + public static SystemStreamPartition ssp; + public static ArrayList<IncomingMessageEnvelope> incomingMessageEnvelopes = new ArrayList<IncomingMessageEnvelope>(); + + public MockStorageEngine(String storeName, File storeDir, SystemStreamPartition changeLogSystemStreamPartition) { + MockStorageEngine.storeName = storeName; + MockStorageEngine.storeDir = storeDir; + MockStorageEngine.ssp = changeLogSystemStreamPartition; + } + + @Override + public void restore(Iterator<IncomingMessageEnvelope> envelopes) { + while (envelopes.hasNext()) { + incomingMessageEnvelopes.add(envelopes.next()); + } + } + + @Override + public void flush() { + } + + @Override + public void stop() { + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java new file mode 100644 index 0000000..c00c454 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import java.io.File; + +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; + +public class MockStorageEngineFactory implements StorageEngineFactory<Object, Object> { + @Override + public StorageEngine getStorageEngine(String storeName, File storeDir, Serde<Object> keySerde, Serde<Object> msgSerde, + MessageCollector collector, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition, + SamzaContainerContext containerContext) { + return new MockStorageEngine(storeName, storeDir, changeLogSystemStreamPartition); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java new file mode 100644 index 0000000..07c4a24 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+
+public class MockSystemConsumer implements SystemConsumer {
+  public static Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+  // True until this consumer's first poll, so the test envelope is handed out exactly once.
+  private boolean firstPoll = true;
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {}
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+    if (!firstPoll) {
+      // Every later poll returns the shared map emptied, i.e. no messages.
+      messages.clear();
+      return messages;
+    }
+    firstPoll = false;
+    List<IncomingMessageEnvelope> envelopes = new ArrayList<IncomingMessageEnvelope>();
+    envelopes.add(TestStorageRecovery.msg);
+    messages.put(TestStorageRecovery.ssp, envelopes);
+    return messages;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java new file mode 100644 index 0000000..7abf82b --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.storage;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+public class MockSystemFactory implements SystemFactory {
+  // Serves the admin held in TestStorageRecovery.systemAdmin and a fresh MockSystemConsumer per call.
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return TestStorageRecovery.systemAdmin;
+  }
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    return new MockSystemConsumer();
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return null; // this mock offers no producer
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
new file mode 100644
index 0000000..b8ae592
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TestStorageRecovery {
+  // Fixtures are static because MockSystemFactory and MockSystemConsumer read them directly.
+  public static SystemConsumer systemConsumer1 = null;
+  public static SystemConsumer systemConsumer2 = null;
+  public static SystemAdmin systemAdmin = null;
+  public static Config config = null;
+  public static SystemStreamMetadata systemStreamMetadata = null;
+  public static SystemStreamMetadata inputSystemStreamMetadata = null;
+  public static final String SYSTEM_STREAM_NAME = "changelog";
+  public static final String INPUT_STREAM = "input";
+  public static SystemStreamPartition ssp = new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(0));
+  public static IncomingMessageEnvelope msg = new IncomingMessageEnvelope(TestStorageRecovery.ssp, "0", "test", "test");
+
+  @Before
+  public void setup() throws InterruptedException {
+    putConfig();
+    putMetadata();
+    MockStorageEngine.incomingMessageEnvelopes.clear(); // static list in the mock: start every run empty
+    systemAdmin = mock(SystemAdmin.class);
+
+    Set<String> set1 = new HashSet<String>(Arrays.asList(SYSTEM_STREAM_NAME));
+    Set<String> set2 = new HashSet<String>(Arrays.asList(INPUT_STREAM));
+    HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<String, SystemStreamMetadata>();
+    ssmMap.put(SYSTEM_STREAM_NAME, systemStreamMetadata);
+    ssmMap.put(INPUT_STREAM, inputSystemStreamMetadata);
+    when(systemAdmin.getSystemStreamMetadata(set1)).thenReturn(ssmMap);
+    when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap);
+  }
+
+  @Test
+  public void testStorageEngineReceivedAllValues() {
+    String path = "/tmp/testing";
+    StorageRecovery storageRecovery = new StorageRecovery(config, path);
+    storageRecovery.run();
+
+    // the changelog has two partitions (see putMetadata); each one restores the mock envelope
+    assertEquals(2, MockStorageEngine.incomingMessageEnvelopes.size());
+    assertEquals(TestStorageRecovery.msg, MockStorageEngine.incomingMessageEnvelopes.get(0));
+    assertEquals(TestStorageRecovery.msg, MockStorageEngine.incomingMessageEnvelopes.get(1));
+    // storeDir holds the directory handed to the most recently constructed engine
+    assertEquals(path + "/state/testStore/Partition_1", MockStorageEngine.storeDir.toString());
+  }
+
+  private void putConfig() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("job.name", "changelogTest");
+    map.put("systems.mockSystem.samza.factory", MockSystemFactory.class.getCanonicalName());
+    map.put("stores.testStore.factory", MockStorageEngineFactory.class.getCanonicalName());
+    map.put("stores.testStore.changelog", "mockSystem." + SYSTEM_STREAM_NAME);
+    map.put("task.inputs", "mockSystem.input");
+    map.put("job.coordinator.system", "coordinator");
+    map.put("systems.coordinator.samza.factory", MockCoordinatorStreamSystemFactory.class.getCanonicalName());
+    config = new MapConfig(map);
+  }
+
+  private void putMetadata() {
+    SystemStreamMetadata.SystemStreamPartitionMetadata sspm = new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2");
+    HashMap<Partition, SystemStreamPartitionMetadata> map = new HashMap<Partition, SystemStreamPartitionMetadata>();
+    map.put(new Partition(0), sspm);
+    map.put(new Partition(1), sspm);
+    systemStreamMetadata = new SystemStreamMetadata(SYSTEM_STREAM_NAME, map);
+
+    HashMap<Partition, SystemStreamPartitionMetadata> map1 = new HashMap<Partition, SystemStreamPartitionMetadata>();
+    map1.put(new Partition(0), sspm);
+    map1.put(new Partition(1), sspm);
+    inputSystemStreamMetadata = new SystemStreamMetadata(INPUT_STREAM, map1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index d5e24f2..209296d 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -19,25 +19,19 @@ package org.apache.samza.config; -import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * This class contains the methods for getting properties that are needed by the * StreamAppender.
*/ -public class Log4jSystemConfig { +public class Log4jSystemConfig extends JavaSystemConfig { private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled"; private static final String TASK_LOG4J_SYSTEM = "task.log4j.system"; - private static final String SYSTEM_PREFIX = "systems."; - private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory"; - private static final String EMPTY = ""; - private Config config = null; public Log4jSystemConfig(Config config) { - this.config = config; + super(config); } /** @@ -49,7 +43,7 @@ public class Log4jSystemConfig { * information in Log4J appender messages. */ public boolean getLocationEnabled() { - return "true".equals(config.get(Log4jSystemConfig.LOCATION_ENABLED, "false")); + return "true".equals(get(Log4jSystemConfig.LOCATION_ENABLED, "false")); } /** @@ -59,7 +53,7 @@ public class Log4jSystemConfig { * @return log4j system name */ public String getSystemName() { - String log4jSystem = config.get(TASK_LOG4J_SYSTEM, null); + String log4jSystem = get(TASK_LOG4J_SYSTEM, null); if (log4jSystem == null) { List<String> systemNames = getSystemNames(); if (systemNames.size() == 1) { @@ -72,19 +66,11 @@ public class Log4jSystemConfig { } public String getJobName() { - return config.get(JobConfig.JOB_NAME(), null); + return get(JobConfig.JOB_NAME(), null); } public String getJobId() { - return config.get(JobConfig.JOB_ID(), null); - } - - public String getSystemFactory(String name) { - if (name == null) { - return null; - } - String systemFactory = String.format(SystemConfig.SYSTEM_FACTORY(), name); - return config.get(systemFactory, null); + return get(JobConfig.JOB_ID(), null); } /** @@ -96,28 +82,11 @@ public class Log4jSystemConfig { * supplied serde name. 
*/ public String getSerdeClass(String name) { - return config.get(String.format(SerializerConfig.SERDE(), name), null); + return get(String.format(SerializerConfig.SERDE(), name), null); } public String getStreamSerdeName(String systemName, String streamName) { String streamSerdeNameConfig = String.format(StreamConfig.MSG_SERDE(), systemName, streamName); - return config.get(streamSerdeNameConfig, null); - } - - /** - * Get a list of system names. - * - * @return A list system names - */ - protected List<String> getSystemNames() { - Config subConf = config.subset(SYSTEM_PREFIX, true); - ArrayList<String> systemNames = new ArrayList<String>(); - for (Map.Entry<String, String> entry : subConf.entrySet()) { - String key = entry.getKey(); - if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) { - systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY)); - } - } - return systemNames; + return get(streamSerdeNameConfig, null); } } http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-shell/src/main/bash/state-storage-tool.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/state-storage-tool.sh b/samza-shell/src/main/bash/state-storage-tool.sh new file mode 100755 index 0000000..05a4f25 --- /dev/null +++ b/samza-shell/src/main/bash/state-storage-tool.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname "$0")/log4j-console.xml"
+# "$0" is quoted so the tool also works when installed under a path containing spaces.
+exec "$(dirname "$0")"/run-class.sh org.apache.samza.storage.StateStorageTool "$@"
