Repository: samza
Updated Branches:
  refs/heads/master 94e4e396e -> 38e81c0f9


SAMZA-889: Change log not working properly with In memory Store


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/38e81c0f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/38e81c0f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/38e81c0f

Branch: refs/heads/master
Commit: 38e81c0f9b5a1dfeeac1a1c146aaaec9ec0987f2
Parents: 94e4e39
Author: Navina Ramesh <[email protected]>
Authored: Wed Jun 22 15:31:39 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Wed Jun 22 16:08:27 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   4 +
 .../org/apache/samza/storage/StorageEngine.java |   4 +
 .../samza/storage/StorageEngineFactory.java     |  18 +-
 .../apache/samza/storage/StoreProperties.java   |  71 ++
 .../apache/samza/container/SamzaContainer.scala |  41 +-
 .../samza/container/SamzaContainer.scala.orig   | 787 -------------------
 .../samza/storage/TaskStorageManager.scala      |  77 +-
 .../apache/samza/storage/MockStorageEngine.java |   9 +-
 .../samza/storage/MockStorageEngineFactory.java |   3 +-
 .../samza/storage/TestStorageRecovery.java      |   2 +-
 .../samza/storage/TestTaskStorageManager.scala  | 301 ++++++-
 .../RocksDbKeyValueStorageEngineFactory.scala   |  14 +-
 .../samza/storage/kv/RocksDbKeyValueStore.scala |   3 +-
 .../kv/BaseKeyValueStorageEngineFactory.scala   |  18 +-
 .../storage/kv/KeyValueStorageEngine.scala      |   5 +-
 .../samza/storage/kv/TestKeyValueStores.scala   |  24 +-
 16 files changed, 466 insertions(+), 915 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index d0a5c66..325c381 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -202,6 +202,10 @@
 
         <allow class="org.apache.samza.SamzaException" />
         <allow class="org.apache.samza.Partition" />
