Repository: samza Updated Branches: refs/heads/master 94e4e396e -> 38e81c0f9
SAMZA-889: Change log not working properly with In memory Store Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/38e81c0f Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/38e81c0f Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/38e81c0f Branch: refs/heads/master Commit: 38e81c0f9b5a1dfeeac1a1c146aaaec9ec0987f2 Parents: 94e4e39 Author: Navina Ramesh <[email protected]> Authored: Wed Jun 22 15:31:39 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Wed Jun 22 16:08:27 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 4 + .../org/apache/samza/storage/StorageEngine.java | 4 + .../samza/storage/StorageEngineFactory.java | 18 +- .../apache/samza/storage/StoreProperties.java | 71 ++ .../apache/samza/container/SamzaContainer.scala | 41 +- .../samza/container/SamzaContainer.scala.orig | 787 ------------------- .../samza/storage/TaskStorageManager.scala | 77 +- .../apache/samza/storage/MockStorageEngine.java | 9 +- .../samza/storage/MockStorageEngineFactory.java | 3 +- .../samza/storage/TestStorageRecovery.java | 2 +- .../samza/storage/TestTaskStorageManager.scala | 301 ++++++- .../RocksDbKeyValueStorageEngineFactory.scala | 14 +- .../samza/storage/kv/RocksDbKeyValueStore.scala | 3 +- .../kv/BaseKeyValueStorageEngineFactory.scala | 18 +- .../storage/kv/KeyValueStorageEngine.scala | 5 +- .../samza/storage/kv/TestKeyValueStores.scala | 24 +- 16 files changed, 466 insertions(+), 915 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index d0a5c66..325c381 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -202,6 +202,10 @@ <allow class="org.apache.samza.SamzaException" /> <allow class="org.apache.samza.Partition" /> + + <subpackage name="kv"> + <allow pkg="org.apache.samza.storage" /> + </subpackage> </subpackage> <subpackage name="logging"> http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java index 5463648..e30a2ab 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java @@ -56,4 +56,8 @@ public interface StorageEngine { */ void stop(); + /** + * Get store properties + */ + StoreProperties getStoreProperties(); } http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java index adb6264..800deeb 100644 --- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java +++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java @@ -46,13 +46,13 @@ public interface StorageEngineFactory<K, V> { * @param containerContext Information about the container in which the task is executing. * @return The storage engine instance. */ - public StorageEngine getStorageEngine( - String storeName, - File storeDir, - Serde<K> keySerde, - Serde<V> msgSerde, - MessageCollector collector, - MetricsRegistry registry, - SystemStreamPartition changeLogSystemStreamPartition, - SamzaContainerContext containerContext); + StorageEngine getStorageEngine( + String storeName, + File storeDir, + Serde<K> keySerde, + Serde<V> msgSerde, + MessageCollector collector, + MetricsRegistry registry, + SystemStreamPartition changeLogSystemStreamPartition, + SamzaContainerContext containerContext); } http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java b/samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java new file mode 100644 index 0000000..a398271 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage; + +/** + * Immutable class that defines the properties of a Store + */ +public class StoreProperties { + private final boolean persistedToDisk; + private final boolean loggedStore; + + private StoreProperties( + final boolean persistedToDisk, + final boolean loggedStore) { + this.persistedToDisk = persistedToDisk; + this.loggedStore = loggedStore; + } + + /** + * Flag to indicate whether a store can be persisted to disk or not + * + * @return True, if store can be flushed to disk. False, by default. + */ + public boolean isPersistedToDisk() { + return persistedToDisk; + } + + /** + * Flag to indicate whether a store is associated with a changelog (used for recovery) or not + * + * @return True, if changelog is enabled. False, by default. + */ + public boolean isLoggedStore() { + return loggedStore; + } + + public static class StorePropertiesBuilder { + private boolean persistedToDisk = false; + private boolean loggedStore = false; + + public StorePropertiesBuilder setPersistedToDisk(boolean persistedToDisk) { + this.persistedToDisk = persistedToDisk; + return this; + } + + public StorePropertiesBuilder setLoggedStore(boolean loggedStore) { + this.loggedStore = loggedStore; + return this; + } + + public StoreProperties build() { + return new StoreProperties(persistedToDisk, loggedStore); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 5cbdb4b..18c0922 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -22,8 +22,11 @@ package org.apache.samza.container import java.io.File import java.nio.file.Path import java.util +import java.lang.Thread.UncaughtExceptionHandler +import java.net.{URL, UnknownHostException} import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} +import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer import org.apache.samza.config.ShellCommandConfig @@ -32,38 +35,18 @@ import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.container.disk.DiskSpaceMonitor.Listener -import org.apache.samza.container.disk.WatermarkDiskQuotaPolicy.Entry -import org.apache.samza.container.disk.{NoThrottlingDiskQuotaPolicyFactory, DiskQuotaPolicyFactory, NoThrottlingDiskQuotaPolicy, WatermarkDiskQuotaPolicy, PollingScanDiskSpaceMonitor, DiskSpaceMonitor} +import org.apache.samza.container.disk.{NoThrottlingDiskQuotaPolicyFactory, DiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor, DiskSpaceMonitor} import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory -import org.apache.samza.metrics.JmxServer -import org.apache.samza.metrics.JvmMetrics -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.metrics.MetricsReporterFactory -import org.apache.samza.serializers.SerdeFactory -import org.apache.samza.serializers.SerdeManager -import org.apache.samza.storage.StorageEngineFactory -import org.apache.samza.storage.TaskStorageManager -import org.apache.samza.system.StreamMetadataCache -import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.SystemConsumersMetrics -import org.apache.samza.system.SystemFactory -import org.apache.samza.system.SystemProducers -import org.apache.samza.system.SystemProducersMetrics -import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.chooser.DefaultChooser -import org.apache.samza.system.chooser.MessageChooserFactory -import org.apache.samza.system.chooser.RoundRobinChooserFactory -import org.apache.samza.task.StreamTask -import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.job.model.{ContainerModel, JobModel} +import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter, MetricsReporterFactory} +import org.apache.samza.serializers.{SerdeFactory, SerdeManager} +import org.apache.samza.serializers.model.SamzaObjectMapper +import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager} +import org.apache.samza.system.{StreamMetadataCache, SystemConsumers, SystemConsumersMetrics, SystemFactory, SystemProducers, SystemProducersMetrics, SystemStream, SystemStreamPartition} +import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory} +import org.apache.samza.task.{StreamTask, TaskInstanceCollector} import org.apache.samza.util.{ThrottlingExecutor, ExponentialSleepStrategy, Logging, Util} import scala.collection.JavaConversions._ -import java.net.{UnknownHostException, InetAddress, URL} -import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel} -import org.apache.samza.serializers.model.SamzaObjectMapper -import org.apache.samza.config.JobConfig.Config2Job -import java.lang.Thread.UncaughtExceptionHandler object SamzaContainer extends Logging { val DEFAULT_READ_JOBMODEL_DELAY_MS = 100 http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig deleted file mode 100644 index 086531e..0000000 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig +++ /dev/null @@ -1,787 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.container - -import java.io.File -import java.nio.file.Path -import java.util -import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} -import org.apache.samza.config.MetricsConfig.Config2Metrics -import org.apache.samza.config.SerializerConfig.Config2Serializer -import org.apache.samza.config.ShellCommandConfig -import org.apache.samza.config.StorageConfig.Config2Storage -import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.container.disk.DiskSpaceMonitor.Listener -import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, DiskSpaceMonitor} -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory -import org.apache.samza.metrics.JmxServer -import org.apache.samza.metrics.JvmMetrics -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.metrics.MetricsReporterFactory -import org.apache.samza.serializers.SerdeFactory -import org.apache.samza.serializers.SerdeManager -import org.apache.samza.storage.StorageEngineFactory -import org.apache.samza.storage.TaskStorageManager -import org.apache.samza.system.StreamMetadataCache -import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.SystemConsumersMetrics -import org.apache.samza.system.SystemFactory -import org.apache.samza.system.SystemProducers -import org.apache.samza.system.SystemProducersMetrics -import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.chooser.DefaultChooser -import org.apache.samza.system.chooser.MessageChooserFactory -import org.apache.samza.system.chooser.RoundRobinChooserFactory -import org.apache.samza.task.StreamTask -import org.apache.samza.task.TaskInstanceCollector -import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util} -import scala.collection.JavaConversions._ -import java.net.{UnknownHostException, InetAddress, URL} -import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel} -import org.apache.samza.serializers.model.SamzaObjectMapper -import org.apache.samza.config.JobConfig.Config2Job -import java.lang.Thread.UncaughtExceptionHandler - -object SamzaContainer extends Logging { - val DEFAULT_READ_JOBMODEL_DELAY_MS = 100 - - def main(args: Array[String]) { - safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1))) - } - - def safeMain( - newJmxServer: () => JmxServer, - exceptionHandler: UncaughtExceptionHandler = null) { - if (exceptionHandler != null) { - Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) - } - putMDC("containerName", "samza-container-" + System.getenv(ShellCommandConfig.ENV_CONTAINER_ID)) - // Break out the main method to make the JmxServer injectable so we can - // validate that we don't leak JMX non-daemon threads if we have an - // exception in the main method. - val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt - logger.info("Got container ID: %s" format containerId) - val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL) - logger.info("Got coordinator URL: %s" format coordinatorUrl) - val jobModel = readJobModel(coordinatorUrl) - val containerModel = jobModel.getContainers()(containerId.toInt) - val config = jobModel.getConfig - putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name"))) - putMDC("jobId", config.getJobId.getOrElse("1")) - var jmxServer: JmxServer = null - - try { - jmxServer = newJmxServer() - SamzaContainer(containerModel, jobModel, jmxServer).run - } finally { - if (jmxServer != null) { - jmxServer.stop - } - } - } - - /** - * Fetches config, task:SSP assignments, and task:changelog partition - * assignments, and returns objects to be used for SamzaContainer's - * constructor. - */ - def readJobModel(url: String, initialDelayMs: Int = scala.util.Random.nextInt(DEFAULT_READ_JOBMODEL_DELAY_MS) + 1) = { - info("Fetching configuration from: %s" format url) - SamzaObjectMapper - .getObjectMapper - .readValue( - Util.read( - url = new URL(url), - retryBackoff = new ExponentialSleepStrategy(initialDelayMs = initialDelayMs)), - classOf[JobModel]) - } - - def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = { - val config = jobModel.getConfig - val containerId = containerModel.getContainerId - val containerName = "samza-container-%s" format containerId - val containerPID = Util.getContainerPID - - info("Setting up Samza container: %s" format containerName) - info("Samza container PID: %s" format containerPID) - info("Using configuration: %s" format config) - info("Using container model: %s" format containerModel) - - val registry = new MetricsRegistryMap(containerName) - val samzaContainerMetrics = new SamzaContainerMetrics(containerName, registry) - val systemProducersMetrics = new SystemProducersMetrics(registry) - val systemConsumersMetrics = new SystemConsumersMetrics(registry) - val offsetManagerMetrics = new OffsetManagerMetrics(registry) - - val inputSystemStreamPartitions = containerModel - .getTasks - .values - .flatMap(_.getSystemStreamPartitions) - .toSet - - val inputSystemStreams = inputSystemStreamPartitions - .map(_.getSystemStream) - .toSet - - val inputSystems = inputSystemStreams - .map(_.getSystem) - .toSet - - val systemNames = config.getSystemNames - - info("Got system names: %s" format systemNames) - - val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_)) - - debug("Got serde streams: %s" format serdeStreams) - - val serdeNames = config.getSerdeNames - - info("Got serde names: %s" format serdeNames) - - val systemFactories = systemNames.map(systemName => { - val systemFactoryClassName = config - .getSystemFactory(systemName) - .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)) - (systemName, Util.getObj[SystemFactory](systemFactoryClassName)) - }).toMap - - val systemAdmins = systemNames - .map(systemName => (systemName, systemFactories(systemName).getAdmin(systemName, config))) - .toMap - - info("Got system factories: %s" format systemFactories.keys) - - val streamMetadataCache = new StreamMetadataCache(systemAdmins) - val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams) - - info("Got input stream metadata: %s" format inputStreamMetadata) - - val consumers = inputSystems - .map(systemName => { - val systemFactory = systemFactories(systemName) - - try { - (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry)) - } catch { - case e: Exception => - error("Failed to create a consumer for %s, so skipping." format(systemName), e) - (systemName, null) - } - }) - .filter(_._2 != null) - .toMap - - info("Got system consumers: %s" format consumers.keys) - - val producers = systemFactories - .map { - case (systemName, systemFactory) => - try { - (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry)) - } catch { - case e: Exception => - error("Failed to create a producer for %s, so skipping." format(systemName), e) - (systemName, null) - } - } - .filter(_._2 != null) - .toMap - - info("Got system producers: %s" format producers.keys) - - val serdes = serdeNames.map(serdeName => { - val serdeClassName = config - .getSerdeClass(serdeName) - .getOrElse(Util.defaultSerdeFactoryFromSerdeName(serdeName)) - - val serde = Util.getObj[SerdeFactory[Object]](serdeClassName) - .getSerde(serdeName, config) - - (serdeName, serde) - }).toMap - - info("Got serdes: %s" format serdes.keys) - - /* - * A Helper function to build a Map[String, Serde] (systemName -> Serde) for systems defined in the config. This is useful to build both key and message serde maps. - */ - val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => { - systemNames - .filter( sn => { - val serde = getSerdeName(sn) - serde.isDefined && !serde.get.equals("") - }).map(systemName => { - val serdeName = getSerdeName(systemName).get - val serde = serdes.getOrElse(serdeName, throw new SamzaException("No class defined for serde: %s." format serdeName)) - (systemName, serde) - }).toMap - } - - /* - * A Helper function to build a Map[SystemStream, Serde] for streams defined in the config. This is useful to build both key and message serde maps. - */ - val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => { - (serdeStreams ++ inputSystemStreamPartitions) - .filter(systemStream => getSerdeName(systemStream).isDefined) - .map(systemStream => { - val serdeName = getSerdeName(systemStream).get - val serde = serdes.getOrElse(serdeName, throw new SamzaException("No class defined for serde: %s." format serdeName)) - (systemStream, serde) - }).toMap - } - - val systemKeySerdes = buildSystemSerdeMap((systemName: String) => config.getSystemKeySerde(systemName)) - - debug("Got system key serdes: %s" format systemKeySerdes) - - val systemMessageSerdes = buildSystemSerdeMap((systemName: String) => config.getSystemMsgSerde(systemName)) - - debug("Got system message serdes: %s" format systemMessageSerdes) - - val systemStreamKeySerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamKeySerde(systemStream)) - - debug("Got system stream key serdes: %s" format systemStreamKeySerdes) - - val systemStreamMessageSerdes = buildSystemStreamSerdeMap((systemStream: SystemStream) => config.getStreamMsgSerde(systemStream)) - - debug("Got system stream message serdes: %s" format systemStreamMessageSerdes) - - val changeLogSystemStreams = config - .getStoreNames - .filter(config.getChangelogStream(_).isDefined) - .map(name => (name, config.getChangelogStream(name).get)).toMap - .mapValues(Util.getSystemStreamFromNames(_)) - - info("Got change log system streams: %s" format changeLogSystemStreams) - - val serdeManager = new SerdeManager( - serdes = serdes, - systemKeySerdes = systemKeySerdes, - systemMessageSerdes = systemMessageSerdes, - systemStreamKeySerdes = systemStreamKeySerdes, - systemStreamMessageSerdes = systemStreamMessageSerdes, - changeLogSystemStreams = changeLogSystemStreams.values.toSet) - - info("Setting up JVM metrics.") - - val jvm = new JvmMetrics(samzaContainerMetrics.registry) - - info("Setting up message chooser.") - - val chooserFactoryClassName = config.getMessageChooserClass.getOrElse(classOf[RoundRobinChooserFactory].getName) - - val chooserFactory = Util.getObj[MessageChooserFactory](chooserFactoryClassName) - - val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry) - - info("Setting up metrics reporters.") - - val reporters = config.getMetricReporterNames.map(reporterName => { - val metricsFactoryClassName = config - .getMetricsFactoryClass(reporterName) - .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName)) - - val reporter = - Util - .getObj[MetricsReporterFactory](metricsFactoryClassName) - .getMetricsReporter(reporterName, containerName, config) - (reporterName, reporter) - }).toMap - - info("Got metrics reporters: %s" format reporters.keys) - - val securityManager = config.getSecurityManagerFactory match { - case Some(securityManagerFactoryClassName) => - Util - .getObj[SecurityManagerFactory](securityManagerFactoryClassName) - .getSecurityManager(config) - case _ => null - } - info("Got security manager: %s" format securityManager) - - val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry) - val localityManager = new LocalityManager(coordinatorSystemProducer) - val checkpointManager = config.getCheckpointManagerFactory() match { - case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) => - Util - .getObj[CheckpointManagerFactory](checkpointFactoryClassName) - .getCheckpointManager(config, samzaContainerMetrics.registry) - case _ => null - } - info("Got checkpoint manager: %s" format checkpointManager) - - val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics) - - info("Got offset manager: %s" format offsetManager) - - val dropDeserializationError = config.getDropDeserialization match { - case Some(dropError) => dropError.toBoolean - case _ => false - } - - val dropSerializationError = config.getDropSerialization match { - case Some(dropError) => dropError.toBoolean - case _ => false - } - - val pollIntervalMs = config - .getPollIntervalMs - .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString) - .toInt - - val consumerMultiplexer = new SystemConsumers( - chooser = chooser, - consumers = consumers, - serdeManager = serdeManager, - metrics = systemConsumersMetrics, - dropDeserializationError = dropDeserializationError, - pollIntervalMs = pollIntervalMs) - - val producerMultiplexer = new SystemProducers( - producers = producers, - serdeManager = serdeManager, - metrics = systemProducersMetrics, - dropSerializationError = dropSerializationError) - - val storageEngineFactories = config - .getStoreNames - .map(storeName => { - val storageFactoryClassName = config - .getStorageFactoryClassName(storeName) - .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName)) - (storeName, Util.getObj[StorageEngineFactory[Object, Object]](storageFactoryClassName)) - }).toMap - - info("Got storage engines: %s" format storageEngineFactories.keys) - - val taskClassName = config - .getTaskClass - .getOrElse(throw new SamzaException("No task class defined in configuration.")) - - info("Got stream task class: %s" format taskClassName) - - val taskWindowMs = config.getWindowMs.getOrElse(-1L) - - info("Got window milliseconds: %s" format taskWindowMs) - - val taskCommitMs = config.getCommitMs.getOrElse(60000L) - - info("Got commit milliseconds: %s" format taskCommitMs) - - val taskShutdownMs = config.getShutdownMs.getOrElse(5000L) - - info("Got shutdown timeout milliseconds: %s" format taskShutdownMs) - - // Wire up all task-instance-level (unshared) objects. - - val taskNames = containerModel - .getTasks - .values - .map(_.getTaskName) - .toSet - val containerContext = new SamzaContainerContext(containerId, config, taskNames) - - // TODO not sure how we should make this config based, or not. Kind of - // strange, since it has some dynamic directories when used with YARN. - val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state") - info("Got default storage engine base directory: %s" format defaultStoreBaseDir) - - val storeWatchPaths = new util.HashSet[Path]() - storeWatchPaths.add(defaultStoreBaseDir.toPath) - - val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { - debug("Setting up task instance: %s" format taskModel) - - val taskName = taskModel.getTaskName - - val task = Util.getObj[StreamTask](taskClassName) - - val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName) - - val collector = new TaskInstanceCollector(producerMultiplexer, taskInstanceMetrics) - - val storeConsumers = changeLogSystemStreams - .map { - case (storeName, changeLogSystemStream) => - val systemConsumer = systemFactories - .getOrElse(changeLogSystemStream.getSystem, throw new SamzaException("Changelog system %s for store %s does not exist in the config." format (changeLogSystemStream, storeName))) - .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry) - samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName) - (storeName, systemConsumer) - }.toMap - - info("Got store consumers: %s" format storeConsumers) - - var loggedStorageBaseDir: File = null - if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) { - val jobNameAndId = Util.getJobNameAndId(config) - loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2) - } else { - warn("No override was provided for logged store base directory. This disables local state re-use on " + - "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " + - "variable in all machines running the Samza container") - loggedStorageBaseDir = defaultStoreBaseDir - } - - storeWatchPaths.add(loggedStorageBaseDir.toPath) - - info("Got base directory for logged data stores: %s" format loggedStorageBaseDir) - - val taskStores = storageEngineFactories - .map { - case (storeName, storageEngineFactory) => - val changeLogSystemStreamPartition = if (changeLogSystemStreams.contains(storeName)) { - new SystemStreamPartition(changeLogSystemStreams(storeName), taskModel.getChangelogPartition) - } else { - null - } - val keySerde = config.getStorageKeySerde(storeName) match { - case Some(keySerde) => serdes.getOrElse(keySerde, throw new SamzaException("No class defined for serde: %s." format keySerde)) - case _ => null - } - val msgSerde = config.getStorageMsgSerde(storeName) match { - case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("No class defined for serde: %s." format msgSerde)) - case _ => null - } - val storeBaseDir = if(changeLogSystemStreamPartition != null) { - TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName) - } - else { - TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName) - } - val storageEngine = storageEngineFactory.getStorageEngine( - storeName, - storeBaseDir, - keySerde, - msgSerde, - collector, - taskInstanceMetrics.registry, - changeLogSystemStreamPartition, - containerContext) - (storeName, storageEngine) - } - - info("Got task stores: %s" format taskStores) - - val storageManager = new TaskStorageManager( - taskName = taskName, - taskStores = taskStores, - storeConsumers = storeConsumers, - changeLogSystemStreams = changeLogSystemStreams, - jobModel.maxChangeLogStreamPartitions, - streamMetadataCache = streamMetadataCache, - storeBaseDir = defaultStoreBaseDir, - loggedStoreBaseDir = loggedStorageBaseDir, - partition = taskModel.getChangelogPartition, - systemAdmins = systemAdmins) - - val systemStreamPartitions = taskModel - .getSystemStreamPartitions - .toSet - - info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName) - - val taskInstance = new TaskInstance( - task = task, - taskName = taskName, - config = config, - metrics = taskInstanceMetrics, - systemAdmins = systemAdmins, - consumerMultiplexer = consumerMultiplexer, - collector = collector, - containerContext = containerContext, - offsetManager = offsetManager, - storageManager = storageManager, - reporters = reporters, - systemStreamPartitions = systemStreamPartitions, - exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config)) - - (taskName, taskInstance) - }).toMap - - val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0) - var diskSpaceMonitor: DiskSpaceMonitor = null - if (diskPollMillis != 0) { - val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge() - - diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, diskPollMillis) - diskSpaceMonitor.registerListener(new Listener { - override def onUpdate(diskUsageSample: Long): Unit = - diskUsage.set(diskUsageSample) - }) - - info("Initialized disk space monitor watch paths to: %s" format storeWatchPaths) - } - - val runLoop = new RunLoop( - taskInstances = taskInstances, - consumerMultiplexer = consumerMultiplexer, - metrics = samzaContainerMetrics, - windowMs = taskWindowMs, - commitMs = taskCommitMs, - shutdownMs = taskShutdownMs) - - info("Samza container setup complete.") - - new SamzaContainer( - containerContext = containerContext, - taskInstances = taskInstances, - runLoop = runLoop, - consumerMultiplexer = consumerMultiplexer, - producerMultiplexer = producerMultiplexer, - offsetManager = offsetManager, - localityManager = localityManager, - securityManager = securityManager, - metrics = samzaContainerMetrics, - reporters = reporters, - jvm = jvm, - jmxServer = jmxServer, - diskSpaceMonitor = diskSpaceMonitor) - } -} - -class SamzaContainer( - containerContext: SamzaContainerContext, - taskInstances: Map[TaskName, TaskInstance], - runLoop: RunLoop, - consumerMultiplexer: SystemConsumers, - producerMultiplexer: SystemProducers, - metrics: SamzaContainerMetrics, - jmxServer: JmxServer, - diskSpaceMonitor: DiskSpaceMonitor = null, - offsetManager: OffsetManager = new OffsetManager, - localityManager: LocalityManager = null, - securityManager: SecurityManager = null, - reporters: Map[String, MetricsReporter] = Map(), - jvm: JvmMetrics = null) extends Runnable with Logging { - - def run { - try { - info("Starting container.") - - startMetrics - startOffsetManager - startLocalityManager - startStores - startDiskSpaceMonitor - startProducers - startTask - startConsumers - startSecurityManger - - info("Entering run loop.") - runLoop.run - } catch { - case e: Exception => - error("Caught exception in process loop.", e) - throw e - } finally { - info("Shutting down.") - - shutdownConsumers - shutdownTask - shutdownStores - shutdownDiskSpaceMonitor - shutdownProducers - shutdownLocalityManager - shutdownOffsetManager - shutdownMetrics - shutdownSecurityManger - - info("Shutdown complete.") - } - } - - def startDiskSpaceMonitor: Unit = { - if (diskSpaceMonitor != null) { - info("Starting disk space monitor") - diskSpaceMonitor.start() - } - } - - def startMetrics { - info("Registering task instances with metrics.") - - taskInstances.values.foreach(_.registerMetrics) - - info("Starting JVM metrics.") - - if (jvm != null) { - jvm.start - } - - info("Starting metrics reporters.") - - reporters.values.foreach(reporter => { - reporter.register(metrics.source, metrics.registry) - reporter.start - }) - } - - def startOffsetManager { - info("Registering task instances with offsets.") - - taskInstances.values.foreach(_.registerOffsets) - - info("Starting offset manager.") - - offsetManager.start - } - - def startLocalityManager { - if(localityManager != null) { - info("Registering localityManager for the container") - localityManager.start - localityManager.register(String.valueOf(containerContext.id)) - - info("Writing container locality and JMX address to Coordinator Stream") - try { - val hostInet = Util.getLocalHost - val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else "" - val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else "" - localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName, jmxUrl, jmxTunnelingUrl) - } catch { - case uhe: UnknownHostException => - warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage)) //No-op - case unknownException: Throwable => - warn("Received an exception when persisting locality info for container %d: %s" format (containerContext.id, unknownException.getMessage)) - } - } - } - - def startStores { - info("Starting task instance stores.") - taskInstances.values.foreach(taskInstance => { - val startTime = System.currentTimeMillis() - taskInstance.startStores - // Measuring the time to restore the stores - val timeToRestore = System.currentTimeMillis() - startTime - val taskGauge = metrics.taskStoreRestorationMetrics.getOrElse(taskInstance.taskName, null) - if (taskGauge != null) { - taskGauge.set(timeToRestore) - } - }) - } - - def startTask { - info("Initializing stream tasks.") - - taskInstances.values.foreach(_.initTask) - } - - def startProducers { - info("Registering task instances with producers.") - - taskInstances.values.foreach(_.registerProducers) - - info("Starting producer multiplexer.") - - producerMultiplexer.start - } - - def startConsumers { - info("Registering task instances with consumers.") - - taskInstances.values.foreach(_.registerConsumers) - - info("Starting consumer multiplexer.") - - consumerMultiplexer.start - } - - def startSecurityManger: Unit = { - if (securityManager != null) { - info("Starting security manager.") - - securityManager.start - } - } - - def shutdownConsumers { - info("Shutting down consumer multiplexer.") - - consumerMultiplexer.stop - } - - def shutdownProducers { - info("Shutting down producer multiplexer.") - - producerMultiplexer.stop - } - - def shutdownTask { - info("Shutting down task instance stream tasks.") - - taskInstances.values.foreach(_.shutdownTask) - } - - def shutdownStores { - info("Shutting down task instance stores.") - - taskInstances.values.foreach(_.shutdownStores) - } - - def shutdownLocalityManager { - if(localityManager != null) { - info("Shutting down locality manager.") - localityManager.stop - } - } - - def shutdownOffsetManager { - info("Shutting down offset manager.") - - offsetManager.stop - } - - - def shutdownMetrics { - info("Shutting down metrics reporters.") - - reporters.values.foreach(_.stop) - - if (jvm != null) { - info("Shutting down JVM metrics.") - - jvm.stop - } - } - - def shutdownSecurityManger: Unit = { - if (securityManager != null) { - info("Shutting down security manager.") - - securityManager.stop - } - } - - def shutdownDiskSpaceMonitor: Unit = { - if (diskSpaceMonitor != null) { - info("Shutting down disk space monitor.") - diskSpaceMonitor.stop() - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 2a3535e..0b7bcdd 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -21,13 +21,13 @@ package org.apache.samza.storage import java.io._ import java.util -import scala.collection.{JavaConversions, Map} -import org.apache.samza.util.Logging -import org.apache.samza.Partition -import org.apache.samza.system._ -import org.apache.samza.util.Util -import org.apache.samza.SamzaException + +import org.apache.samza.{Partition, SamzaException} import org.apache.samza.container.TaskName +import org.apache.samza.system._ +import org.apache.samza.util.{Logging, Util} + +import scala.collection.{JavaConversions, Map} object TaskStorageManager { def getStoreDir(storeBaseDir: File, storeName: String) = { @@ -55,7 +55,13 @@ class TaskStorageManager( partition: Partition, systemAdmins: Map[String, SystemAdmin]) extends Logging { - var taskStoresToRestore = taskStores + var taskStoresToRestore = taskStores.filter{ + case (storeName, storageEngine) => storageEngine.getStoreProperties.isLoggedStore + } + val persistedStores = taskStores.filter{ + case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk + } + var changeLogOldestOffsets: Map[SystemStream, String] = Map() val fileOffset: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]() val offsetFileName = "OFFSET" @@ -63,29 +69,15 @@ class TaskStorageManager( def apply(storageEngineName: String) = taskStores(storageEngineName) def init { - cleanBaseDirs - setupBaseDirs - validateChangelogStreams - startConsumers - restoreStores - stopConsumers + cleanBaseDirs() + setupBaseDirs() + validateChangelogStreams() + startConsumers() + restoreStores() + stopConsumers() } - private def setupBaseDirs { - debug("Setting up base directories for stores.") - - val loggedStores = changeLogSystemStreams.keySet - - (taskStores.keySet -- loggedStores) - .foreach(TaskStorageManager.getStorePartitionDir(storeBaseDir, _, taskName).mkdirs) - - loggedStores.foreach(storeName => { - val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) - if(!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs - }) - } - - private def cleanBaseDirs { + private def cleanBaseDirs() { debug("Cleaning base directories for stores.") taskStores.keys.foreach(storeName => { @@ -99,14 +91,27 @@ class TaskStorageManager( val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) info("Got logged storage partition directory as %s" format loggedStoragePartitionDir.toPath.toString) - // If we find valid offsets s.t. we can restore the state, keep the disk files. Otherwise, delete them. - if(!readOffsetFile(storeName, loggedStoragePartitionDir) && loggedStoragePartitionDir.exists()) { - Util.rm(loggedStoragePartitionDir) + if (!persistedStores.contains(storeName) || + (loggedStoragePartitionDir.exists() && !readOffsetFile(storeName, loggedStoragePartitionDir))) { + Util.rm(loggedStoragePartitionDir) } }) } + private def setupBaseDirs() { + debug("Setting up base directories for stores.") + taskStores.foreach { + case (storeName, storageEngine) => + if (storageEngine.getStoreProperties.isLoggedStore) { + val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + if (!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs() + } else { + TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName).mkdirs() + } + } + } + /** * Attempts to read the offset file and returns {@code true} if the offsets were read successfully. * @@ -130,7 +135,7 @@ class TaskStorageManager( offsetsRead } - private def validateChangelogStreams = { + private def validateChangelogStreams() = { info("Validating change log streams") for ((storeName, systemStream) <- changeLogSystemStreams) { @@ -147,7 +152,7 @@ class TaskStorageManager( info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets)) } - private def startConsumers { + private def startConsumers() { debug("Starting consumers for stores.") for ((storeName, systemStream) <- changeLogSystemStreams) { @@ -170,7 +175,7 @@ class TaskStorageManager( storeConsumers.values.foreach(_.start) } - private def restoreStores { + private def restoreStores() { debug("Restoring stores.") for ((storeName, store) <- taskStoresToRestore) { @@ -184,7 +189,7 @@ class TaskStorageManager( } } - private def stopConsumers { + private def stopConsumers() { debug("Stopping consumers for stores.") storeConsumers.values.foreach(_.stop) @@ -219,7 +224,7 @@ class TaskStorageManager( private def flushChangelogOffsetFiles() { debug("Persisting logged key value stores") - for ((storeName, systemStream) <- changeLogSystemStreams) { + for ((storeName, systemStream) <- changeLogSystemStreams.filterKeys(storeName => persistedStores.contains(storeName))) { val systemAdmin = systemAdmins .getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream)) http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java index b90ea87..4f71a54 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java +++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java @@ -36,11 +36,13 @@ public class MockStorageEngine implements StorageEngine { public static File storeDir; public static SystemStreamPartition ssp; public static ArrayList<IncomingMessageEnvelope> incomingMessageEnvelopes = new ArrayList<IncomingMessageEnvelope>(); + public static StoreProperties storeProperties; - public MockStorageEngine(String storeName, File storeDir, SystemStreamPartition changeLogSystemStreamPartition) { + public MockStorageEngine(String storeName, File storeDir, SystemStreamPartition changeLogSystemStreamPartition, StoreProperties properties) { MockStorageEngine.storeName = storeName; MockStorageEngine.storeDir = storeDir; MockStorageEngine.ssp = changeLogSystemStreamPartition; + MockStorageEngine.storeProperties = properties; } @Override @@ -57,4 +59,9 @@ public class MockStorageEngine implements StorageEngine { @Override public void stop() { } + + @Override + public StoreProperties getStoreProperties() { + return storeProperties; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java index c00c454..d483ae6 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java +++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java @@ -32,6 +32,7 @@ public class MockStorageEngineFactory implements StorageEngineFactory<Object, Ob public StorageEngine getStorageEngine(String storeName, File storeDir, Serde<Object> keySerde, Serde<Object> msgSerde, MessageCollector collector, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition, SamzaContainerContext containerContext) { - return new MockStorageEngine(storeName, storeDir, changeLogSystemStreamPartition); + StoreProperties storeProperties = new StoreProperties.StorePropertiesBuilder().setLoggedStore(true).build(); + return new MockStorageEngine(storeName, storeDir, changeLogSystemStreamPartition, storeProperties); } } http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java index 13f4fa9..21d0150 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java +++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java @@ -62,7 +62,7 @@ public class TestStorageRecovery { Set<String> set1 = new HashSet<String>(Arrays.asList(SYSTEM_STREAM_NAME)); Set<String> set2 = new HashSet<String>(Arrays.asList(INPUT_STREAM)); - HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<String, SystemStreamMetadata>(); + HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<>(); ssmMap.put(SYSTEM_STREAM_NAME, systemStreamMetadata); ssmMap.put(INPUT_STREAM, inputSystemStreamMetadata); when(systemAdmin.getSystemStreamMetadata(set1)).thenReturn(ssmMap); http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index e126481..4d40f52 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -21,18 +21,23 @@ package org.apache.samza.storage import java.io.File -import org.junit.{After, Before, Test} -import org.junit.Assert._ -import org.scalatest.mock.MockitoSugar -import org.mockito.Mockito._ -import org.mockito.Matchers._ -import scala.collection.JavaConversions +import java.util +import org.apache.samza.Partition import org.apache.samza.container.TaskName -import org.apache.samza.util.Util -import org.apache.samza.system._ +import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.Partition +import org.apache.samza.system._ +import org.apache.samza.util.Util +import org.junit.Assert._ +import org.junit.{After, Before, Test} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.mock.MockitoSugar + +import scala.collection.JavaConversions class TestTaskStorageManager extends MockitoSugar { @@ -54,16 +59,221 @@ class TestTaskStorageManager extends MockitoSugar { Util.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir) } + /** + * This tests the entire TaskStorageManager lifecycle for a Persisted Logged Store + * For example, a RocksDb store with changelog needs to continuously update the offset file on flush & stop + * When the task is restarted, it should restore correctly from the offset in the OFFSET file on disk (if available) + */ + @Test + def testStoreLifecycleForLoggedPersistedStore(): Unit = { + // Basic test setup of SystemStream, SystemStreamPartition for this task + val ss = new SystemStream("kafka", "testStream") + val partition = new Partition(0) + val ssp = new SystemStreamPartition(ss, partition) + val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + val storeFile = new File(storeDirectory, "store.sst") + val offsetFile = new File(storeDirectory, "OFFSET") + + // getStoreProperties should always return the same StoreProperties + val mockStorageEngine = mock[StorageEngine] + when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] { + override def answer(invocation: InvocationOnMock): StoreProperties = { + new StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(true).build() + } + }) + // Restore simply creates the file + when(mockStorageEngine.restore(any())).thenAnswer(new Answer[Unit] { + override def answer(invocation: InvocationOnMock): Unit = { + storeFile.createNewFile() + } + }) + + // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin + val mockStreamMetadataCache = mock[StreamMetadataCache] + val mockSystemConsumer = mock[SystemConsumer] + val mockSystemAdmin = mock[SystemAdmin] + doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1) + var registerOffset = "0" + when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] { + override def answer(invocation: InvocationOnMock): Unit = { + val args = invocation.getArguments + if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) { + val offset = args.apply(1).asInstanceOf[String] + assertNotNull(offset) + assertEquals(registerOffset, offset) + } + } + }) + doNothing().when(mockSystemConsumer).stop() + + // Test 1: Initial invocation - No store on disk (only changelog has data) + // Setup initial sspMetadata + val sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51") + var metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { + { + put(partition, sspMetadata) + } + }) + when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) + when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new util.HashMap[String, SystemStreamMetadata](){ + { + put("testStream", metadata) + } + }) + val taskManager = new TaskStorageManagerBuilder() + .addStore(loggedStore, mockStorageEngine, mockSystemConsumer) + .setStreamMetadataCache(mockStreamMetadataCache) + .setSystemAdmin("kafka", mockSystemAdmin) + .build + + + taskManager.init + + assertTrue(storeFile.exists()) + assertFalse(offsetFile.exists()) + + // Test 2: flush should update the offset file + taskManager.flush() + assertTrue(offsetFile.exists()) + assertEquals("50", Util.readDataFromFile(offsetFile)) + + // Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly + metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { + { + put(partition, new SystemStreamPartitionMetadata("0", "100", "101")) + } + }) + when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) + when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new util.HashMap[String, SystemStreamMetadata](){ + { + put("testStream", metadata) + } + }) + taskManager.stop() + assertTrue(storeFile.exists()) + assertTrue(offsetFile.exists()) + assertEquals("100", Util.readDataFromFile(offsetFile)) + + + // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset + metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { + { + put(partition, new SystemStreamPartitionMetadata("0", "150", "151")) + } + }) + when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) + when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new util.HashMap[String, SystemStreamMetadata](){ + { + put("testStream", metadata) + } + }) + registerOffset = "100" + + taskManager.init + + assertTrue(storeFile.exists()) + assertTrue(offsetFile.exists()) + } + + /** + * This tests the entire TaskStorageManager lifecycle for an InMemory Logged Store + * For example, an InMemory KV store with changelog should not update the offset file on flush & stop + * When the task is restarted, it should ALWAYS restore correctly from the earliest offset + */ + @Test + def testStoreLifecycleForLoggedInMemoryStore(): Unit = { + // Basic test setup of SystemStream, SystemStreamPartition for this task + val ss = new SystemStream("kafka", "testStream") + val partition = new Partition(0) + val ssp = new SystemStreamPartition(ss, partition) + val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName) + + // getStoreProperties should always return the same StoreProperties + val mockStorageEngine = mock[StorageEngine] + when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] { + override def answer(invocation: InvocationOnMock): StoreProperties = { + new StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(false).build() + } + }) + // Restore simply creates the file + doNothing().when(mockStorageEngine).restore(any()) + + // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin + val mockStreamMetadataCache = mock[StreamMetadataCache] + val mockSystemAdmin = mock[SystemAdmin] + doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1) + + val mockSystemConsumer = mock[SystemConsumer] + when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] { + override def answer(invocation: InvocationOnMock): Unit = { + val args = invocation.getArguments + if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) { + val offset = args.apply(1).asInstanceOf[String] + assertNotNull(offset) + assertEquals("0", offset) // Should always restore from earliest offset + } + } + }) + doNothing().when(mockSystemConsumer).stop() + + // Test 1: Initial invocation - No store data (only changelog has data) + // Setup initial sspMetadata + val sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51") + var metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { + { + put(partition, sspMetadata) + } + }) + when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) + val taskManager = new TaskStorageManagerBuilder() + .addStore(store, mockStorageEngine, mockSystemConsumer) + .setStreamMetadataCache(mockStreamMetadataCache) + .setSystemAdmin("kafka", mockSystemAdmin) + .build + + + taskManager.init + + // Verify that the store directory doesn't have ANY files + assertNull(storeDirectory.listFiles()) + + // Test 2: flush should NOT create/update the offset file. Store directory has no files + taskManager.flush() + assertNull(storeDirectory.listFiles()) + + // Test 3: Update sspMetadata before shutdown and verify that offset file is NOT created + metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { + { + put(partition, new SystemStreamPartitionMetadata("0", "100", "101")) + } + }) + when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) + taskManager.stop() + assertNull(storeDirectory.listFiles()) + + // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the earliest offset + metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { + { + put(partition, new SystemStreamPartitionMetadata("0", "150", "151")) + } + }) + when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) + + taskManager.init + + assertNull(storeDirectory.listFiles()) + } + @Test - def testCleanBaseDirs() { + def testStoreDirsWithoutOffsetFileAreDeletedInCleanBaseDirs() { val checkFilePath1 = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName), "check") checkFilePath1.createNewFile() val checkFilePath2 = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "check") checkFilePath2.createNewFile() val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(store) - .addStore(loggedStore) + .addStore(store, false) + .addStore(loggedStore, true) .build //Invoke test method @@ -79,12 +289,12 @@ class TestTaskStorageManager extends MockitoSugar { } @Test - def testCleanBaseDirsWithOffsetFileForLoggedStore() { + def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() { val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET") Util.writeDataToFile(offsetFilePath, "100") val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(loggedStore) + .addStore(loggedStore, true) .build val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs", @@ -97,6 +307,23 @@ class TestTaskStorageManager extends MockitoSugar { } @Test + def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() { + val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET") + Util.writeDataToFile(offsetFilePath, "100") + + val taskStorageManager = new TaskStorageManagerBuilder() + .addStore(loggedStore, false) + .build + + val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs", + new Array[java.lang.Class[_]](0):_*) + cleanDirMethod.setAccessible(true) + cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*) + + assertFalse("Offset file was not removed. Clean up failed!", offsetFilePath.exists()) + } + + @Test def testStopCreatesOffsetFileForLoggedStore() { val partition = new Partition(0) @@ -109,7 +336,7 @@ class TestTaskStorageManager extends MockitoSugar { //Build TaskStorageManager val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(loggedStore) + .addStore(loggedStore, true) .setSystemAdmin("kafka", mockSystemAdmin) .setPartition(partition) .build @@ -131,6 +358,9 @@ class TestTaskStorageManager extends MockitoSugar { val partition = new Partition(0) val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET") + val anotherOffsetPath = new File( + TaskStorageManager.getStorePartitionDir( + TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) + File.separator + "OFFSET") val mockSystemAdmin = mock[SystemAdmin] val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101"))))) @@ -139,7 +369,8 @@ class TestTaskStorageManager extends MockitoSugar { //Build TaskStorageManager val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(loggedStore) + .addStore(loggedStore, true) + .addStore(store, false) .setSystemAdmin("kafka", mockSystemAdmin) .setPartition(partition) .build @@ -150,6 +381,8 @@ class TestTaskStorageManager extends MockitoSugar { //Check conditions assertTrue("Offset file doesn't exist!", offsetFilePath.exists()) assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath)) + + assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists()) } /** @@ -167,7 +400,7 @@ class TestTaskStorageManager extends MockitoSugar { //Build TaskStorageManager val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(loggedStore) + .addStore(loggedStore, true) .setSystemAdmin("kafka", mockSystemAdmin) .setPartition(partition) .build @@ -200,7 +433,7 @@ class TestTaskStorageManager extends MockitoSugar { //Build TaskStorageManager val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(loggedStore) + .addStore(loggedStore, true) .setSystemAdmin("kafka", mockSystemAdmin) .setPartition(partition) .build @@ -226,11 +459,6 @@ class TestTaskStorageManager extends MockitoSugar { } @Test - def testFlushOffsetFileExceptionsHandledGracefully(): Unit = { - - } - - @Test def testStopShouldNotCreateOffsetFileForEmptyStore() { val partition = new Partition(0) @@ -243,7 +471,7 @@ class TestTaskStorageManager extends MockitoSugar { //Build TaskStorageManager val taskStorageManager = new TaskStorageManagerBuilder() - .addStore(loggedStore) + .addStore(loggedStore, true) .setSystemAdmin("kafka", mockSystemAdmin) .setPartition(partition) .build @@ -265,7 +493,7 @@ class TaskStorageManagerBuilder extends MockitoSugar { var taskStores: Map[String, StorageEngine] = Map() var storeConsumers: Map[String, SystemConsumer] = Map() var changeLogSystemStreams: Map[String, SystemStream] = Map() - val streamMetadataCache = mock[StreamMetadataCache] + var streamMetadataCache = mock[StreamMetadataCache] var partition: Partition = new Partition(0) var systemAdmins: Map[String, SystemAdmin] = Map("kafka" -> mock[SystemAdmin]) var taskName: TaskName = new TaskName("testTask") @@ -273,8 +501,20 @@ class TaskStorageManagerBuilder extends MockitoSugar { var loggedStoreBaseDir: File = TaskStorageManagerBuilder.defaultLoggedStoreBaseDir var changeLogStreamPartitions: Int = 1 - def addStore(storeName: String): TaskStorageManagerBuilder = { - taskStores = taskStores ++ Map(storeName -> mock[StorageEngine]) + def addStore(storeName: String, storageEngine: StorageEngine, systemConsumer: SystemConsumer): TaskStorageManagerBuilder = { + taskStores = taskStores ++ Map(storeName -> storageEngine) + storeConsumers = storeConsumers ++ Map(storeName -> systemConsumer) + changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", "testStream")) + this + } + + def addStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = { + taskStores = taskStores ++ { + val mockStorageEngine = mock[StorageEngine] + when(mockStorageEngine.getStoreProperties) + .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build()) + Map(storeName -> mockStorageEngine) + } storeConsumers = storeConsumers ++ Map(storeName -> mock[SystemConsumer]) changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", "testStream")) this @@ -300,6 +540,11 @@ class TaskStorageManagerBuilder extends MockitoSugar { this } + def setStreamMetadataCache(metadataCache: StreamMetadataCache) = { + streamMetadataCache = metadataCache + this + } + def build: TaskStorageManager = { new TaskStorageManager( taskName = taskName, @@ -314,4 +559,4 @@ class TaskStorageManagerBuilder extends MockitoSugar { systemAdmins = systemAdmins ) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala index dae6e35..a7b748f 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala @@ -22,13 +22,11 @@ package org.apache.samza.storage.kv import java.io.File import org.apache.samza.container.SamzaContainerContext import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.storage.kv._ import org.apache.samza.system.SystemStreamPartition import org.rocksdb.{FlushOptions, WriteOptions} import org.apache.samza.config.StorageConfig._ -class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V] -{ +class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V] { /** * Return a KeyValueStore instance for the given store name * @param storeName Name of the store @@ -49,7 +47,15 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext) val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true) val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true) - val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbFlushOptions, rocksDbMetrics) + val rocksDb = new RocksDbKeyValueStore( + storeDir, + rocksDbOptions, + storageConfig, + isLoggedStore, + storeName, + rocksDbWriteOptions, + rocksDbFlushOptions, + rocksDbMetrics) rocksDb } } http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 38c8fa0..9b9b1f6 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -23,9 +23,8 @@ import java.io.File import org.apache.samza.SamzaException import org.apache.samza.util.{ LexicographicComparator, Logging } import org.apache.samza.config.Config -import org.apache.samza.container.SamzaContainerContext import org.rocksdb._ -import org.rocksdb.TtlDB; +import org.rocksdb.TtlDB object RocksDbKeyValueStore extends Logging { http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala index 391cf89..c975893 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala @@ -25,7 +25,7 @@ import org.apache.samza.SamzaException import org.apache.samza.container.SamzaContainerContext import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.serializers.Serde -import org.apache.samza.storage.{StorageEngine, StorageEngineFactory} +import org.apache.samza.storage.{StoreProperties, StorageEngine, StorageEngineFactory} import org.apache.samza.system.SystemStreamPartition import org.apache.samza.task.MessageCollector @@ -37,6 +37,8 @@ import org.apache.samza.task.MessageCollector */ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] { + private val INMEMORY_KV_STORAGE_ENGINE_FACTORY = + "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory" /** * Return a KeyValueStore instance for the given store name, * which will be used as the underlying raw store @@ -74,8 +76,17 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] registry: MetricsRegistry, changeLogSystemStreamPartition: SystemStreamPartition, containerContext: SamzaContainerContext): StorageEngine = { - val storageConfig = containerContext.config.subset("stores." + storeName + ".", true) + val storeFactory = storageConfig.get("factory") + var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder() + + if (storeFactory == null) { + throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!") + } + if (!storeFactory.equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) { + storePropertiesBuilder = storePropertiesBuilder.setPersistedToDisk(true) + } + val batchSize = storageConfig.getInt("write.batch.size", 500) val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000)) val enableCache = cacheSize > 0 @@ -99,6 +110,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] rawStore } else { val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry) + storePropertiesBuilder = storePropertiesBuilder.setLoggedStore(true) new LoggedStore(rawStore, changeLogSystemStreamPartition, collector, loggedStoreMetrics) } @@ -120,7 +132,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] // create the storage engine and return // TODO: Decide if we should use raw bytes when restoring val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry) - new KeyValueStorageEngine(nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize) + new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize) } } http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala index e5a66a4..a3ffc42 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala @@ -20,7 +20,7 @@ package org.apache.samza.storage.kv import org.apache.samza.util.Logging -import org.apache.samza.storage.StorageEngine +import org.apache.samza.storage.{StoreProperties, StorageEngine} import org.apache.samza.system.IncomingMessageEnvelope import scala.collection.JavaConversions._ @@ -31,6 +31,7 @@ import scala.collection.JavaConversions._ * This implements both the key/value interface and the storage engine interface. */ class KeyValueStorageEngine[K, V]( + storeProperties: StoreProperties, wrapperStore: KeyValueStore[K, V], rawStore: KeyValueStore[Array[Byte], Array[Byte]], metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics, @@ -135,4 +136,6 @@ class KeyValueStorageEngine[K, V]( flush() wrapperStore.close() } + + override def getStoreProperties: StoreProperties = storeProperties } http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala index 23f8a1a..fd4e762 100644 --- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala +++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala @@ -20,19 +20,16 @@ package org.apache.samza.storage.kv import java.io.File -import java.util.Arrays -import java.util.Random +import java.util.{Arrays, Random} -import org.apache.samza.config.{MapConfig, StorageConfig} +import org.apache.samza.config.MapConfig import org.apache.samza.serializers.Serde import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore -import org.junit.After import org.junit.Assert._ -import org.junit.Before -import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters +import org.junit.{After, Before, Test} import org.scalatest.Assertions.intercept import scala.collection.JavaConversions._ @@ -59,13 +56,14 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) { case "inmemory" => new InMemoryKeyValueStore case "rocksdb" => - new RocksDbKeyValueStore (dir, - new org.rocksdb.Options() - .setCreateIfMissing(true) - .setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION), - new MapConfig(), - false, - "someStore") + new RocksDbKeyValueStore ( + dir, + new org.rocksdb.Options() + .setCreateIfMissing(true) + .setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION), + new MapConfig(), + false, + "someStore") case _ => throw new IllegalArgumentException("Type of store undefined: " + typeOfStore) }