+
+        <subpackage name="kv">
+            <allow pkg="org.apache.samza.storage" />
+        </subpackage>
     </subpackage>
 
     <subpackage name="logging">

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 5463648..e30a2ab 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -56,4 +56,8 @@ public interface StorageEngine {
    */
   void stop();
 
+  /**
+   * Get store properties
+   */
+  StoreProperties getStoreProperties();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
index adb6264..800deeb 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
@@ -46,13 +46,13 @@ public interface StorageEngineFactory<K, V> {
    * @param containerContext Information about the container in which the task is executing.
    * @return The storage engine instance.
    */
-  public StorageEngine getStorageEngine(
-      String storeName,
-      File storeDir,
-      Serde<K> keySerde,
-      Serde<V> msgSerde,
-      MessageCollector collector,
-      MetricsRegistry registry,
-      SystemStreamPartition changeLogSystemStreamPartition,
-      SamzaContainerContext containerContext);
+  StorageEngine getStorageEngine(
+    String storeName,
+    File storeDir,
+    Serde<K> keySerde,
+    Serde<V> msgSerde,
+    MessageCollector collector,
+    MetricsRegistry registry,
+    SystemStreamPartition changeLogSystemStreamPartition,
+    SamzaContainerContext containerContext);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java b/samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java
new file mode 100644
index 0000000..a398271
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/StoreProperties.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+/**
+ * Immutable class that defines the properties of a Store
+ */
+public class StoreProperties {
+  private final boolean persistedToDisk;
+  private final boolean loggedStore;
+
+  private StoreProperties(
+      final boolean persistedToDisk,
+      final boolean loggedStore) {
+    this.persistedToDisk = persistedToDisk;
+    this.loggedStore = loggedStore;
+  }
+
+  /**
+   * Flag to indicate whether a store can be persisted to disk or not
+   *
+   * @return True, if store can be flushed to disk. False, by default.
+   */
+  public boolean isPersistedToDisk() {
+    return persistedToDisk;
+  }
+
+  /**
+   * Flag to indicate whether a store is associated with a changelog (used for recovery) or not
+   *
+   * @return True, if changelog is enabled. False, by default.
+   */
+  public boolean isLoggedStore() {
+    return loggedStore;
+  }
+
+  public static class StorePropertiesBuilder {
+    private boolean persistedToDisk = false;
+    private boolean loggedStore = false;
+
+    public StorePropertiesBuilder setPersistedToDisk(boolean persistedToDisk) {
+      this.persistedToDisk = persistedToDisk;
+      return this;
+    }
+
+    public StorePropertiesBuilder setLoggedStore(boolean loggedStore) {
+      this.loggedStore = loggedStore;
+      return this;
+    }
+
+    public StoreProperties build() {
+      return new StoreProperties(persistedToDisk, loggedStore);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5cbdb4b..18c0922 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -22,8 +22,11 @@ package org.apache.samza.container
 import java.io.File
 import java.nio.file.Path
 import java.util
+import java.lang.Thread.UncaughtExceptionHandler
+import java.net.{URL, UnknownHostException}
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
+import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
 import org.apache.samza.config.ShellCommandConfig
@@ -32,38 +35,18 @@ import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
-import org.apache.samza.container.disk.WatermarkDiskQuotaPolicy.Entry
-import org.apache.samza.container.disk.{NoThrottlingDiskQuotaPolicyFactory, DiskQuotaPolicyFactory, NoThrottlingDiskQuotaPolicy, WatermarkDiskQuotaPolicy, PollingScanDiskSpaceMonitor, DiskSpaceMonitor}
+import org.apache.samza.container.disk.{NoThrottlingDiskQuotaPolicyFactory, DiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor, DiskSpaceMonitor}
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.metrics.JmxServer
-import org.apache.samza.metrics.JvmMetrics
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.serializers.SerdeFactory
-import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemConsumersMetrics
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.system.SystemProducersMetrics
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.chooser.DefaultChooser
-import org.apache.samza.system.chooser.MessageChooserFactory
-import org.apache.samza.system.chooser.RoundRobinChooserFactory
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskInstanceCollector
+import org.apache.samza.job.model.{ContainerModel, JobModel}
+import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
+import org.apache.samza.serializers.{SerdeFactory, SerdeManager}
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager}
+import org.apache.samza.system.{StreamMetadataCache, SystemConsumers, SystemConsumersMetrics, SystemFactory, SystemProducers, SystemProducersMetrics, SystemStream, SystemStreamPartition}
+import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
+import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
 import org.apache.samza.util.{ThrottlingExecutor, ExponentialSleepStrategy, Logging, Util}
 import scala.collection.JavaConversions._
-import java.net.{UnknownHostException, InetAddress, URL}
-import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.config.JobConfig.Config2Job
-import java.lang.Thread.UncaughtExceptionHandler
 
 object SamzaContainer extends Logging {
   val DEFAULT_READ_JOBMODEL_DELAY_MS = 100

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig
deleted file mode 100644
index 086531e..0000000
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala.orig
+++ /dev/null
@@ -1,787 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.container
-
-import java.io.File
-import java.nio.file.Path
-import java.util
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager, 
OffsetManagerMetrics}
-import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.config.StorageConfig.Config2Storage
-import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
-import org.apache.samza.container.disk.{PollingScanDiskSpaceMonitor, 
DiskSpaceMonitor}
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.metrics.JmxServer
-import org.apache.samza.metrics.JvmMetrics
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.metrics.MetricsReporterFactory
-import org.apache.samza.serializers.SerdeFactory
-import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemConsumersMetrics
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.system.SystemProducersMetrics
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.chooser.DefaultChooser
-import org.apache.samza.system.chooser.MessageChooserFactory
-import org.apache.samza.system.chooser.RoundRobinChooserFactory
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util}
-import scala.collection.JavaConversions._
-import java.net.{UnknownHostException, InetAddress, URL}
-import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.config.JobConfig.Config2Job
-import java.lang.Thread.UncaughtExceptionHandler
-
-object SamzaContainer extends Logging {
-  val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
-
-  def main(args: Array[String]) {
-    safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => 
System.exit(1)))
-  }
-
-  def safeMain(
-    newJmxServer: () => JmxServer,
-    exceptionHandler: UncaughtExceptionHandler = null) {
-    if (exceptionHandler != null) {
-      Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
-    }
-    putMDC("containerName", "samza-container-" + 
System.getenv(ShellCommandConfig.ENV_CONTAINER_ID))
-    // Break out the main method to make the JmxServer injectable so we can
-    // validate that we don't leak JMX non-daemon threads if we have an
-    // exception in the main method.
-    val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
-    logger.info("Got container ID: %s" format containerId)
-    val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
-    logger.info("Got coordinator URL: %s" format coordinatorUrl)
-    val jobModel = readJobModel(coordinatorUrl)
-    val containerModel = jobModel.getContainers()(containerId.toInt)
-    val config = jobModel.getConfig
-    putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can 
not find the job name")))
-    putMDC("jobId", config.getJobId.getOrElse("1"))
-    var jmxServer: JmxServer = null
-
-    try {
-      jmxServer = newJmxServer()
-      SamzaContainer(containerModel, jobModel, jmxServer).run
-    } finally {
-      if (jmxServer != null) {
-        jmxServer.stop
-      }
-    }
-  }
-
-  /**
-   * Fetches config, task:SSP assignments, and task:changelog partition
-   * assignments, and returns objects to be used for SamzaContainer's
-   * constructor.
-   */
-  def readJobModel(url: String, initialDelayMs: Int = 
scala.util.Random.nextInt(DEFAULT_READ_JOBMODEL_DELAY_MS) + 1) = {
-    info("Fetching configuration from: %s" format url)
-    SamzaObjectMapper
-      .getObjectMapper
-      .readValue(
-        Util.read(
-          url = new URL(url),
-          retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 
initialDelayMs)),
-        classOf[JobModel])
-  }
-
-  def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: 
JmxServer) = {
-    val config = jobModel.getConfig
-    val containerId = containerModel.getContainerId
-    val containerName = "samza-container-%s" format containerId
-    val containerPID = Util.getContainerPID
-
-    info("Setting up Samza container: %s" format containerName)
-    info("Samza container PID: %s" format containerPID)
-    info("Using configuration: %s" format config)
-    info("Using container model: %s" format containerModel)
-
-    val registry = new MetricsRegistryMap(containerName)
-    val samzaContainerMetrics = new SamzaContainerMetrics(containerName, 
registry)
-    val systemProducersMetrics = new SystemProducersMetrics(registry)
-    val systemConsumersMetrics = new SystemConsumersMetrics(registry)
-    val offsetManagerMetrics = new OffsetManagerMetrics(registry)
-
-    val inputSystemStreamPartitions = containerModel
-      .getTasks
-      .values
-      .flatMap(_.getSystemStreamPartitions)
-      .toSet
-
-    val inputSystemStreams = inputSystemStreamPartitions
-      .map(_.getSystemStream)
-      .toSet
-
-    val inputSystems = inputSystemStreams
-      .map(_.getSystem)
-      .toSet
-
-    val systemNames = config.getSystemNames
-
-    info("Got system names: %s" format systemNames)
-
-    val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ 
config.getSerdeStreams(_))
-
-    debug("Got serde streams: %s" format serdeStreams)
-
-    val serdeNames = config.getSerdeNames
-
-    info("Got serde names: %s" format serdeNames)
-
-    val systemFactories = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is 
missing from the configuration." format systemName))
-      (systemName, Util.getObj[SystemFactory](systemFactoryClassName))
-    }).toMap
-
-    val systemAdmins = systemNames
-      .map(systemName => (systemName, 
systemFactories(systemName).getAdmin(systemName, config)))
-      .toMap
-
-    info("Got system factories: %s" format systemFactories.keys)
-
-    val streamMetadataCache = new StreamMetadataCache(systemAdmins)
-    val inputStreamMetadata = 
streamMetadataCache.getStreamMetadata(inputSystemStreams)
-
-    info("Got input stream metadata: %s" format inputStreamMetadata)
-
-    val consumers = inputSystems
-      .map(systemName => {
-        val systemFactory = systemFactories(systemName)
-
-        try {
-          (systemName, systemFactory.getConsumer(systemName, config, 
samzaContainerMetrics.registry))
-        } catch {
-          case e: Exception =>
-            error("Failed to create a consumer for %s, so skipping." 
format(systemName), e)
-            (systemName, null)
-        }
-      })
-      .filter(_._2 != null)
-      .toMap
-
-    info("Got system consumers: %s" format consumers.keys)
-
-    val producers = systemFactories
-      .map {
-        case (systemName, systemFactory) =>
-          try {
-            (systemName, systemFactory.getProducer(systemName, config, 
samzaContainerMetrics.registry))
-          } catch {
-            case e: Exception =>
-              error("Failed to create a producer for %s, so skipping." 
format(systemName), e)
-              (systemName, null)
-          }
-      }
-      .filter(_._2 != null)
-      .toMap
-
-    info("Got system producers: %s" format producers.keys)
-
-    val serdes = serdeNames.map(serdeName => {
-      val serdeClassName = config
-        .getSerdeClass(serdeName)
-        .getOrElse(Util.defaultSerdeFactoryFromSerdeName(serdeName))
-
-      val serde = Util.getObj[SerdeFactory[Object]](serdeClassName)
-        .getSerde(serdeName, config)
-
-      (serdeName, serde)
-    }).toMap
-
-    info("Got serdes: %s" format serdes.keys)
-
-    /*
-     * A Helper function to build a Map[String, Serde] (systemName -> Serde) 
for systems defined in the config. This is useful to build both key and message 
serde maps.
-     */
-    val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => {
-      systemNames
-        .filter( sn => {
-          val serde = getSerdeName(sn)
-          serde.isDefined && !serde.get.equals("")
-        }).map(systemName => {
-          val serdeName = getSerdeName(systemName).get
-          val serde = serdes.getOrElse(serdeName, throw new SamzaException("No 
class defined for serde: %s." format serdeName))
-          (systemName, serde)
-        }).toMap
-    }
-
-    /*
-     * A Helper function to build a Map[SystemStream, Serde] for streams 
defined in the config. This is useful to build both key and message serde maps.
-     */
-    val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => 
Option[String]) => {
-      (serdeStreams ++ inputSystemStreamPartitions)
-        .filter(systemStream => getSerdeName(systemStream).isDefined)
-        .map(systemStream => {
-          val serdeName = getSerdeName(systemStream).get
-          val serde = serdes.getOrElse(serdeName, throw new SamzaException("No 
class defined for serde: %s." format serdeName))
-          (systemStream, serde)
-        }).toMap
-    }
-
-    val systemKeySerdes = buildSystemSerdeMap((systemName: String) => 
config.getSystemKeySerde(systemName))
-
-    debug("Got system key serdes: %s" format systemKeySerdes)
-
-    val systemMessageSerdes = buildSystemSerdeMap((systemName: String) => 
config.getSystemMsgSerde(systemName))
-
-    debug("Got system message serdes: %s" format systemMessageSerdes)
-
-    val systemStreamKeySerdes = buildSystemStreamSerdeMap((systemStream: 
SystemStream) => config.getStreamKeySerde(systemStream))
-
-    debug("Got system stream key serdes: %s" format systemStreamKeySerdes)
-
-    val systemStreamMessageSerdes = buildSystemStreamSerdeMap((systemStream: 
SystemStream) => config.getStreamMsgSerde(systemStream))
-
-    debug("Got system stream message serdes: %s" format 
systemStreamMessageSerdes)
-
-    val changeLogSystemStreams = config
-      .getStoreNames
-      .filter(config.getChangelogStream(_).isDefined)
-      .map(name => (name, config.getChangelogStream(name).get)).toMap
-      .mapValues(Util.getSystemStreamFromNames(_))
-
-    info("Got change log system streams: %s" format changeLogSystemStreams)
-
-    val serdeManager = new SerdeManager(
-      serdes = serdes,
-      systemKeySerdes = systemKeySerdes,
-      systemMessageSerdes = systemMessageSerdes,
-      systemStreamKeySerdes = systemStreamKeySerdes,
-      systemStreamMessageSerdes = systemStreamMessageSerdes,
-      changeLogSystemStreams = changeLogSystemStreams.values.toSet)
-
-    info("Setting up JVM metrics.")
-
-    val jvm = new JvmMetrics(samzaContainerMetrics.registry)
-
-    info("Setting up message chooser.")
-
-    val chooserFactoryClassName = 
config.getMessageChooserClass.getOrElse(classOf[RoundRobinChooserFactory].getName)
-
-    val chooserFactory = 
Util.getObj[MessageChooserFactory](chooserFactoryClassName)
-
-    val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, 
samzaContainerMetrics.registry)
-
-    info("Setting up metrics reporters.")
-
-    val reporters = config.getMetricReporterNames.map(reporterName => {
-      val metricsFactoryClassName = config
-        .getMetricsFactoryClass(reporterName)
-        .getOrElse(throw new SamzaException("Metrics reporter %s missing 
.class config" format reporterName))
-
-      val reporter =
-        Util
-          .getObj[MetricsReporterFactory](metricsFactoryClassName)
-          .getMetricsReporter(reporterName, containerName, config)
-      (reporterName, reporter)
-    }).toMap
-
-    info("Got metrics reporters: %s" format reporters.keys)
-
-    val securityManager = config.getSecurityManagerFactory match {
-      case Some(securityManagerFactoryClassName) =>
-        Util
-          .getObj[SecurityManagerFactory](securityManagerFactoryClassName)
-          .getSecurityManager(config)
-      case _ => null
-    }
-    info("Got security manager: %s" format securityManager)
-
-    val coordinatorSystemProducer = new 
CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, 
samzaContainerMetrics.registry)
-    val localityManager = new LocalityManager(coordinatorSystemProducer)
-    val checkpointManager = config.getCheckpointManagerFactory() match {
-      case Some(checkpointFactoryClassName) if 
(!checkpointFactoryClassName.isEmpty) =>
-        Util
-          .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
-          .getCheckpointManager(config, samzaContainerMetrics.registry)
-      case _ => null
-    }
-    info("Got checkpoint manager: %s" format checkpointManager)
-
-    val offsetManager = OffsetManager(inputStreamMetadata, config, 
checkpointManager, systemAdmins, offsetManagerMetrics)
-
-    info("Got offset manager: %s" format offsetManager)
-
-    val dropDeserializationError = config.getDropDeserialization match {
-      case Some(dropError) => dropError.toBoolean
-      case _ => false
-    }
-
-    val dropSerializationError = config.getDropSerialization match {
-      case Some(dropError) => dropError.toBoolean
-      case _ => false
-    }
-
-    val pollIntervalMs = config
-      .getPollIntervalMs
-      .getOrElse(SystemConsumers.DEFAULT_POLL_INTERVAL_MS.toString)
-      .toInt
-
-    val consumerMultiplexer = new SystemConsumers(
-      chooser = chooser,
-      consumers = consumers,
-      serdeManager = serdeManager,
-      metrics = systemConsumersMetrics,
-      dropDeserializationError = dropDeserializationError,
-      pollIntervalMs = pollIntervalMs)
-
-    val producerMultiplexer = new SystemProducers(
-      producers = producers,
-      serdeManager = serdeManager,
-      metrics = systemProducersMetrics,
-      dropSerializationError = dropSerializationError)
-
-    val storageEngineFactories = config
-      .getStoreNames
-      .map(storeName => {
-        val storageFactoryClassName = config
-          .getStorageFactoryClassName(storeName)
-          .getOrElse(throw new SamzaException("Missing storage factory for 
%s." format storeName))
-        (storeName, Util.getObj[StorageEngineFactory[Object, 
Object]](storageFactoryClassName))
-      }).toMap
-
-    info("Got storage engines: %s" format storageEngineFactories.keys)
-
-    val taskClassName = config
-      .getTaskClass
-      .getOrElse(throw new SamzaException("No task class defined in 
configuration."))
-
-    info("Got stream task class: %s" format taskClassName)
-
-    val taskWindowMs = config.getWindowMs.getOrElse(-1L)
-
-    info("Got window milliseconds: %s" format taskWindowMs)
-
-    val taskCommitMs = config.getCommitMs.getOrElse(60000L)
-
-    info("Got commit milliseconds: %s" format taskCommitMs)
-
-    val taskShutdownMs = config.getShutdownMs.getOrElse(5000L)
-
-    info("Got shutdown timeout milliseconds: %s" format taskShutdownMs)
-
-    // Wire up all task-instance-level (unshared) objects.
-
-    val taskNames = containerModel
-      .getTasks
-      .values
-      .map(_.getTaskName)
-      .toSet
-    val containerContext = new SamzaContainerContext(containerId, config, 
taskNames)
-
-    // TODO not sure how we should make this config based, or not. Kind of
-    // strange, since it has some dynamic directories when used with YARN.
-    val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
-    info("Got default storage engine base directory: %s" format 
defaultStoreBaseDir)
-
-    val storeWatchPaths = new util.HashSet[Path]()
-    storeWatchPaths.add(defaultStoreBaseDir.toPath)
-
-    val taskInstances: Map[TaskName, TaskInstance] = 
containerModel.getTasks.values.map(taskModel => {
-      debug("Setting up task instance: %s" format taskModel)
-
-      val taskName = taskModel.getTaskName
-
-      val task = Util.getObj[StreamTask](taskClassName)
-
-      val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format 
taskName)
-
-      val collector = new TaskInstanceCollector(producerMultiplexer, 
taskInstanceMetrics)
-
-      val storeConsumers = changeLogSystemStreams
-        .map {
-          case (storeName, changeLogSystemStream) =>
-            val systemConsumer = systemFactories
-              .getOrElse(changeLogSystemStream.getSystem, throw new 
SamzaException("Changelog system %s for store %s does not exist in the config." 
format (changeLogSystemStream, storeName)))
-              .getConsumer(changeLogSystemStream.getSystem, config, 
taskInstanceMetrics.registry)
-            samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName)
-            (storeName, systemConsumer)
-        }.toMap
-
-      info("Got store consumers: %s" format storeConsumers)
-
-      var loggedStorageBaseDir: File = null
-      if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
-        val jobNameAndId = Util.getJobNameAndId(config)
-        loggedStorageBaseDir = new 
File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + 
File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
-      } else {
-        warn("No override was provided for logged store base directory. This 
disables local state re-use on " +
-          "application restart. If you want to enable this feature, set 
LOGGED_STORE_BASE_DIR as an environment " +
-          "variable in all machines running the Samza container")
-        loggedStorageBaseDir = defaultStoreBaseDir
-      }
-
-      storeWatchPaths.add(loggedStorageBaseDir.toPath)
-
-      info("Got base directory for logged data stores: %s" format 
loggedStorageBaseDir)
-
-      val taskStores = storageEngineFactories
-        .map {
-          case (storeName, storageEngineFactory) =>
-            val changeLogSystemStreamPartition = if 
(changeLogSystemStreams.contains(storeName)) {
-              new SystemStreamPartition(changeLogSystemStreams(storeName), 
taskModel.getChangelogPartition)
-            } else {
-              null
-            }
-            val keySerde = config.getStorageKeySerde(storeName) match {
-              case Some(keySerde) => serdes.getOrElse(keySerde, throw new 
SamzaException("No class defined for serde: %s." format keySerde))
-              case _ => null
-            }
-            val msgSerde = config.getStorageMsgSerde(storeName) match {
-              case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new 
SamzaException("No class defined for serde: %s." format msgSerde))
-              case _ => null
-            }
-            val storeBaseDir = if(changeLogSystemStreamPartition != null) {
-              TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, 
storeName, taskName)
-            }
-            else {
-              TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, 
storeName, taskName)
-            }
-            val storageEngine = storageEngineFactory.getStorageEngine(
-              storeName,
-              storeBaseDir,
-              keySerde,
-              msgSerde,
-              collector,
-              taskInstanceMetrics.registry,
-              changeLogSystemStreamPartition,
-              containerContext)
-            (storeName, storageEngine)
-        }
-
-      info("Got task stores: %s" format taskStores)
-
-      val storageManager = new TaskStorageManager(
-        taskName = taskName,
-        taskStores = taskStores,
-        storeConsumers = storeConsumers,
-        changeLogSystemStreams = changeLogSystemStreams,
-        jobModel.maxChangeLogStreamPartitions,
-        streamMetadataCache = streamMetadataCache,
-        storeBaseDir = defaultStoreBaseDir,
-        loggedStoreBaseDir = loggedStorageBaseDir,
-        partition = taskModel.getChangelogPartition,
-        systemAdmins = systemAdmins)
-
-      val systemStreamPartitions = taskModel
-        .getSystemStreamPartitions
-        .toSet
-
-      info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " 
for " + taskName)
-
-      val taskInstance = new TaskInstance(
-        task = task,
-        taskName = taskName,
-        config = config,
-        metrics = taskInstanceMetrics,
-        systemAdmins = systemAdmins,
-        consumerMultiplexer = consumerMultiplexer,
-        collector = collector,
-        containerContext = containerContext,
-        offsetManager = offsetManager,
-        storageManager = storageManager,
-        reporters = reporters,
-        systemStreamPartitions = systemStreamPartitions,
-        exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, 
config))
-
-      (taskName, taskInstance)
-    }).toMap
-
-    val diskPollMillis = config.getInt("container.disk.poll.interval.ms", 0)
-    var diskSpaceMonitor: DiskSpaceMonitor = null
-    if (diskPollMillis != 0) {
-      val diskUsage = samzaContainerMetrics.createOrGetDiskUsageGauge()
-
-      diskSpaceMonitor = new PollingScanDiskSpaceMonitor(storeWatchPaths, 
diskPollMillis)
-      diskSpaceMonitor.registerListener(new Listener {
-        override def onUpdate(diskUsageSample: Long): Unit =
-          diskUsage.set(diskUsageSample)
-      })
-
-      info("Initialized disk space monitor watch paths to: %s" format 
storeWatchPaths)
-    }
-
-    val runLoop = new RunLoop(
-      taskInstances = taskInstances,
-      consumerMultiplexer = consumerMultiplexer,
-      metrics = samzaContainerMetrics,
-      windowMs = taskWindowMs,
-      commitMs = taskCommitMs,
-      shutdownMs = taskShutdownMs)
-
-    info("Samza container setup complete.")
-
-    new SamzaContainer(
-      containerContext = containerContext,
-      taskInstances = taskInstances,
-      runLoop = runLoop,
-      consumerMultiplexer = consumerMultiplexer,
-      producerMultiplexer = producerMultiplexer,
-      offsetManager = offsetManager,
-      localityManager = localityManager,
-      securityManager = securityManager,
-      metrics = samzaContainerMetrics,
-      reporters = reporters,
-      jvm = jvm,
-      jmxServer = jmxServer,
-      diskSpaceMonitor = diskSpaceMonitor)
-  }
-}
-
-class SamzaContainer(
-  containerContext: SamzaContainerContext,
-  taskInstances: Map[TaskName, TaskInstance],
-  runLoop: RunLoop,
-  consumerMultiplexer: SystemConsumers,
-  producerMultiplexer: SystemProducers,
-  metrics: SamzaContainerMetrics,
-  jmxServer: JmxServer,
-  diskSpaceMonitor: DiskSpaceMonitor = null,
-  offsetManager: OffsetManager = new OffsetManager,
-  localityManager: LocalityManager = null,
-  securityManager: SecurityManager = null,
-  reporters: Map[String, MetricsReporter] = Map(),
-  jvm: JvmMetrics = null) extends Runnable with Logging {
-
-  def run {
-    try {
-      info("Starting container.")
-
-      startMetrics
-      startOffsetManager
-      startLocalityManager
-      startStores
-      startDiskSpaceMonitor
-      startProducers
-      startTask
-      startConsumers
-      startSecurityManger
-
-      info("Entering run loop.")
-      runLoop.run
-    } catch {
-      case e: Exception =>
-        error("Caught exception in process loop.", e)
-        throw e
-    } finally {
-      info("Shutting down.")
-
-      shutdownConsumers
-      shutdownTask
-      shutdownStores
-      shutdownDiskSpaceMonitor
-      shutdownProducers
-      shutdownLocalityManager
-      shutdownOffsetManager
-      shutdownMetrics
-      shutdownSecurityManger
-
-      info("Shutdown complete.")
-    }
-  }
-
-  def startDiskSpaceMonitor: Unit = {
-    if (diskSpaceMonitor != null) {
-      info("Starting disk space monitor")
-      diskSpaceMonitor.start()
-    }
-  }
-
-  def startMetrics {
-    info("Registering task instances with metrics.")
-
-    taskInstances.values.foreach(_.registerMetrics)
-
-    info("Starting JVM metrics.")
-
-    if (jvm != null) {
-      jvm.start
-    }
-
-    info("Starting metrics reporters.")
-
-    reporters.values.foreach(reporter => {
-      reporter.register(metrics.source, metrics.registry)
-      reporter.start
-    })
-  }
-
-  def startOffsetManager {
-    info("Registering task instances with offsets.")
-
-    taskInstances.values.foreach(_.registerOffsets)
-
-    info("Starting offset manager.")
-
-    offsetManager.start
-  }
-
-  def startLocalityManager {
-    if(localityManager != null) {
-      info("Registering localityManager for the container")
-      localityManager.start
-      localityManager.register(String.valueOf(containerContext.id))
-
-      info("Writing container locality and JMX address to Coordinator Stream")
-      try {
-        val hostInet = Util.getLocalHost
-        val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
-        val jmxTunnelingUrl = if (jmxServer != null) 
jmxServer.getTunnelingJmxUrl else ""
-        localityManager.writeContainerToHostMapping(containerContext.id, 
hostInet.getHostName, jmxUrl, jmxTunnelingUrl)
-      } catch {
-        case uhe: UnknownHostException =>
-          warn("Received UnknownHostException when persisting locality info 
for container %d: %s" format (containerContext.id, uhe.getMessage))  //No-op
-        case unknownException: Throwable =>
-          warn("Received an exception when persisting locality info for 
container %d: %s" format (containerContext.id, unknownException.getMessage))
-      }
-    }
-  }
-
-  def startStores {
-    info("Starting task instance stores.")
-    taskInstances.values.foreach(taskInstance => {
-      val startTime = System.currentTimeMillis()
-      taskInstance.startStores
-      // Measuring the time to restore the stores
-      val timeToRestore = System.currentTimeMillis() - startTime
-      val taskGauge = 
metrics.taskStoreRestorationMetrics.getOrElse(taskInstance.taskName, null)
-      if (taskGauge != null) {
-        taskGauge.set(timeToRestore)
-      }
-    })
-  }
-
-  def startTask {
-    info("Initializing stream tasks.")
-
-    taskInstances.values.foreach(_.initTask)
-  }
-
-  def startProducers {
-    info("Registering task instances with producers.")
-
-    taskInstances.values.foreach(_.registerProducers)
-
-    info("Starting producer multiplexer.")
-
-    producerMultiplexer.start
-  }
-
-  def startConsumers {
-    info("Registering task instances with consumers.")
-
-    taskInstances.values.foreach(_.registerConsumers)
-
-    info("Starting consumer multiplexer.")
-
-    consumerMultiplexer.start
-  }
-
-  def startSecurityManger: Unit = {
-    if (securityManager != null) {
-      info("Starting security manager.")
-
-      securityManager.start
-    }
-  }
-
-  def shutdownConsumers {
-    info("Shutting down consumer multiplexer.")
-
-    consumerMultiplexer.stop
-  }
-
-  def shutdownProducers {
-    info("Shutting down producer multiplexer.")
-
-    producerMultiplexer.stop
-  }
-
-  def shutdownTask {
-    info("Shutting down task instance stream tasks.")
-
-    taskInstances.values.foreach(_.shutdownTask)
-  }
-
-  def shutdownStores {
-    info("Shutting down task instance stores.")
-
-    taskInstances.values.foreach(_.shutdownStores)
-  }
-
-  def shutdownLocalityManager {
-    if(localityManager != null) {
-      info("Shutting down locality manager.")
-      localityManager.stop
-    }
-  }
-
-  def shutdownOffsetManager {
-    info("Shutting down offset manager.")
-
-    offsetManager.stop
-  }
-
-
-  def shutdownMetrics {
-    info("Shutting down metrics reporters.")
-
-    reporters.values.foreach(_.stop)
-
-    if (jvm != null) {
-      info("Shutting down JVM metrics.")
-
-      jvm.stop
-    }
-  }
-
-  def shutdownSecurityManger: Unit = {
-    if (securityManager != null) {
-      info("Shutting down security manager.")
-
-      securityManager.stop
-    }
-  }
-
-  def shutdownDiskSpaceMonitor: Unit = {
-    if (diskSpaceMonitor != null) {
-      info("Shutting down disk space monitor.")
-      diskSpaceMonitor.stop()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 2a3535e..0b7bcdd 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -21,13 +21,13 @@ package org.apache.samza.storage
 
 import java.io._
 import java.util
-import scala.collection.{JavaConversions, Map}
-import org.apache.samza.util.Logging
-import org.apache.samza.Partition
-import org.apache.samza.system._
-import org.apache.samza.util.Util
-import org.apache.samza.SamzaException
+
+import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.container.TaskName
+import org.apache.samza.system._
+import org.apache.samza.util.{Logging, Util}
+
+import scala.collection.{JavaConversions, Map}
 
 object TaskStorageManager {
   def getStoreDir(storeBaseDir: File, storeName: String) = {
@@ -55,7 +55,13 @@ class TaskStorageManager(
   partition: Partition,
   systemAdmins: Map[String, SystemAdmin]) extends Logging {
 
-  var taskStoresToRestore = taskStores
+  var taskStoresToRestore = taskStores.filter{
+    case (storeName, storageEngine) => 
storageEngine.getStoreProperties.isLoggedStore
+  }
+  val persistedStores = taskStores.filter{
+    case (storeName, storageEngine) => 
storageEngine.getStoreProperties.isPersistedToDisk
+  }
+
   var changeLogOldestOffsets: Map[SystemStream, String] = Map()
   val fileOffset: util.Map[SystemStreamPartition, String] = new 
util.HashMap[SystemStreamPartition, String]()
   val offsetFileName = "OFFSET"
@@ -63,29 +69,15 @@ class TaskStorageManager(
   def apply(storageEngineName: String) = taskStores(storageEngineName)
 
   def init {
-    cleanBaseDirs
-    setupBaseDirs
-    validateChangelogStreams
-    startConsumers
-    restoreStores
-    stopConsumers
+    cleanBaseDirs()
+    setupBaseDirs()
+    validateChangelogStreams()
+    startConsumers()
+    restoreStores()
+    stopConsumers()
   }
 
-  private def setupBaseDirs {
-    debug("Setting up base directories for stores.")
-
-    val loggedStores = changeLogSystemStreams.keySet
-
-    (taskStores.keySet -- loggedStores)
-      .foreach(TaskStorageManager.getStorePartitionDir(storeBaseDir, _, 
taskName).mkdirs)
-
-    loggedStores.foreach(storeName => {
-      val loggedStoragePartitionDir = 
TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
-      if(!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs
-    })
-  }
-
-  private def cleanBaseDirs {
+  private def cleanBaseDirs() {
     debug("Cleaning base directories for stores.")
 
     taskStores.keys.foreach(storeName => {
@@ -99,14 +91,27 @@ class TaskStorageManager(
 
       val loggedStoragePartitionDir = 
TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
       info("Got logged storage partition directory as %s" format 
loggedStoragePartitionDir.toPath.toString)
-
       // If we find valid offsets s.t. we can restore the state, keep the disk 
files. Otherwise, delete them.
-      if(!readOffsetFile(storeName, loggedStoragePartitionDir) && 
loggedStoragePartitionDir.exists()) {
-          Util.rm(loggedStoragePartitionDir)
+      if (!persistedStores.contains(storeName) ||
+        (loggedStoragePartitionDir.exists() && !readOffsetFile(storeName, 
loggedStoragePartitionDir))) {
+        Util.rm(loggedStoragePartitionDir)
       }
     })
   }
 
+  private def setupBaseDirs() {
+    debug("Setting up base directories for stores.")
+    taskStores.foreach {
+      case (storeName, storageEngine) =>
+        if (storageEngine.getStoreProperties.isLoggedStore) {
+          val loggedStoragePartitionDir = 
TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+          if (!loggedStoragePartitionDir.exists()) 
loggedStoragePartitionDir.mkdirs()
+        } else {
+          TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, 
taskName).mkdirs()
+        }
+    }
+  }
+
   /**
     * Attempts to read the offset file and returns {@code true} if the offsets 
were read successfully.
     *
@@ -130,7 +135,7 @@ class TaskStorageManager(
     offsetsRead
   }
 
-  private def validateChangelogStreams = {
+  private def validateChangelogStreams() = {
     info("Validating change log streams")
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
@@ -147,7 +152,7 @@ class TaskStorageManager(
     info("Assigning oldest change log offsets for taskName %s: %s" format 
(taskName, changeLogOldestOffsets))
   }
 
-  private def startConsumers {
+  private def startConsumers() {
     debug("Starting consumers for stores.")
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
@@ -170,7 +175,7 @@ class TaskStorageManager(
     storeConsumers.values.foreach(_.start)
   }
 
-  private def restoreStores {
+  private def restoreStores() {
     debug("Restoring stores.")
 
     for ((storeName, store) <- taskStoresToRestore) {
@@ -184,7 +189,7 @@ class TaskStorageManager(
     }
   }
 
-  private def stopConsumers {
+  private def stopConsumers() {
     debug("Stopping consumers for stores.")
 
     storeConsumers.values.foreach(_.stop)
@@ -219,7 +224,7 @@ class TaskStorageManager(
   private def flushChangelogOffsetFiles() {
     debug("Persisting logged key value stores")
 
-    for ((storeName, systemStream) <- changeLogSystemStreams) {
+    for ((storeName, systemStream) <- 
changeLogSystemStreams.filterKeys(storeName => 
persistedStores.contains(storeName))) {
       val systemAdmin = systemAdmins
               .getOrElse(systemStream.getSystem,
                          throw new SamzaException("Unable to get systemAdmin 
for store " + storeName + " and systemStream" + systemStream))

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java 
b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
index b90ea87..4f71a54 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -36,11 +36,13 @@ public class MockStorageEngine implements StorageEngine {
   public static File storeDir;
   public static SystemStreamPartition ssp;
   public static ArrayList<IncomingMessageEnvelope> incomingMessageEnvelopes = 
new ArrayList<IncomingMessageEnvelope>();
+  public static StoreProperties storeProperties;
 
-  public MockStorageEngine(String storeName, File storeDir, 
SystemStreamPartition changeLogSystemStreamPartition) {
+  public MockStorageEngine(String storeName, File storeDir, 
SystemStreamPartition changeLogSystemStreamPartition, StoreProperties 
properties) {
     MockStorageEngine.storeName = storeName;
     MockStorageEngine.storeDir = storeDir;
     MockStorageEngine.ssp = changeLogSystemStreamPartition;
+    MockStorageEngine.storeProperties = properties;
   }
 
   @Override
@@ -57,4 +59,9 @@ public class MockStorageEngine implements StorageEngine {
   @Override
   public void stop() {
   }
+
+  @Override
+  public StoreProperties getStoreProperties() {
+    return storeProperties;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
 
b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
index c00c454..d483ae6 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
@@ -32,6 +32,7 @@ public class MockStorageEngineFactory implements 
StorageEngineFactory<Object, Ob
   public StorageEngine getStorageEngine(String storeName, File storeDir, 
Serde<Object> keySerde, Serde<Object> msgSerde,
       MessageCollector collector, MetricsRegistry registry, 
SystemStreamPartition changeLogSystemStreamPartition,
       SamzaContainerContext containerContext) {
-    return new MockStorageEngine(storeName, storeDir, 
changeLogSystemStreamPartition);
+    StoreProperties storeProperties = new 
StoreProperties.StorePropertiesBuilder().setLoggedStore(true).build();
+    return new MockStorageEngine(storeName, storeDir, 
changeLogSystemStreamPartition, storeProperties);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
index 13f4fa9..21d0150 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -62,7 +62,7 @@ public class TestStorageRecovery {
 
     Set<String> set1 = new HashSet<String>(Arrays.asList(SYSTEM_STREAM_NAME));
     Set<String> set2 = new HashSet<String>(Arrays.asList(INPUT_STREAM));
-    HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<String, 
SystemStreamMetadata>();
+    HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<>();
     ssmMap.put(SYSTEM_STREAM_NAME, systemStreamMetadata);
     ssmMap.put(INPUT_STREAM, inputSystemStreamMetadata);
     when(systemAdmin.getSystemStreamMetadata(set1)).thenReturn(ssmMap);

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index e126481..4d40f52 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -21,18 +21,23 @@ package org.apache.samza.storage
 
 
 import java.io.File
-import org.junit.{After, Before, Test}
-import org.junit.Assert._
-import org.scalatest.mock.MockitoSugar
-import org.mockito.Mockito._
-import org.mockito.Matchers._
-import scala.collection.JavaConversions
+import java.util
 
+import org.apache.samza.Partition
 import org.apache.samza.container.TaskName
-import org.apache.samza.util.Util
-import org.apache.samza.system._
+import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.Partition
+import org.apache.samza.system._
+import org.apache.samza.util.Util
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.mock.MockitoSugar
+
+import scala.collection.JavaConversions
 
 class TestTaskStorageManager extends MockitoSugar {
 
@@ -54,16 +59,221 @@ class TestTaskStorageManager extends MockitoSugar {
     Util.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
   }
 
+  /**
+   * This tests the entire TaskStorageManager lifecycle for a persisted, logged 
store.
+   * For example, a RocksDb store with a changelog must keep updating 
the offset file on flush and stop.
+   * When the task is restarted, it should restore from the offset recorded 
in the OFFSET file on disk (if available).
+   */
+  @Test
+  def testStoreLifecycleForLoggedPersistedStore(): Unit = {
+    // Basic test setup of SystemStream, SystemStreamPartition for this task
+    val ss = new SystemStream("kafka", "testStream")
+    val partition = new Partition(0)
+    val ssp = new SystemStreamPartition(ss, partition)
+    val storeDirectory = 
TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName)
+    val storeFile = new File(storeDirectory, "store.sst")
+    val offsetFile = new File(storeDirectory, "OFFSET")
+
+    // getStoreProperties always reports a logged store persisted to disk
+    val mockStorageEngine = mock[StorageEngine]
+    when(mockStorageEngine.getStoreProperties).thenAnswer(new 
Answer[StoreProperties] {
+      override def answer(invocation: InvocationOnMock): StoreProperties = {
+        new 
StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(true).build()
+      }
+    })
+    // Restore simply creates the file
+    when(mockStorageEngine.restore(any())).thenAnswer(new Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        storeFile.createNewFile()
+      }
+    })
+
+    // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
+    val mockStreamMetadataCache = mock[StreamMetadataCache]
+    val mockSystemConsumer = mock[SystemConsumer]
+    val mockSystemAdmin = mock[SystemAdmin]
+    doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1)
+    var registerOffset = "0"
+    when(mockSystemConsumer.register(any(), any())).thenAnswer(new 
Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        val args = invocation.getArguments
+        if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) {
+          val offset = args.apply(1).asInstanceOf[String]
+          assertNotNull(offset)
+          assertEquals(registerOffset, offset)
+        }
+      }
+    })
+    doNothing().when(mockSystemConsumer).stop()
+
+    // Test 1: Initial invocation - No store on disk (only changelog has data)
+    // Setup initial sspMetadata
+    val sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51")
+    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(partition, sspMetadata)
+      }
+    })
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(ss -> metadata))
+    when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new 
util.HashMap[String, SystemStreamMetadata](){
+      {
+        put("testStream", metadata)
+      }
+    })
+    val taskManager = new TaskStorageManagerBuilder()
+      .addStore(loggedStore, mockStorageEngine, mockSystemConsumer)
+      .setStreamMetadataCache(mockStreamMetadataCache)
+      .setSystemAdmin("kafka", mockSystemAdmin)
+      .build
+
+
+    taskManager.init
+
+    assertTrue(storeFile.exists())
+    assertFalse(offsetFile.exists())
+
+    // Test 2: flush should update the offset file
+    taskManager.flush()
+    assertTrue(offsetFile.exists())
+    assertEquals("50", Util.readDataFromFile(offsetFile))
+
+    // Test 3: Update sspMetadata before shutdown and verify that offset file 
is updated correctly
+    metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(partition, new SystemStreamPartitionMetadata("0", "100", "101"))
+      }
+    })
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(ss -> metadata))
+    when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new 
util.HashMap[String, SystemStreamMetadata](){
+      {
+        put("testStream", metadata)
+      }
+    })
+    taskManager.stop()
+    assertTrue(storeFile.exists())
+    assertTrue(offsetFile.exists())
+    assertEquals("100", Util.readDataFromFile(offsetFile))
+
+
+    // Test 4: Initialize again with an updated sspMetadata; Verify that it 
restores from the correct offset
+    metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(partition, new SystemStreamPartitionMetadata("0", "150", "151"))
+      }
+    })
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(ss -> metadata))
+    when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new 
util.HashMap[String, SystemStreamMetadata](){
+      {
+        put("testStream", metadata)
+      }
+    })
+    registerOffset = "100"
+
+    taskManager.init
+
+    assertTrue(storeFile.exists())
+    assertTrue(offsetFile.exists())
+  }
+
+  /**
+   * This tests the entire TaskStorageManager lifecycle for an in-memory, logged 
store.
+   * For example, an in-memory KV store with a changelog should not write the 
offset file on flush or stop.
+   * When the task is restarted, it should ALWAYS restore from the 
earliest changelog offset.
+   */
+  @Test
+  def testStoreLifecycleForLoggedInMemoryStore(): Unit = {
+    // Basic test setup of SystemStream, SystemStreamPartition for this task
+    val ss = new SystemStream("kafka", "testStream")
+    val partition = new Partition(0)
+    val ssp = new SystemStreamPartition(ss, partition)
+    val storeDirectory = 
TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
 store, taskName)
+
+    // getStoreProperties always reports a logged store not persisted to disk
+    val mockStorageEngine = mock[StorageEngine]
+    when(mockStorageEngine.getStoreProperties).thenAnswer(new 
Answer[StoreProperties] {
+      override def answer(invocation: InvocationOnMock): StoreProperties = {
+        new 
StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(false).build()
+      }
+    })
+    // Restore is stubbed as a no-op here, so it creates no files on disk
+    doNothing().when(mockStorageEngine).restore(any())
+
+    // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
+    val mockStreamMetadataCache = mock[StreamMetadataCache]
+    val mockSystemAdmin = mock[SystemAdmin]
+    doNothing().when(mockSystemAdmin).validateChangelogStream("testStream", 1)
+
+    val mockSystemConsumer = mock[SystemConsumer]
+    when(mockSystemConsumer.register(any(), any())).thenAnswer(new 
Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        val args = invocation.getArguments
+        if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) {
+          val offset = args.apply(1).asInstanceOf[String]
+          assertNotNull(offset)
+          assertEquals("0", offset) // Should always restore from earliest 
offset
+        }
+      }
+    })
+    doNothing().when(mockSystemConsumer).stop()
+
+    // Test 1: Initial invocation - No store data (only changelog has data)
+    // Setup initial sspMetadata
+    val sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51")
+    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(partition, sspMetadata)
+      }
+    })
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(ss -> metadata))
+    val taskManager = new TaskStorageManagerBuilder()
+      .addStore(store, mockStorageEngine, mockSystemConsumer)
+      .setStreamMetadataCache(mockStreamMetadataCache)
+      .setSystemAdmin("kafka", mockSystemAdmin)
+      .build
+
+
+    taskManager.init
+
+    // Verify that the store directory doesn't have ANY files
+    assertNull(storeDirectory.listFiles())
+
+    // Test 2: flush should NOT create/update the offset file. Store directory 
has no files
+    taskManager.flush()
+    assertNull(storeDirectory.listFiles())
+
+    // Test 3: Update sspMetadata before shutdown and verify that offset file 
is NOT created
+    metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(partition, new SystemStreamPartitionMetadata("0", "100", "101"))
+      }
+    })
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(ss -> metadata))
+    taskManager.stop()
+    assertNull(storeDirectory.listFiles())
+
+    // Test 4: Initialize again with an updated sspMetadata; Verify that it 
restores from the earliest offset
+    metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(partition, new SystemStreamPartitionMetadata("0", "150", "151"))
+      }
+    })
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(ss -> metadata))
+
+    taskManager.init
+
+    assertNull(storeDirectory.listFiles())
+  }
+
   @Test
-  def testCleanBaseDirs() {
+  def testStoreDirsWithoutOffsetFileAreDeletedInCleanBaseDirs() {
     val checkFilePath1 = new 
File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
 store, taskName), "check")
     checkFilePath1.createNewFile()
     val checkFilePath2 = new 
File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName), "check")
     checkFilePath2.createNewFile()
 
     val taskStorageManager = new TaskStorageManagerBuilder()
-      .addStore(store)
-      .addStore(loggedStore)
+      .addStore(store, false)
+      .addStore(loggedStore, true)
       .build
 
     //Invoke test method
@@ -79,12 +289,12 @@ class TestTaskStorageManager extends MockitoSugar {
   }
 
   @Test
-  def testCleanBaseDirsWithOffsetFileForLoggedStore() {
+  def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
     val offsetFilePath = new 
File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName), "OFFSET")
     Util.writeDataToFile(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
-      .addStore(loggedStore)
+      .addStore(loggedStore, true)
       .build
 
     val cleanDirMethod = 
taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs",
@@ -97,6 +307,23 @@ class TestTaskStorageManager extends MockitoSugar {
   }
 
   @Test
+  def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
+    val offsetFilePath = new 
File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName), "OFFSET")
+    Util.writeDataToFile(offsetFilePath, "100")
+
+    val taskStorageManager = new TaskStorageManagerBuilder()
+      .addStore(loggedStore, false)
+      .build
+
+    val cleanDirMethod = 
taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs",
+      new Array[java.lang.Class[_]](0):_*)
+    cleanDirMethod.setAccessible(true)
+    cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*)
+
+    assertFalse("Offset file was not removed. Clean up failed!", 
offsetFilePath.exists())
+  }
+
+  @Test
   def testStopCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
 
@@ -109,7 +336,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
-      .addStore(loggedStore)
+      .addStore(loggedStore, true)
       .setSystemAdmin("kafka", mockSystemAdmin)
       .setPartition(partition)
       .build
@@ -131,6 +358,9 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
 
     val offsetFilePath = new 
File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName) + File.separator + "OFFSET")
+    val anotherOffsetPath = new File(
+      TaskStorageManager.getStorePartitionDir(
+        TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) 
+ File.separator + "OFFSET")
 
     val mockSystemAdmin = mock[SystemAdmin]
     val mockSspMetadata = Map("testStream" -> new 
SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, 
SystemStreamPartitionMetadata](Map(partition -> new 
SystemStreamPartitionMetadata("20", "100", "101")))))
@@ -139,7 +369,8 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
-            .addStore(loggedStore)
+            .addStore(loggedStore, true)
+            .addStore(store, false)
             .setSystemAdmin("kafka", mockSystemAdmin)
             .setPartition(partition)
             .build
@@ -150,6 +381,8 @@ class TestTaskStorageManager extends MockitoSugar {
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
     assertEquals("Found incorrect value in offset file!", "100", 
Util.readDataFromFile(offsetFilePath))
+
+    assertTrue("Offset file got created for a store that is not persisted to 
the disk!!", !anotherOffsetPath.exists())
   }
 
   /**
@@ -167,7 +400,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
-            .addStore(loggedStore)
+            .addStore(loggedStore, true)
             .setSystemAdmin("kafka", mockSystemAdmin)
             .setPartition(partition)
             .build
@@ -200,7 +433,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
-            .addStore(loggedStore)
+            .addStore(loggedStore, true)
             .setSystemAdmin("kafka", mockSystemAdmin)
             .setPartition(partition)
             .build
@@ -226,11 +459,6 @@ class TestTaskStorageManager extends MockitoSugar {
   }
 
   @Test
-  def testFlushOffsetFileExceptionsHandledGracefully(): Unit = {
-
-  }
-
-  @Test
   def testStopShouldNotCreateOffsetFileForEmptyStore() {
     val partition = new Partition(0)
 
@@ -243,7 +471,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
-      .addStore(loggedStore)
+      .addStore(loggedStore, true)
       .setSystemAdmin("kafka", mockSystemAdmin)
       .setPartition(partition)
       .build
@@ -265,7 +493,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
   var taskStores: Map[String, StorageEngine] = Map()
   var storeConsumers: Map[String, SystemConsumer] = Map()
   var changeLogSystemStreams: Map[String, SystemStream] = Map()
-  val streamMetadataCache = mock[StreamMetadataCache]
+  var streamMetadataCache = mock[StreamMetadataCache]
   var partition: Partition = new Partition(0)
   var systemAdmins: Map[String, SystemAdmin] = Map("kafka" -> 
mock[SystemAdmin])
   var taskName: TaskName = new TaskName("testTask")
@@ -273,8 +501,20 @@ class TaskStorageManagerBuilder extends MockitoSugar {
   var loggedStoreBaseDir: File =  
TaskStorageManagerBuilder.defaultLoggedStoreBaseDir
   var changeLogStreamPartitions: Int = 1
 
-  def addStore(storeName: String): TaskStorageManagerBuilder =  {
-    taskStores = taskStores ++ Map(storeName -> mock[StorageEngine])
+  def addStore(storeName: String, storageEngine: StorageEngine, 
systemConsumer: SystemConsumer): TaskStorageManagerBuilder = {
+    taskStores = taskStores ++ Map(storeName -> storageEngine)
+    storeConsumers = storeConsumers ++ Map(storeName -> systemConsumer)
+    changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new 
SystemStream("kafka", "testStream"))
+    this
+  }
+
+  def addStore(storeName: String, isPersistedToDisk: Boolean): 
TaskStorageManagerBuilder =  {
+    taskStores = taskStores ++ {
+      val mockStorageEngine = mock[StorageEngine]
+      when(mockStorageEngine.getStoreProperties)
+        .thenReturn(new 
StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build())
+      Map(storeName -> mockStorageEngine)
+    }
     storeConsumers = storeConsumers ++ Map(storeName -> mock[SystemConsumer])
     changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new 
SystemStream("kafka", "testStream"))
     this
@@ -300,6 +540,11 @@ class TaskStorageManagerBuilder extends MockitoSugar {
     this
   }
 
+  def setStreamMetadataCache(metadataCache: StreamMetadataCache) = {
+    streamMetadataCache = metadataCache
+    this
+  }
+
   def build: TaskStorageManager = {
     new TaskStorageManager(
       taskName = taskName,
@@ -314,4 +559,4 @@ class TaskStorageManagerBuilder extends MockitoSugar {
       systemAdmins = systemAdmins
     )
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
 
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index dae6e35..a7b748f 100644
--- 
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ 
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -22,13 +22,11 @@ package org.apache.samza.storage.kv
 import java.io.File
 import org.apache.samza.container.SamzaContainerContext
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.storage.kv._
 import org.apache.samza.system.SystemStreamPartition
 import org.rocksdb.{FlushOptions, WriteOptions}
 import org.apache.samza.config.StorageConfig._
 
-class RocksDbKeyValueStorageEngineFactory [K, V] extends 
BaseKeyValueStorageEngineFactory[K, V]
-{
+class RocksDbKeyValueStorageEngineFactory [K, V] extends 
BaseKeyValueStorageEngineFactory[K, V] {
   /**
    * Return a KeyValueStore instance for the given store name
    * @param storeName Name of the store
@@ -49,7 +47,15 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends 
BaseKeyValueStorageEngi
     val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, 
containerContext)
     val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
     val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true)
-    val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, 
storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, 
rocksDbFlushOptions, rocksDbMetrics)
+    val rocksDb = new RocksDbKeyValueStore(
+      storeDir,
+      rocksDbOptions,
+      storageConfig,
+      isLoggedStore,
+      storeName,
+      rocksDbWriteOptions,
+      rocksDbFlushOptions,
+      rocksDbMetrics)
     rocksDb
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
 
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 38c8fa0..9b9b1f6 100644
--- 
a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ 
b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -23,9 +23,8 @@ import java.io.File
 import org.apache.samza.SamzaException
 import org.apache.samza.util.{ LexicographicComparator, Logging }
 import org.apache.samza.config.Config
-import org.apache.samza.container.SamzaContainerContext
 import org.rocksdb._
-import org.rocksdb.TtlDB;
+import org.rocksdb.TtlDB
 
 object RocksDbKeyValueStore extends Logging {
 

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index 391cf89..c975893 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -25,7 +25,7 @@ import org.apache.samza.SamzaException
 import org.apache.samza.container.SamzaContainerContext
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.Serde
-import org.apache.samza.storage.{StorageEngine, StorageEngineFactory}
+import org.apache.samza.storage.{StoreProperties, StorageEngine, 
StorageEngineFactory}
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.MessageCollector
 
@@ -37,6 +37,8 @@ import org.apache.samza.task.MessageCollector
  */
 trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, 
V] {
 
+  private val INMEMORY_KV_STORAGE_ENGINE_FACTORY =
+    "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory"
   /**
    * Return a KeyValueStore instance for the given store name,
    * which will be used as the underlying raw store
@@ -74,8 +76,17 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends 
StorageEngineFactory[K, V]
                         registry: MetricsRegistry,
                         changeLogSystemStreamPartition: SystemStreamPartition,
                         containerContext: SamzaContainerContext): 
StorageEngine = {
-
     val storageConfig = containerContext.config.subset("stores." + storeName + 
".", true)
+    val storeFactory = storageConfig.get("factory")
+    var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder()
+
+    if (storeFactory == null) {
+      throw new SamzaException("Store factory not defined. Cannot proceed with 
KV store creation!")
+    }
+    if (!storeFactory.equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
+      storePropertiesBuilder = storePropertiesBuilder.setPersistedToDisk(true)
+    }
+
     val batchSize = storageConfig.getInt("write.batch.size", 500)
     val cacheSize = storageConfig.getInt("object.cache.size", 
math.max(batchSize, 1000))
     val enableCache = cacheSize > 0
@@ -99,6 +110,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends 
StorageEngineFactory[K, V]
       rawStore
     } else {
       val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
+      storePropertiesBuilder = storePropertiesBuilder.setLoggedStore(true)
       new LoggedStore(rawStore, changeLogSystemStreamPartition, collector, 
loggedStoreMetrics)
     }
 
@@ -120,7 +132,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends 
StorageEngineFactory[K, V]
     // create the storage engine and return
     // TODO: Decide if we should use raw bytes when restoring
     val keyValueStorageEngineMetrics = new 
KeyValueStorageEngineMetrics(storeName, registry)
-    new KeyValueStorageEngine(nullSafeStore, rawStore, 
keyValueStorageEngineMetrics, batchSize)
+    new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, 
rawStore, keyValueStorageEngineMetrics, batchSize)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index e5a66a4..a3ffc42 100644
--- 
a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ 
b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Logging
-import org.apache.samza.storage.StorageEngine
+import org.apache.samza.storage.{StoreProperties, StorageEngine}
 import org.apache.samza.system.IncomingMessageEnvelope
 
 import scala.collection.JavaConversions._
@@ -31,6 +31,7 @@ import scala.collection.JavaConversions._
  * This implements both the key/value interface and the storage engine 
interface.
  */
 class KeyValueStorageEngine[K, V](
+  storeProperties: StoreProperties,
   wrapperStore: KeyValueStore[K, V],
   rawStore: KeyValueStore[Array[Byte], Array[Byte]],
   metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
@@ -135,4 +136,6 @@ class KeyValueStorageEngine[K, V](
     flush()
     wrapperStore.close()
   }
+
+  override def getStoreProperties: StoreProperties = storeProperties
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/38e81c0f/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
 
b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 23f8a1a..fd4e762 100644
--- 
a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ 
b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -20,19 +20,16 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
-import java.util.Arrays
-import java.util.Random
+import java.util.{Arrays, Random}
 
-import org.apache.samza.config.{MapConfig, StorageConfig}
+import org.apache.samza.config.MapConfig
 import org.apache.samza.serializers.Serde
 import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore
-import org.junit.After
 import org.junit.Assert._
-import org.junit.Before
-import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
+import org.junit.{After, Before, Test}
 import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConversions._
@@ -59,13 +56,14 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: 
String) {
       case "inmemory" =>
         new InMemoryKeyValueStore
       case "rocksdb" =>
-        new RocksDbKeyValueStore (dir,
-                                  new org.rocksdb.Options()
-                                  .setCreateIfMissing(true)
-                                  
.setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION),
-                                  new MapConfig(),
-                                  false,
-                                  "someStore")
+        new RocksDbKeyValueStore (
+          dir,
+          new org.rocksdb.Options()
+            .setCreateIfMissing(true)
+            
.setCompressionType(org.rocksdb.CompressionType.SNAPPY_COMPRESSION),
+          new MapConfig(),
+          false,
+          "someStore")
       case _ =>
         throw new IllegalArgumentException("Type of store undefined: " + 
typeOfStore)
     }

Reply via email to