This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 320359f  SEP-19 : Refactoring sideInputs from SamzaContainer to 
ContainerStorageManager (#912)
320359f is described below

commit 320359fe4aad6210f2eeb91d30e3a5117929ac4a
Author: rmatharu <[email protected]>
AuthorDate: Fri Mar 1 10:13:13 2019 -0800

    SEP-19 : Refactoring sideInputs from SamzaContainer to 
ContainerStorageManager (#912)
    
    * Rocksdb bug fix
    
    * Moving TaskSideInputStorageManagers in CSM
    
    * Adding basic side input refactor. Consumption logic works from inside CSM.
    
    * Cleaning up sideInput references from TaskInstances and SamzaContainer
    
    * minor
    
    * Adding periodic flush logic to CSM for sideInputs
    
    * Removing sideInputs from taskInputs
    
    * fixing test
    
    * test fix
    
    * test fix
    
    * Code simplifications
    
    * checkstyle
    
    * Using sideInput storage manager for standby tasks
    
    * Minor store dir integ
    
    * Adding comment
    
    * Adding merge function in case of duplicate keys
    
    * fixing tests
    
    * Addressing cosmetic review comments
    
    * Added bound-flush and exception handling for sideInputs
    
    * Adding documentation
    
    * TestTaskStorageManager : updating test
    
    * Adding taskMode to decide StorePartitionDir
    
    * Adding taskMode to decide StorePartitionDir
    
    minor
    
    * minor
    
    * minor refactor of CSM
    
    * minor
    
    * Addressing review comments
    
    * minor change in CSM
---
 .../apache/samza/storage/StorageManagerUtil.java   |  11 +-
 .../org/apache/samza/storage/StorageRecovery.java  |   4 +-
 .../samza/storage/TaskSideInputStorageManager.java |  23 +-
 .../apache/samza/container/SamzaContainer.scala    | 126 ++---
 .../org/apache/samza/container/TaskInstance.scala  |  73 +--
 .../samza/storage/ContainerStorageManager.java     | 542 ++++++++++++++++++---
 .../apache/samza/storage/TaskStorageManager.scala  |   5 +-
 .../storage/TestTaskSideInputStorageManager.java   |   3 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java    |   2 -
 .../samza/storage/TestContainerStorageManager.java |   4 +-
 .../samza/storage/TestTaskStorageManager.scala     | 136 +++---
 .../apache/samza/monitor/LocalStoreMonitor.java    |   6 +-
 12 files changed, 628 insertions(+), 307 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 79e67e1..42f7f4b 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import java.util.stream.Collectors;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamPartition;
@@ -139,9 +140,9 @@ public class StorageManagerUtil {
    * @param offsets The SSP-offset to write
    * @throws IOException because of deserializing to json
    */
-  public static void writeOffsetFile(File storeBaseDir, String storeName, 
TaskName taskName,
+  public static void writeOffsetFile(File storeBaseDir, String storeName, 
TaskName taskName, TaskMode taskMode,
       Map<SystemStreamPartition, String> offsets) throws IOException {
-    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName), OFFSET_FILE_NAME);
+    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName, taskMode), OFFSET_FILE_NAME);
     String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
     FileUtil.writeWithChecksum(offsetFile, fileContents);
   }
@@ -153,7 +154,7 @@ public class StorageManagerUtil {
    * @param taskName the task name which is referencing the store
    */
   public static void deleteOffsetFile(File storeBaseDir, String storeName, 
TaskName taskName) {
-    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName), OFFSET_FILE_NAME);
+    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, 
taskName, TaskMode.Active), OFFSET_FILE_NAME);
     if (offsetFile.exists()) {
       FileUtil.rm(offsetFile);
     }
@@ -207,9 +208,11 @@ public class StorageManagerUtil {
    * @param storeBaseDir the base directory to use
    * @param storeName the store name to use
    * @param taskName the task name which is referencing the store
+   * @param taskMode the mode of the given task
    * @return the partition directory for the store
    */
-  public static File getStorePartitionDir(File storeBaseDir, String storeName, 
TaskName taskName) {
+  public static File getStorePartitionDir(File storeBaseDir, String storeName, 
TaskName taskName, TaskMode taskMode) {
+    // TODO: use taskMode to decide the storePartitionDir -- standby's dir 
should be the same as active
     return new File(storeBaseDir, (storeName + File.separator + 
taskName.toString()).replace(' ', '_'));
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index cf8338a..4d01159 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -214,10 +214,10 @@ public class StorageRecovery extends CommandLine {
 
       ContainerStorageManager containerStorageManager =
           new ContainerStorageManager(containerModel, streamMetadataCache, 
systemAdmins, changeLogSystemStreams,
-              storageEngineFactories, systemFactories, this.getSerdes(), 
jobConfig, new HashMap<>(),
+              new HashMap<>(), storageEngineFactories, systemFactories, 
this.getSerdes(), jobConfig, new HashMap<>(),
               new SamzaContainerMetrics(containerModel.getId(), new 
MetricsRegistryMap()),
               JobContextImpl.fromConfigWithDefaults(jobConfig), 
containerContext, new HashMap<>(),
-              storeBaseDir, storeBaseDir, maxPartitionNumber, new 
SystemClock());
+              storeBaseDir, storeBaseDir, maxPartitionNumber, null, new 
SystemClock());
       this.containerStorageManagers.put(containerModel.getId(), 
containerStorageManager);
     }
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 2b5717a..5e9c6cf 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -39,6 +39,7 @@ import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -72,14 +73,16 @@ public class TaskSideInputStorageManager {
   private final StreamMetadataCache streamMetadataCache;
   private final SystemAdmins systemAdmins;
   private final TaskName taskName;
+  private final TaskMode taskMode;
   private final Map<SystemStreamPartition, String> lastProcessedOffsets = new 
ConcurrentHashMap<>();
 
   private Map<SystemStreamPartition, String> startingOffsets;
 
   public TaskSideInputStorageManager(
       TaskName taskName,
+      TaskMode taskMode,
       StreamMetadataCache streamMetadataCache,
-      String storeBaseDir,
+      File storeBaseDir,
       Map<String, StorageEngine> sideInputStores,
       Map<String, SideInputsProcessor> storesToProcessor,
       Map<String, Set<SystemStreamPartition>> storesToSSPs,
@@ -88,11 +91,12 @@ public class TaskSideInputStorageManager {
       Clock clock) {
     this.clock = clock;
     this.stores = sideInputStores;
-    this.storeBaseDir = new File(storeBaseDir);
+    this.storeBaseDir = storeBaseDir;
     this.storeToSSps = storesToSSPs;
     this.streamMetadataCache = streamMetadataCache;
     this.systemAdmins = systemAdmins;
     this.taskName = taskName;
+    this.taskMode = taskMode;
     this.storeToProcessor = storesToProcessor;
 
     validateStoreConfiguration();
@@ -132,8 +136,9 @@ public class TaskSideInputStorageManager {
 
   /**
    * Flushes the contents of the underlying store and writes the offset file 
to disk.
+   * Synchronized in order to be exclusive with process()
    */
-  public void flush() {
+  public synchronized void flush() {
     LOG.info("Flushing the side input stores.");
     stores.values().forEach(StorageEngine::flush);
     writeOffsetFiles();
@@ -172,6 +177,11 @@ public class TaskSideInputStorageManager {
     return startingOffsets.get(ssp);
   }
 
+  // Get the taskName associated with this instance.
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
   /**
    * Gets the last processed offset for the given side input {@link 
SystemStreamPartition}.
    *
@@ -192,10 +202,11 @@ public class TaskSideInputStorageManager {
 
   /**
    * Processes the incoming side input message envelope and updates the last 
processed offset for its SSP.
+   * Synchronized in order to be exclusive with flush().
    *
    * @param message incoming message to be processed
    */
-  public void process(IncomingMessageEnvelope message) {
+  public synchronized void process(IncomingMessageEnvelope message) {
     SystemStreamPartition ssp = message.getSystemStreamPartition();
     Set<String> storeNames = sspsToStores.get(ssp);
 
@@ -249,7 +260,7 @@ public class TaskSideInputStorageManager {
               .collect(Collectors.toMap(Function.identity(), 
lastProcessedOffsets::get));
 
             try {
-              StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName, 
taskName, offsets);
+              StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName, 
taskName, taskMode, offsets);
             } catch (Exception e) {
               throw new SamzaException("Failed to write offset file for side 
input store: " + storeName, e);
             }
@@ -287,7 +298,7 @@ public class TaskSideInputStorageManager {
 
   @VisibleForTesting
   File getStoreLocation(String storeName) {
-    return StorageManagerUtil.getStorePartitionDir(storeBaseDir, storeName, 
taskName);
+    return StorageManagerUtil.getStorePartitionDir(storeBaseDir, storeName, 
taskName, taskMode);
   }
 
   /**
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f8d799c..5df4678 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,7 +26,7 @@ import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.time.Duration
 import java.util
-import java.util.{Base64}
+import java.util.Base64
 import java.util.concurrent.{ExecutorService, Executors, 
ScheduledExecutorService, TimeUnit}
 
 import com.google.common.annotations.VisibleForTesting
@@ -44,7 +44,7 @@ import 
org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, 
DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, 
PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, 
SystemMemoryStatistics, SystemStatisticsMonitor}
 import org.apache.samza.context._
-import org.apache.samza.job.model.{ContainerModel, JobModel}
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode}
 import org.apache.samza.metadatastore.MetadataStoreFactory
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, 
MetricsReporter}
 import org.apache.samza.serializers._
@@ -174,14 +174,25 @@ object SamzaContainer extends Logging {
       .flatMap(_.getSystemStreamPartitions.asScala)
       .toSet
 
+    val sideInputStoresToSystemStreams = config.getStoreNames
+      .map { storeName => (storeName, config.getSideInputs(storeName)) }
+      .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
+      .map { case (storeName, sideInputs) => (storeName, 
sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
+      .toMap
+
+    val sideInputSystemStreams = 
sideInputStoresToSystemStreams.values.flatMap(sideInputs => 
sideInputs.toStream).toSet
+
+    info("Got side input store system streams: %s" format 
sideInputSystemStreams)
+
     val inputSystemStreams = inputSystemStreamPartitions
       .map(_.getSystemStream)
-      .toSet
+      .toSet.diff(sideInputSystemStreams)
 
     val inputSystems = inputSystemStreams
       .map(_.getSystem)
       .toSet
 
+
     val systemNames = config.getSystemNames
 
     info("Got system names: %s" format systemNames)
@@ -358,14 +369,6 @@ object SamzaContainer extends Logging {
 
     info("Got intermediate streams: %s" format intermediateStreams)
 
-    val sideInputStoresToSystemStreams = config.getStoreNames
-      .map { storeName => (storeName, config.getSideInputs(storeName)) }
-      .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
-      .map { case (storeName, sideInputs) => (storeName, 
sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
-      .toMap
-
-    info("Got side input store system streams: %s" format 
sideInputStoresToSystemStreams)
-
     val controlMessageKeySerdes = intermediateStreams
       .flatMap(streamId => {
         val systemStream = config.streamIdToSystemStream(streamId)
@@ -540,18 +543,17 @@ object SamzaContainer extends Logging {
     val loggedStorageBaseDir = getLoggedStorageBaseDir(config, 
defaultStoreBaseDir)
     info("Got base directory for logged data stores: %s" format 
loggedStorageBaseDir)
 
-    val sideInputStorageEngineFactories = 
storageEngineFactories.filterKeys(storeName => 
sideInputStoresToSystemStreams.contains(storeName))
-    val nonSideInputStorageEngineFactories = (storageEngineFactories.toSet 
diff sideInputStorageEngineFactories.toSet).toMap
-
     val containerStorageManager = new ContainerStorageManager(containerModel, 
streamMetadataCache, systemAdmins,
-      changeLogSystemStreams.asJava, 
nonSideInputStorageEngineFactories.asJava, systemFactories.asJava, 
serdes.asJava, config,
+      changeLogSystemStreams.asJava, 
sideInputStoresToSystemStreams.mapValues(systemStreamSet => 
systemStreamSet.toSet.asJava).asJava,
+      storageEngineFactories.asJava, systemFactories.asJava, serdes.asJava, 
config,
       taskInstanceMetrics.asJava, samzaContainerMetrics, jobContext, 
containerContext, taskCollectors.asJava,
-      loggedStorageBaseDir, nonLoggedStorageBaseDir, 
maxChangeLogStreamPartitions, new SystemClock)
+      loggedStorageBaseDir, nonLoggedStorageBaseDir, 
maxChangeLogStreamPartitions, serdeManager, new SystemClock)
 
     storeWatchPaths.addAll(containerStorageManager.getStoreDirectoryPaths)
 
     // Create taskInstances
-    val taskInstances: Map[TaskName, TaskInstance] = taskModels.map(taskModel 
=> {
+    val taskInstances: Map[TaskName, TaskInstance] = taskModels
+      .filter(taskModel => 
taskModel.getTaskMode.eq(TaskMode.Active)).map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)
 
       val taskName = taskModel.getTaskName
@@ -561,46 +563,6 @@ object SamzaContainer extends Logging {
         case tf: StreamTaskFactory => 
tf.asInstanceOf[StreamTaskFactory].createInstance()
       }
 
-      val sideInputStores = sideInputStorageEngineFactories.map {
-          case (storeName, storageEngineFactory) =>
-            val changeLogSystemStreamPartition = if 
(changeLogSystemStreams.contains(storeName)) {
-              new SystemStreamPartition(changeLogSystemStreams(storeName), 
taskModel.getChangelogPartition)
-            } else {
-              null
-            }
-
-            val keySerde = config.getStorageKeySerde(storeName) match {
-              case Some(keySerde) => serdes.getOrElse(keySerde,
-                throw new SamzaException("StorageKeySerde: No class defined 
for serde: %s." format keySerde))
-              case _ => null
-            }
-
-            val msgSerde = config.getStorageMsgSerde(storeName) match {
-              case Some(msgSerde) => serdes.getOrElse(msgSerde,
-                throw new SamzaException("StorageMsgSerde: No class defined 
for serde: %s." format msgSerde))
-              case _ => null
-            }
-
-            // We use the logged storage base directory for side input stores 
since side input stores
-            // dont have changelog configured.
-            val storeDir = 
StorageManagerUtil.getStorePartitionDir(loggedStorageBaseDir, storeName, 
taskName)
-            storeWatchPaths.add(storeDir.toPath)
-
-            val sideInputStorageEngine = storageEngineFactory.getStorageEngine(
-              storeName,
-              storeDir,
-              keySerde,
-              msgSerde,
-              taskCollectors.get(taskName).get,
-              taskInstanceMetrics.get(taskName).get.registry,
-              changeLogSystemStreamPartition,
-              jobContext,
-              containerContext, StoreMode.ReadWrite)
-            (storeName, sideInputStorageEngine)
-        }
-
-      info("Got side input stores: %s" format sideInputStores)
-
       val taskSSPs = taskModel.getSystemStreamPartitions.asScala.toSet
       info("Got task SSPs: %s" format taskSSPs)
 
@@ -608,19 +570,8 @@ object SamzaContainer extends Logging {
         taskSSPs.filter(ssp => 
sideInputSystemStreams.contains(ssp.getSystemStream)).asJava)
 
       val taskSideInputSSPs = 
sideInputStoresToSSPs.values.flatMap(_.asScala).toSet
-
       info ("Got task side input SSPs: %s" format taskSideInputSSPs)
 
-      val sideInputStoresToProcessor = sideInputStores.keys.map(storeName => {
-          // serialized instances takes precedence over the factory 
configuration.
-          
config.getSideInputsProcessorSerializedInstance(storeName).map(serializedInstance
 =>
-              (storeName, SerdeUtils.deserialize("Side Inputs Processor", 
serializedInstance)))
-            
.orElse(config.getSideInputsProcessorFactory(storeName).map(factoryClassName =>
-              (storeName, Util.getObj(factoryClassName, 
classOf[SideInputsProcessorFactory])
-                .getSideInputsProcessor(config, 
taskInstanceMetrics.get(taskName).get.registry))))
-            .get
-        }).toMap
-
       val storageManager = new TaskStorageManager(
         taskName = taskName,
         containerStorageManager = containerStorageManager,
@@ -629,20 +580,6 @@ object SamzaContainer extends Logging {
         loggedStoreBaseDir = loggedStorageBaseDir,
         partition = taskModel.getChangelogPartition)
 
-      var sideInputStorageManager: TaskSideInputStorageManager = null
-      if (sideInputStores.nonEmpty) {
-        sideInputStorageManager = new TaskSideInputStorageManager(
-          taskName,
-          streamMetadataCache,
-          loggedStorageBaseDir.getPath,
-          sideInputStores.asJava,
-          sideInputStoresToProcessor.asJava,
-          sideInputStoresToSSPs.asJava,
-          systemAdmins,
-          config,
-          new SystemClock)
-      }
-
       val tableManager = new TableManager(config)
 
       info("Got table manager")
@@ -658,13 +595,11 @@ object SamzaContainer extends Logging {
           storageManager = storageManager,
           tableManager = tableManager,
           reporters = reporters,
-          systemStreamPartitions = taskSSPs,
+          systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
           exceptionHandler = 
TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
           jobModel = jobModel,
           streamMetadataCache = streamMetadataCache,
           timerExecutor = timerExecutor,
-          sideInputSSPs = taskSideInputSSPs,
-          sideInputStorageManager = sideInputStorageManager,
           jobContext = jobContext,
           containerContext = containerContext,
           applicationContainerContextOption = 
applicationContainerContextOption,
@@ -838,7 +773,10 @@ class SamzaContainer(
         containerListener.afterStart()
       }
       metrics.containerStartupTime.update(System.nanoTime() - startTime)
-      runLoop.run
+      if (taskInstances.size > 0)
+        runLoop.run
+      else
+        Thread.sleep(Long.MaxValue)
     } catch {
       case e: Throwable =>
         if (status.equals(SamzaContainerStatus.STARTED)) {
@@ -1014,12 +952,6 @@ class SamzaContainer(
   def startStores {
     info("Starting container storage manager.")
     containerStorageManager.start()
-
-    taskInstances.values.foreach(taskInstance => {
-      val startTime = System.currentTimeMillis()
-      info("Starting side inputs in task instance %s" format 
taskInstance.taskName)
-      taskInstance.startSideInputs
-    })
   }
 
   def startTableManager: Unit = {
@@ -1056,9 +988,10 @@ class SamzaContainer(
 
     taskInstances.values.foreach(_.registerConsumers)
 
-    info("Starting consumer multiplexer.")
-
-    consumerMultiplexer.start
+    if (taskInstances.size > 0) {
+      info("Starting consumer multiplexer.")
+      consumerMultiplexer.start
+    }
   }
 
   def startSecurityManger {
@@ -1155,9 +1088,6 @@ class SamzaContainer(
   def shutdownStores {
     info("Shutting down container storage manager.")
     containerStorageManager.shutdown()
-
-    info("Shutting down task instance side inputs.")
-    taskInstances.values.foreach(_.shutdownSideInputs)
   }
 
   def shutdownTableManager: Unit = {
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index a8b8078..35ebb68 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -32,7 +32,7 @@ import org.apache.samza.job.model.{JobModel, TaskModel}
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
 import org.apache.samza.storage.kv.KeyValueStore
-import org.apache.samza.storage.{TaskSideInputStorageManager, 
TaskStorageManager}
+import org.apache.samza.storage.{TaskStorageManager}
 import org.apache.samza.system._
 import org.apache.samza.table.TableManager
 import org.apache.samza.task._
@@ -58,8 +58,6 @@ class TaskInstance(
   jobModel: JobModel = null,
   streamMetadataCache: StreamMetadataCache = null,
   timerExecutor : ScheduledExecutorService = null,
-  sideInputSSPs: Set[SystemStreamPartition] = Set(),
-  sideInputStorageManager: TaskSideInputStorageManager = null,
   jobContext: JobContext,
   containerContext: ContainerContext,
   applicationContainerContextOption: Option[ApplicationContainerContext],
@@ -80,8 +78,6 @@ class TaskInstance(
     (storeName: String) => {
       if (storageManager != null && 
storageManager.getStore(storeName).isDefined) {
         storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, 
_]]
-      } else if (sideInputStorageManager != null && 
sideInputStorageManager.getStore(storeName) != null) {
-        
sideInputStorageManager.getStore(storeName).asInstanceOf[KeyValueStore[_, _]]
       } else {
         null
       }
@@ -117,18 +113,7 @@ class TaskInstance(
 
   def registerOffsets {
     debug("Registering offsets for taskName: %s" format taskName)
-
-    val sspsToRegister = systemStreamPartitions -- sideInputSSPs
-    offsetManager.register(taskName, sspsToRegister)
-  }
-
-  def startSideInputs {
-    if (sideInputStorageManager != null) {
-      debug("Starting side input storage manager for taskName: %s" format 
taskName)
-      sideInputStorageManager.init()
-    } else {
-      debug("Skipping side input storage manager initialization for taskName: 
%s" format taskName)
-    }
+    offsetManager.register(taskName, systemStreamPartitions)
   }
 
   def startTableManager {
@@ -168,11 +153,8 @@ class TaskInstance(
       val startpoint = offsetManager.getStartpoint(taskName, 
systemStreamPartition).getOrElse(null)
       consumerMultiplexer.register(systemStreamPartition, startingOffset, 
startpoint)
       metrics.addOffsetGauge(systemStreamPartition, () =>
-        if (sideInputSSPs.contains(systemStreamPartition)) {
-          sideInputStorageManager.getLastProcessedOffset(systemStreamPartition)
-        } else {
           offsetManager.getLastProcessedOffset(taskName, 
systemStreamPartition).orNull
-        })
+        )
     })
   }
 
@@ -193,25 +175,22 @@ class TaskInstance(
       trace("Processing incoming message envelope for taskName and SSP: %s, %s"
         format (taskName, incomingMessageSsp))
 
-      if (sideInputSSPs.contains(incomingMessageSsp) && 
!envelope.isEndOfStream) {
-        sideInputStorageManager.process(envelope)
+      if (isAsyncTask) {
+        exceptionHandler.maybeHandle {
+          val callback = callbackFactory.createCallback()
+          task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, 
coordinator, callback)
+        }
       } else {
-        if (isAsyncTask) {
-          exceptionHandler.maybeHandle {
-            val callback = callbackFactory.createCallback()
-            task.asInstanceOf[AsyncStreamTask].processAsync(envelope, 
collector, coordinator, callback)
-          }
-        } else {
-          exceptionHandler.maybeHandle {
-            task.asInstanceOf[StreamTask].process(envelope, collector, 
coordinator)
-          }
+        exceptionHandler.maybeHandle {
+          task.asInstanceOf[StreamTask].process(envelope, collector, 
coordinator)
+        }
 
-          trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
-            format (taskName, incomingMessageSsp, envelope.getOffset))
+        trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
+          format(taskName, incomingMessageSsp, envelope.getOffset))
 
-          offsetManager.update(taskName, incomingMessageSsp, 
envelope.getOffset)
-        }
+        offsetManager.update(taskName, incomingMessageSsp, envelope.getOffset)
       }
+
     }
   }
 
@@ -263,11 +242,6 @@ class TaskInstance(
       tableManager.flush
     }
 
-    trace("Flushing side input stores for taskName: %s" format taskName)
-    if (sideInputStorageManager != null) {
-      sideInputStorageManager.flush()
-    }
-
     trace("Checkpointing offsets for taskName: %s" format taskName)
     offsetManager.writeCheckpoint(taskName, checkpoint)
 
@@ -295,15 +269,6 @@ class TaskInstance(
     }
   }
 
-  def shutdownSideInputs {
-    if (sideInputStorageManager != null) {
-      debug("Shutting down side input storage manager for taskName: %s" format 
taskName)
-      sideInputStorageManager.stop()
-    } else {
-      debug("Skipping side input storage manager shutdown for taskName: %s" 
format taskName)
-    }
-  }
-
   def shutdownTableManager {
     if (tableManager != null) {
       debug("Shutting down table manager for taskName: %s" format taskName)
@@ -357,13 +322,7 @@ class TaskInstance(
   }
 
   private def getStartingOffset(systemStreamPartition: SystemStreamPartition) 
= {
-    val offset =
-      if (sideInputSSPs.contains(systemStreamPartition)) {
-        
Option(sideInputStorageManager.getStartingOffset(systemStreamPartition))
-      } else {
-        offsetManager.getStartingOffset(taskName, systemStreamPartition)
-      }
-
+    val offset = offsetManager.getStartingOffset(taskName, 
systemStreamPartition)
     val startingOffset = offset.getOrElse(
       throw new SamzaException("No offset defined for SystemStreamPartition: 
%s" format systemStreamPartition))
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 3b86a4e..ad9637d 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -19,10 +19,13 @@
 package org.apache.samza.storage;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,40 +34,61 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.SamzaContainerMetrics;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeManager;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.system.SystemConsumersMetrics;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.apache.samza.system.chooser.DefaultChooser;
+import org.apache.samza.system.chooser.MessageChooser;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.apache.samza.table.utils.SerdeUtils;
 import org.apache.samza.task.TaskInstanceCollector;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.FileUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;
 
 
@@ -78,10 +102,18 @@ import scala.collection.JavaConverters;
  *
  *  b) performing individual task stores' restores in parallel.
  *
+ *  and
+ *  c) restoring sideInputs.
+ *  It provides bootstrap semantics for sideInputs -- the main thread is blocked until
+ *  all sideInput SSPs have caught up. Side input store flushes are not in sync with task-commit,
+ *  although they happen at the same frequency.
+ *  In case a user explicitly requests a task-commit, it will not include committing side inputs.
  */
 public class ContainerStorageManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
+  private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush 
Thread";
+  private static final String SIDEINPUTS_METRICS_NAME = 
"samza-container-%s-sideinputs";
 
   /** Maps containing relevant per-task objects */
   private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -89,10 +121,11 @@ public class ContainerStorageManager {
   private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
   private final Map<TaskName, TaskInstanceCollector> taskInstanceCollectors;
 
-  private final Map<String, SystemConsumer> systemConsumers; // Mapping from 
storeSystemNames to SystemConsumers
+  private final Map<String, SystemConsumer> storeConsumers; // Mapping from 
store name to SystemConsumers
   private final Map<String, StorageEngineFactory<Object, Object>> 
storageEngineFactories; // Map of storageEngineFactories indexed by store name
   private final Map<String, SystemStream> changelogSystemStreams; // Map of 
changelog system-streams indexed by store name
   private final Map<String, Serde<Object>> serdes; // Map of Serde objects 
indexed by serde name (specified in config)
+  private final SystemAdmins systemAdmins;
 
   private final StreamMetadataCache streamMetadataCache;
   private final SamzaContainerMetrics samzaContainerMetrics;
@@ -109,19 +142,42 @@ public class ContainerStorageManager {
   private final int parallelRestoreThreadPoolSize;
   private final int maxChangeLogStreamPartitions; // The partition count of 
each changelog-stream topic. This is used for validating changelog streams 
before restoring.
 
+  /* Sideinput related parameters */
+  private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map 
of side input system-streams indexed by store name
+  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
taskSideInputSSPs;
+  private final Map<SystemStreamPartition, TaskSideInputStorageManager> 
sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp, 
for simpler lookup for process()
+  private final Map<String, SystemConsumer> sideInputConsumers; // Mapping 
from storeSystemNames to SystemConsumers
+  private SystemConsumers sideInputSystemConsumers;
+  private final Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
+      = new ConcurrentHashMap<>(); // sspMetadata of the taskSideInputSSPs recorded at start, used to determine when sideInputs are caught up and container init can proceed
+  private volatile CountDownLatch sideInputsCaughtUp; // Used by the 
sideInput-read thread to signal to the main thread
+  private volatile boolean shutDownSideInputRead = false;
+  private final ScheduledExecutorService sideInputsFlushExecutor = 
Executors.newSingleThreadScheduledExecutor(
+      new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
+  private ScheduledFuture sideInputsFlushFuture;
+  private static final Duration SIDE_INPUT_FLUSH_TIMEOUT = 
Duration.ofMinutes(1);
+  private volatile Optional<Throwable> sideInputException = Optional.empty();
+
   private final Config config;
 
   public ContainerStorageManager(ContainerModel containerModel, 
StreamMetadataCache streamMetadataCache,
       SystemAdmins systemAdmins, Map<String, SystemStream> 
changelogSystemStreams,
+      Map<String, Set<SystemStream>> sideInputSystemStreams,
       Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
       Map<String, SystemFactory> systemFactories, Map<String, Serde<Object>> 
serdes, Config config,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics, 
SamzaContainerMetrics samzaContainerMetrics,
       JobContext jobContext, ContainerContext containerContext,
       Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, File 
loggedStoreBaseDirectory,
-      File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, 
Clock clock) {
+      File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions, 
SerdeManager serdeManager, Clock clock) {
 
     this.containerModel = containerModel;
-    this.changelogSystemStreams = changelogSystemStreams;
+    this.sideInputSystemStreams = new HashMap<>(sideInputSystemStreams);
+    this.taskSideInputSSPs = getTaskSideInputSSPs(containerModel, 
sideInputSystemStreams);
+
+    this.changelogSystemStreams = getChangelogSystemStreams(containerModel, 
changelogSystemStreams); // handling standby tasks
+
+    LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams 
= {}", this.changelogSystemStreams, this.sideInputSystemStreams);
+
     this.storageEngineFactories = storageEngineFactories;
     this.serdes = serdes;
     this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
@@ -148,32 +204,111 @@ public class ContainerStorageManager {
 
     this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions;
     this.streamMetadataCache = streamMetadataCache;
+    this.systemAdmins = systemAdmins;
 
     // create taskStores for all tasks in the containerModel and each store in 
storageEngineFactories
-    this.taskStores = createTaskStores(containerModel, jobContext, 
containerContext, storageEngineFactories, changelogSystemStreams,
-        serdes, taskInstanceMetrics, taskInstanceCollectors, 
StorageEngineFactory.StoreMode.BulkLoad);
+    this.taskStores = createTaskStores(containerModel, jobContext, 
containerContext, storageEngineFactories, serdes, taskInstanceMetrics, 
taskInstanceCollectors);
 
-    // create system consumers (1 per store system)
-    this.systemConsumers = createStoreConsumers(changelogSystemStreams, 
systemFactories, config, this.samzaContainerMetrics.registry());
+    // create system consumers (1 per store system in changelogSystemStreams), 
and index it by storeName
+    Map<String, SystemConsumer> storeSystemConsumers = 
createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+        e -> Collections.singleton(e.getValue()))), systemFactories, config, 
this.samzaContainerMetrics.registry());
+    this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, 
storeSystemConsumers);
 
     // creating task restore managers
     this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+
+    // create side input storage managers
+    sideInputStorageManagers = createSideInputStorageManagers(clock);
+
+    // create side Input consumers indexed by systemName
+    this.sideInputConsumers = createConsumers(this.sideInputSystemStreams, 
systemFactories, config, this.samzaContainerMetrics.registry());
+
+    // create SystemConsumers for consuming from taskSideInputSSPs, if 
sideInputs are being used
+    if (sideInputsPresent()) {
+
+      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> 
inputStreamMetadata = 
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
+          
this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(),
 false);
+
+      SystemConsumersMetrics systemConsumersMetrics = new 
SystemConsumersMetrics(
+          new MetricsRegistryMap(String.format(SIDEINPUTS_METRICS_NAME, 
containerModel.getId())));
+
+      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new 
RoundRobinChooserFactory(), config,
+          systemConsumersMetrics.registry(), systemAdmins);
+
+      sideInputSystemConsumers =
+          new SystemConsumers(chooser, 
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
+              systemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), 
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+              SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), 
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
+    }
+
+  }
+
+  /**
+   * Add all side inputs to a map of maps, indexed first by taskName, then by 
sideInput store name.
+   *
+   * @param containerModel the containerModel to use
+   * @param sideInputSystemStreams the map of store to side input system stream
+   * @return taskSideInputSSPs map
+   */
+  private Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
getTaskSideInputSSPs(ContainerModel containerModel, Map<String, 
Set<SystemStream>> sideInputSystemStreams) {
+    Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs = 
new HashMap<>();
+
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
+        sideInputSystemStreams.keySet().forEach(storeName -> {
+            Set<SystemStreamPartition> taskSideInputs = 
taskModel.getSystemStreamPartitions().stream().filter(ssp -> 
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
+            taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+            taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
+          });
+      });
+    return taskSideInputSSPs;
   }
 
   /**
+   * For each standby task, we remove its changeLogSSPs from changelogSSP map 
and add it to the task's taskSideInputSSPs.
+   * The task's sideInputManager will consume and restore these as well.
+   *
+   * @param containerModel the container's model
+   * @param changelogSystemStreams the passed in set of changelogSystemStreams
+   * @return A map of storeName to changelog SystemStream, covering only active tasks' stores,
+   *     assuming no two stores have the same changelog SSP
+   */
+  private Map<String, SystemStream> getChangelogSystemStreams(ContainerModel 
containerModel, Map<String, SystemStream> changelogSystemStreams) {
+
+    if (MapUtils.invertMap(changelogSystemStreams).size() != 
changelogSystemStreams.size()) {
+      throw new SamzaException("Two stores cannot have the same changelog 
system-stream");
+    }
+
+    Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
+    changelogSystemStreams.forEach((storeName, systemStream) ->
+        containerModel.getTasks().forEach((taskName, taskModel) -> { 
changelogSSPToStore.put(new SystemStreamPartition(systemStream, 
taskModel.getChangelogPartition()), storeName); })
+    );
+
+    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) 
-> {
+        changelogSystemStreams.forEach((storeName, systemStream) -> {
+            SystemStreamPartition ssp = new 
SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
+            changelogSSPToStore.remove(ssp);
+            this.taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+            this.sideInputSystemStreams.put(storeName, 
Collections.singleton(ssp.getSystemStream()));
+            this.taskSideInputSSPs.get(taskName).put(storeName, 
Collections.singleton(ssp));
+          });
+      });
+
+    // changelogSystemStreams correspond only to active tasks (since those of 
standby-tasks moved to side inputs above)
+    return 
MapUtils.invertMap(changelogSSPToStore).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 x -> x.getValue().getSystemStream()));
+  }
+
+
+  /**
    *  Creates SystemConsumer objects for store restoration, creating one 
consumer per system.
    */
-  private static Map<String, SystemConsumer> createStoreConsumers(Map<String, 
SystemStream> changelogSystemStreams,
+  private static Map<String, SystemConsumer> createConsumers(Map<String, 
Set<SystemStream>> systemStreams,
       Map<String, SystemFactory> systemFactories, Config config, 
MetricsRegistry registry) {
     // Determine the set of systems being used across all stores
     Set<String> storeSystems =
-        
changelogSystemStreams.values().stream().map(SystemStream::getSystem).collect(Collectors.toSet());
+        
systemStreams.values().stream().flatMap(Set::stream).map(SystemStream::getSystem).collect(Collectors.toSet());
 
     // Create one consumer for each system in use, map with one entry for each 
such system
     Map<String, SystemConsumer> storeSystemConsumers = new HashMap<>();
 
-    // Map of each storeName to its respective systemConsumer
-    Map<String, SystemConsumer> storeConsumers = new HashMap<>();
 
     // Iterate over the list of storeSystems and create one sysConsumer per 
system
     for (String storeSystemName : storeSystems) {
@@ -185,29 +320,45 @@ public class ContainerStorageManager {
           systemFactory.getConsumer(storeSystemName, config, registry));
     }
 
+    return storeSystemConsumers;
+
+  }
+
+  private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, 
SystemStream> changelogSystemStreams,
+      Map<String, SystemConsumer> storeSystemConsumers) {
+    // Map of each storeName to its respective systemConsumer
+    Map<String, SystemConsumer> storeConsumers = new HashMap<>();
+
     // Populate the map of storeName to its relevant systemConsumer
     for (String storeName : changelogSystemStreams.keySet()) {
       storeConsumers.put(storeName, 
storeSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
     }
-
     return storeConsumers;
   }
 
   private Map<TaskName, TaskRestoreManager> 
createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock) {
     Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
-    containerModel.getTasks().forEach((taskName, taskModel) ->
-      taskRestoreManagers.put(taskName, new TaskRestoreManager(taskModel, 
changelogSystemStreams, taskStores.get(taskName), systemAdmins, clock)));
+    containerModel.getTasks().forEach((taskName, taskModel) -> {
+        taskRestoreManagers.put(taskName,
+            new TaskRestoreManager(taskModel, changelogSystemStreams, 
getNonSideInputStores(taskName), systemAdmins, clock));
+      });
     return taskRestoreManagers;
   }
 
+  // Helper method to filter tasks of the given taskMode from the container model
+  private static Map<TaskName, TaskModel> getTasks(ContainerModel 
containerModel, TaskMode taskMode) {
+    return containerModel.getTasks().entrySet().stream()
+            .filter(x -> 
x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
+  }
+
   /**
-   * Create taskStores with the given store mode for all stores in 
storageEngineFactories.
+   * Create taskStores for all stores in storageEngineFactories.
+   * The store mode is chosen as bulk-load if it's a non-sideInput store, and readWrite if it's a side input store
    */
   private Map<TaskName, Map<String, StorageEngine>> 
createTaskStores(ContainerModel containerModel, JobContext jobContext, 
ContainerContext containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
-      Map<String, SystemStream> changelogSystemStreams, Map<String, 
Serde<Object>> serdes,
+      Map<String, StorageEngineFactory<Object, Object>> 
storageEngineFactories, Map<String, Serde<Object>> serdes,
       Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
-      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, 
StorageEngineFactory.StoreMode storeMode) {
+      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
 
     Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
 
@@ -222,14 +373,16 @@ public class ContainerStorageManager {
 
       for (String storeName : storageEngineFactories.keySet()) {
 
+        StorageEngineFactory.StoreMode storeMode = 
this.sideInputSystemStreams.containsKey(storeName) ?
+            StorageEngineFactory.StoreMode.ReadWrite : 
StorageEngineFactory.StoreMode.BulkLoad;
+
         StorageEngine storageEngine =
-            createStore(storeName, taskName, taskModel, jobContext, 
containerContext, storageEngineFactories,
-                changelogSystemStreams, serdes, taskInstanceMetrics, 
taskInstanceCollectors, storeMode);
+            createStore(storeName, taskName, taskModel, jobContext, 
containerContext, storageEngineFactories, serdes, taskInstanceMetrics, 
taskInstanceCollectors, storeMode);
 
         // add created store to map
         taskStores.get(taskName).put(storeName, storageEngine);
 
-        LOG.info("Created store {} for task {}", storeName, taskName);
+        LOG.info("Created store {} for task {} in mode {}", storeName, 
taskName, storeMode);
       }
     }
 
@@ -237,31 +390,28 @@ public class ContainerStorageManager {
   }
 
   /**
-   * Recreate all persistent stores in ReadWrite mode.
+   * Recreate all non-sideInput persistent stores in ReadWrite mode.
    *
    */
   private void recreatePersistentTaskStoresInReadWriteMode(ContainerModel 
containerModel, JobContext jobContext,
       ContainerContext containerContext, Map<String, 
StorageEngineFactory<Object, Object>> storageEngineFactories,
-      Map<String, SystemStream> changelogSystemStreams, Map<String, 
Serde<Object>> serdes,
-      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+      Map<String, Serde<Object>> serdes, Map<TaskName, TaskInstanceMetrics> 
taskInstanceMetrics,
       Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
 
     // iterate over each task and each storeName
     for (Map.Entry<TaskName, TaskModel> task : 
containerModel.getTasks().entrySet()) {
       TaskName taskName = task.getKey();
       TaskModel taskModel = task.getValue();
+      Map<String, StorageEngine> nonSideInputStores = 
getNonSideInputStores(taskName);
 
-      for (String storeName : storageEngineFactories.keySet()) {
+      for (String storeName : nonSideInputStores.keySet()) {
 
-        // if this store has been already created in the taskStores, then 
re-create and overwrite it only if it is a persistentStore
-        if (this.taskStores.get(taskName).containsKey(storeName) && 
this.taskStores.get(taskName)
-            .get(storeName)
-            .getStoreProperties()
-            .isPersistedToDisk()) {
+        // if this store has been already created then re-create and overwrite 
it only if it is a
+        // persistentStore and a non-sideInputStore, because sideInputStores 
are always created in RW mode
+        if 
(nonSideInputStores.get(storeName).getStoreProperties().isPersistedToDisk()) {
 
           StorageEngine storageEngine =
-              createStore(storeName, taskName, taskModel, jobContext, 
containerContext, storageEngineFactories,
-                  changelogSystemStreams, serdes, taskInstanceMetrics, 
taskInstanceCollectors,
+              createStore(storeName, taskName, taskModel, jobContext, 
containerContext, storageEngineFactories, serdes, taskInstanceMetrics, 
taskInstanceCollectors,
                   StorageEngineFactory.StoreMode.ReadWrite);
 
           // add created store to map
@@ -269,8 +419,7 @@ public class ContainerStorageManager {
 
           LOG.info("Re-created store {} in read-write mode for task {} because 
it a persistent store", storeName, taskName);
         } else {
-
-          LOG.info("Skipping re-creation of store {} for task {} because it a 
non-persistent store", storeName, taskName);
+          LOG.info("Skipping re-creation of store {} for task {}", storeName, 
taskName);
         }
       }
     }
@@ -282,8 +431,7 @@ public class ContainerStorageManager {
    */
   private StorageEngine createStore(String storeName, TaskName taskName, 
TaskModel taskModel, JobContext jobContext,
       ContainerContext containerContext, Map<String, 
StorageEngineFactory<Object, Object>> storageEngineFactories,
-      Map<String, SystemStream> changelogSystemStreams, Map<String, 
Serde<Object>> serdes,
-      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+      Map<String, Serde<Object>> serdes, Map<TaskName, TaskInstanceMetrics> 
taskInstanceMetrics,
       Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, 
StorageEngineFactory.StoreMode storeMode) {
 
     StorageConfig storageConfig = new StorageConfig(config);
@@ -292,11 +440,15 @@ public class ContainerStorageManager {
         (changelogSystemStreams.containsKey(storeName)) ? new 
SystemStreamPartition(
             changelogSystemStreams.get(storeName), 
taskModel.getChangelogPartition()) : null;
 
-    // Use the logged-store-base-directory for change logged stores, and 
non-logged-store-base-dir for non logged stores
-    File storeDirectory =
-        (changeLogSystemStreamPartition != null) ? 
StorageManagerUtil.getStorePartitionDir(this.loggedStoreBaseDirectory,
-            storeName, taskName)
-            : 
StorageManagerUtil.getStorePartitionDir(this.nonLoggedStoreBaseDirectory, 
storeName, taskName);
+    // Use the logged-store-base-directory for change logged stores and side 
input stores, and non-logged-store-base-dir
+    // for non logged stores
+    File storeDirectory;
+    if (changeLogSystemStreamPartition != null || 
sideInputSystemStreams.containsKey(storeName)) {
+      storeDirectory = 
StorageManagerUtil.getStorePartitionDir(this.loggedStoreBaseDirectory, 
storeName, taskName, taskModel.getTaskMode());
+    } else {
+      storeDirectory = 
StorageManagerUtil.getStorePartitionDir(this.nonLoggedStoreBaseDirectory, 
storeName, taskName, taskModel.getTaskMode());
+    }
+
     this.storeDirectoryPaths.add(storeDirectory.toPath());
 
     if (storageConfig.getStorageKeySerde(storeName).isEmpty()) {
@@ -330,18 +482,118 @@ public class ContainerStorageManager {
             storeMetricsRegistry, changeLogSystemStreamPartition, jobContext, 
containerContext, storeMode);
   }
 
+
+  // Create side input store processors, one per store per task
+  private Map<TaskName, Map<String, SideInputsProcessor>> 
createSideInputProcessors(StorageConfig config, ContainerModel containerModel,
+      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<TaskName, 
TaskInstanceMetrics> taskInstanceMetrics) {
+
+    Map<TaskName, Map<String, SideInputsProcessor>> 
sideInputStoresToProcessors = new HashMap<>();
+    getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> 
{
+        sideInputStoresToProcessors.put(taskName, new HashMap<>());
+        for (String storeName : sideInputSystemStreams.keySet()) {
+          if 
(config.getSideInputsProcessorSerializedInstance(storeName).isDefined()) {
+            sideInputStoresToProcessors.get(taskName)
+                .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
+                    
config.getSideInputsProcessorSerializedInstance(storeName).get()));
+          } else {
+            sideInputStoresToProcessors.get(taskName)
+                .put(storeName, 
Util.getObj(config.getSideInputsProcessorFactory(storeName).get(),
+                    
SideInputsProcessorFactory.class).getSideInputsProcessor(config,
+                    taskInstanceMetrics.get(taskName).registry()));
+          }
+        }
+      });
+
+    // creating identity sideInputProcessor for stores of standbyTasks
+    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) 
-> {
+        sideInputStoresToProcessors.put(taskName, new HashMap<>());
+        for (String storeName : sideInputSystemStreams.keySet()) {
+
+          // have to use the store's configured key/msg serdes, because the sideInput stores are created
+          // with those serdes and the incoming bytes must be deserialized before insertion
+          Serde keySerde = serdes.get(new 
StorageConfig(config).getStorageKeySerde(storeName).get());
+          Serde msgSerde = serdes.get(new 
StorageConfig(config).getStorageMsgSerde(storeName).get());
+          sideInputStoresToProcessors.get(taskName).put(storeName, new 
SideInputsProcessor() {
+            @Override
+            public Collection<Entry<?, ?>> process(IncomingMessageEnvelope 
message, KeyValueStore store) {
+              return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) 
message.getKey()), msgSerde.fromBytes((byte[]) message.getMessage())));
+            }
+          });
+        }
+      });
+
+    return sideInputStoresToProcessors;
+  }
+
+  // Create task side input storage managers, one per task, index by the SSP 
they are responsible for consuming
+  private Map<SystemStreamPartition, TaskSideInputStorageManager> 
createSideInputStorageManagers(Clock clock) {
+
+    // creating side input store processors, one per store per task
+    Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors = 
createSideInputProcessors(new StorageConfig(config), this.containerModel, 
this.sideInputSystemStreams, this.taskInstanceMetrics);
+
+    Map<SystemStreamPartition, TaskSideInputStorageManager> 
sideInputStorageManagers = new HashMap<>();
+
+    if (sideInputsPresent()) {
+      containerModel.getTasks().forEach((taskName, taskModel) -> {
+
+          Map<String, StorageEngine> sideInputStores = 
getSideInputStores(taskName);
+          Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new 
HashMap<>();
+
+          for (String storeName : sideInputStores.keySet()) {
+            Set<SystemStreamPartition> storeSSPs = 
taskSideInputSSPs.get(taskName).get(storeName);
+            sideInputStoresToSSPs.put(storeName, storeSSPs);
+          }
+
+          TaskSideInputStorageManager taskSideInputStorageManager =
+              new TaskSideInputStorageManager(taskName, 
taskModel.getTaskMode(), streamMetadataCache,
+                  loggedStoreBaseDirectory, sideInputStores, 
taskSideInputProcessors.get(taskName), sideInputStoresToSSPs,
+                  systemAdmins, config, clock);
+
+          
sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
+              sideInputStorageManagers.put(ssp, taskSideInputStorageManager);
+            });
+
+          LOG.info("Created taskSideInputStorageManager for task {}, 
sideInputStores {} and loggedStoreBaseDirectory {}",
+              taskName, sideInputStores, loggedStoreBaseDirectory);
+        });
+    }
+    return sideInputStorageManagers;
+  }
+
+  private Map<String, StorageEngine> getSideInputStores(TaskName taskName) {
+    return taskStores.get(taskName).entrySet().stream().
+        filter(e -> 
sideInputSystemStreams.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
+  }
+
+  private Map<String, StorageEngine> getNonSideInputStores(TaskName taskName) {
+    return taskStores.get(taskName).entrySet().stream().
+        filter(e -> 
!sideInputSystemStreams.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
+  }
+
+  private Set<TaskSideInputStorageManager> getSideInputStorageManagers() {
+    return 
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
+  }
+
+
   public void start() throws SamzaException {
-    LOG.info("Restore started");
+    restoreStores();
+    if (sideInputsPresent()) {
+      startSideInputs();
+    }
+  }
+
+  // Restoration of all stores, in parallel across tasks
+  private void restoreStores() {
+    LOG.info("Store Restore started");
 
     // initialize each TaskStorageManager
     this.taskRestoreManagers.values().forEach(taskStorageManager -> 
taskStorageManager.initialize());
 
-    // Start consumers
-    this.systemConsumers.values().forEach(systemConsumer -> 
systemConsumer.start());
+    // Start store consumers
+    this.storeConsumers.values().forEach(systemConsumer -> 
systemConsumer.start());
 
     // Create a thread pool for parallel restores (and stopping of persistent 
stores)
     ExecutorService executorService = 
Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
-        new ThreadFactoryBuilder().setNameFormat(RESTORE_THREAD_NAME).build());
+        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
 
     List<Future> taskRestoreFutures = new 
ArrayList<>(this.taskRestoreManagers.entrySet().size());
 
@@ -364,14 +616,148 @@ public class ContainerStorageManager {
 
     executorService.shutdown();
 
-    // Stop consumers
-    this.systemConsumers.values().forEach(systemConsumer -> 
systemConsumer.stop());
+    // Stop store consumers
+    this.storeConsumers.values().forEach(systemConsumer -> 
systemConsumer.stop());
 
     // Now re-create persistent stores in read-write mode, leave 
non-persistent stores as-is
     recreatePersistentTaskStoresInReadWriteMode(this.containerModel, 
jobContext, containerContext,
-        storageEngineFactories, changelogSystemStreams, serdes, 
taskInstanceMetrics, taskInstanceCollectors);
+        storageEngineFactories, serdes, taskInstanceMetrics, 
taskInstanceCollectors);
+
+    LOG.info("Store Restore complete");
+  }
+
+  // Read sideInputs until all sideInput streams are caught up, so start() can return
+  private void startSideInputs() {
+
+    LOG.info("SideInput Restore started");
+
+    // initialize the sideInputStorageManagers
+    getSideInputStorageManagers().forEach(sideInputStorageManager -> 
sideInputStorageManager.init());
+
+    // start the checkpointing thread at the commit-ms frequency
+    sideInputsFlushFuture = sideInputsFlushExecutor.scheduleWithFixedDelay(new 
Runnable() {
+      @Override
+      public void run() {
+        try {
+          getSideInputStorageManagers().forEach(sideInputStorageManager -> 
sideInputStorageManager.flush());
+        } catch (Exception e) {
+          LOG.error("Exception during flushing side inputs", e);
+          sideInputException = Optional.of(e);
+        }
+      }
+    }, 0, new TaskConfig(config).getCommitMs(), TimeUnit.MILLISECONDS);
+
+    // set the latch to the number of sideInput SSPs
+    this.sideInputsCaughtUp = new 
CountDownLatch(this.sideInputStorageManagers.keySet().size());
+
+    // register all side input SSPs with the consumers
+    for (SystemStreamPartition ssp : sideInputStorageManagers.keySet()) {
+      String startingOffset = 
sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
+
+      if (startingOffset == null) {
+        throw new SamzaException("No offset defined for SideInput 
SystemStreamPartition : " + ssp);
+      }
+
+      // register startingOffset with the sysConsumer and register a metric 
for it
+      sideInputSystemConsumers.register(ssp, startingOffset, null);
+      
taskInstanceMetrics.get(sideInputStorageManagers.get(ssp).getTaskName()).addOffsetGauge(
+          ssp, ScalaJavaUtil.toScalaFunction(() -> 
sideInputStorageManagers.get(ssp).getLastProcessedOffset(ssp)));
+
+      SystemStreamMetadata systemStreamMetadata = 
streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
+      SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
+          (systemStreamMetadata == null) ? null : 
systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
+
+      // record a copy of the sspMetadata, to later check if its caught up
+      initialSideInputSSPMetadata.put(ssp, sspMetadata);
+
+      // check if the ssp is caught up to upcoming, even at start
+      checkSideInputCaughtUp(ssp, startingOffset, 
SystemStreamMetadata.OffsetType.UPCOMING, false);
+    }
+
+    // start the systemConsumers for consuming input
+    this.sideInputSystemConsumers.start();
+
+    // create a thread for sideInput reads
+    Thread readSideInputs = new Thread(() -> {
+        while (!shutDownSideInputRead) {
+          IncomingMessageEnvelope envelope = 
sideInputSystemConsumers.choose(true);
+          if (envelope != null) {
+
+            if (!envelope.isEndOfStream())
+              
sideInputStorageManagers.get(envelope.getSystemStreamPartition()).process(envelope);
+
+            checkSideInputCaughtUp(envelope.getSystemStreamPartition(), 
envelope.getOffset(),
+                SystemStreamMetadata.OffsetType.NEWEST, 
envelope.isEndOfStream());
 
-    LOG.info("Restore complete");
+          } else {
+            LOG.trace("No incoming message was available");
+          }
+        }
+      });
+
+    readSideInputs.setDaemon(true);
+    readSideInputs.setUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
+      @Override
+      public void uncaughtException(Thread t, Throwable e) {
+        sideInputException = Optional.of(e);
+        sideInputsCaughtUp.countDown();
+      }
+    });
+
+    try {
+      readSideInputs.start();
+      // Make the main thread wait until all sideInputs have been caughtup or 
thrown an exception
+      this.sideInputsCaughtUp.await();
+
+      if (sideInputException.isPresent()) { // Throw exception if there was an 
exception in catching-up sideInputs
+        // TODO: SAMZA-2113 relay exception to main thread
+        throw new SamzaException("Exception in restoring side inputs", 
sideInputException.get());
+      }
+    } catch (InterruptedException e) {
+      sideInputException = Optional.of(e);
+      throw new SamzaException("Side inputs read was interrupted", e);
+    }
+
+    LOG.info("SideInput Restore complete");
+  }
+
+  private boolean sideInputsPresent() {
+    return !this.sideInputSystemStreams.isEmpty();
+  }
+
+  // Method to check if the given offset means the stream is caught up for 
reads
+  private void checkSideInputCaughtUp(SystemStreamPartition ssp, String 
offset, SystemStreamMetadata.OffsetType offsetType, boolean isEndOfStream) {
+
+    if (isEndOfStream) {
+      this.initialSideInputSSPMetadata.remove(ssp);
+      this.sideInputsCaughtUp.countDown();
+      LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp, 
offset, offsetType);
+      return;
+    }
+
+    SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata = 
this.initialSideInputSSPMetadata.get(ssp);
+    String offsetToCheck = sspMetadata == null ? null : 
sspMetadata.getOffset(offsetType);
+    LOG.trace("Checking {} offset {} against {} for {}.", offsetType, offset, 
offsetToCheck, ssp);
+
+    // Let's compare offset of the chosen message with offsetToCheck.
+    Integer comparatorResult;
+    if (offset == null || offsetToCheck == null) {
+      comparatorResult = -1;
+    } else {
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+      comparatorResult = systemAdmin.offsetComparator(offset, offsetToCheck);
+    }
+
+    // The SSP is no longer lagging if the envelope's offset is greater than 
or equal to the
+    // latest offset.
+    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
+
+      LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp, 
offset, offsetType);
+      // if its caught up, we remove the ssp from the map, and countDown the 
latch
+      this.initialSideInputSSPMetadata.remove(ssp);
+      this.sideInputsCaughtUp.countDown();
+      return;
+    }
   }
 
   /**
@@ -407,15 +793,30 @@ public class ContainerStorageManager {
   }
 
   public void shutdown() {
-    this.taskRestoreManagers.forEach((taskInstance, taskRestoreManager) -> {
-        if (taskRestoreManager != null) {
-          LOG.debug("Shutting down task storage manager for taskName: {} ", 
taskInstance);
-          taskRestoreManager.stop();
-        } else {
-          LOG.debug("Skipping task storage manager shutdown for taskName: {}", 
taskInstance);
-        }
-      });
+    // stop all non-side-input stores, including persistent and non-persistent 
stores
+    this.containerModel.getTasks().forEach((taskName, taskModel) ->
+        getNonSideInputStores(taskName).forEach((storeName, store) -> 
store.stop())
+    );
+
+    // stop all sideinput consumers and stores
+    if (sideInputsPresent()) {
+      // stop reading sideInputs
+      this.shutDownSideInputRead = true;
+
+      this.sideInputSystemConsumers.stop();
 
+      // cancel all future sideInput flushes, shut down the executor, and await 
termination
+      sideInputsFlushFuture.cancel(false);
+      sideInputsFlushExecutor.shutdown();
+      try {
+        
sideInputsFlushExecutor.awaitTermination(SIDE_INPUT_FLUSH_TIMEOUT.toMillis(), 
TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        throw new SamzaException("Exception while shutting down side inputs", 
e);
+      }
+
+      // stop all sideInputStores -- this will perform one last flush on the 
KV stores, and write the offset file
+      this.getSideInputStorageManagers().forEach(sideInputStorageManager -> 
sideInputStorageManager.stop());
+    }
     LOG.info("Shutdown complete");
   }
 
@@ -514,7 +915,7 @@ public class ContainerStorageManager {
 
       taskStores.keySet().forEach(storeName -> {
           File nonLoggedStorePartitionDir =
-              
StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, 
taskModel.getTaskName());
+              
StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, 
taskModel.getTaskName(), taskModel.getTaskMode());
           LOG.info("Got non logged storage partition directory as " + 
nonLoggedStorePartitionDir.toPath().toString());
 
           if (nonLoggedStorePartitionDir.exists()) {
@@ -523,7 +924,7 @@ public class ContainerStorageManager {
           }
 
           File loggedStorePartitionDir =
-              
StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, 
taskModel.getTaskName());
+              
StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, 
taskModel.getTaskName(), taskModel.getTaskMode());
           LOG.info("Got logged storage partition directory as " + 
loggedStorePartitionDir.toPath().toString());
 
           // Delete the logged store if it is not valid.
@@ -579,7 +980,7 @@ public class ContainerStorageManager {
           if (storageEngine.getStoreProperties().isLoggedStore()) {
 
             File loggedStorePartitionDir =
-                
StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, 
taskModel.getTaskName());
+                
StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, 
taskModel.getTaskName(), taskModel.getTaskMode());
 
             LOG.info("Using logged storage partition directory: " + 
loggedStorePartitionDir.toPath().toString()
                 + " for store: " + storeName);
@@ -589,7 +990,7 @@ public class ContainerStorageManager {
             }
           } else {
             File nonLoggedStorePartitionDir =
-                
StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, 
taskModel.getTaskName());
+                
StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, 
taskModel.getTaskName(), taskModel.getTaskMode());
             LOG.info("Using non logged storage partition directory: " + 
nonLoggedStorePartitionDir.toPath().toString()
                 + " for store: " + storeName);
             nonLoggedStorePartitionDir.mkdirs();
@@ -659,7 +1060,7 @@ public class ContainerStorageManager {
         SystemStreamPartition systemStreamPartition =
             new SystemStreamPartition(changelogSystemStreamEntry.getValue(), 
taskModel.getChangelogPartition());
         SystemAdmin systemAdmin = 
systemAdmins.getSystemAdmin(changelogSystemStreamEntry.getValue().getSystem());
-        SystemConsumer systemConsumer = 
systemConsumers.get(changelogSystemStreamEntry.getKey());
+        SystemConsumer systemConsumer = 
storeConsumers.get(changelogSystemStreamEntry.getKey());
 
         String offset = getStartingOffset(systemStreamPartition, systemAdmin);
 
@@ -708,7 +1109,7 @@ public class ContainerStorageManager {
       LOG.debug("Restoring stores for task: {}", taskModel.getTaskName());
 
       for (String storeName : taskStoresToRestore) {
-        SystemConsumer systemConsumer = systemConsumers.get(storeName);
+        SystemConsumer systemConsumer = storeConsumers.get(storeName);
         SystemStream systemStream = changelogSystemStreams.get(storeName);
 
         SystemStreamPartitionIterator systemStreamPartitionIterator = new 
SystemStreamPartitionIterator(systemConsumer,
@@ -732,11 +1133,16 @@ public class ContainerStorageManager {
      * can invoke compaction.
      */
     public void stopPersistentStores() {
-      this.taskStores.values().stream().filter(storageEngine -> {
-          return storageEngine.getStoreProperties().isPersistedToDisk();
-        }).forEach(storageEngine -> {
-            storageEngine.stop();
-          });
+
+      Map<String, StorageEngine> persistentStores = 
this.taskStores.entrySet().stream().filter(e -> {
+          return e.getValue().getStoreProperties().isPersistedToDisk();
+        }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+      persistentStores.forEach((storeName, storageEngine) -> {
+          storageEngine.stop();
+          this.taskStores.remove(storeName);
+        });
+      LOG.info("Stopped persistent stores {}", persistentStores);
     }
   }
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 85cbf53..c2a2c7e 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -24,6 +24,7 @@ import java.io._
 import com.google.common.annotations.VisibleForTesting
 import org.apache.samza.Partition
 import org.apache.samza.container.TaskName
+import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util.{FileUtil, Logging}
@@ -90,7 +91,9 @@ class TaskStorageManager(
 
         if (newestOffset != null) {
           debug("Storing offset for store in OFFSET file ")
-          StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName, 
taskName, Map(ssp -> newestOffset).asJava)
+
+          // TaskStorageManagers are only spun-up for active tasks
+          StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName, 
taskName, TaskMode.Active, Map(ssp -> newestOffset).asJava)
           debug("Successfully stored offset %s for store %s in OFFSET file " 
format(newestOffset, storeName))
         } else {
           //if newestOffset is null, then it means the store is (or has 
become) empty. No need to persist the offset file
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
 
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
index 2d60f7b..a7cefa0 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
@@ -288,7 +289,7 @@ public class TestTaskSideInputStorageManager {
     }
 
     TaskSideInputStorageManager build() {
-      return spy(new TaskSideInputStorageManager(taskName, 
streamMetadataCache, storeBaseDir, stores,
+      return spy(new TaskSideInputStorageManager(taskName, TaskMode.Active, 
streamMetadataCache, new File(storeBaseDir), stores,
           storeToProcessor, storeToSSps, systemAdmins, mock(Config.class), 
clock));
     }
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 06a4ebf..acaecdb 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -103,8 +103,6 @@ public class TestAsyncRunLoop {
         null,
         null,
         null,
-        new scala.collection.immutable.HashSet<>(),
-        null,
         mock(JobContext.class),
         mock(ContainerContext.class),
         Option.apply(null),
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 9517eec..99e133f 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -204,10 +204,10 @@ public class TestContainerStorageManager {
     // Create the container storage manager
     this.containerStorageManager =
         new ContainerStorageManager(new ContainerModel("samza-container-test", 
tasks), mockStreamMetadataCache,
-            mockSystemAdmins, changelogSystemStreams, storageEngineFactories, 
systemFactories, serdes, config,
+            mockSystemAdmins, changelogSystemStreams, new HashMap<>(), 
storageEngineFactories, systemFactories, serdes, config,
             taskInstanceMetrics, samzaContainerMetrics, 
Mockito.mock(JobContext.class),
             Mockito.mock(ContainerContext.class), Mockito.mock(Map.class), 
DEFAULT_LOGGED_STORE_BASE_DIR,
-            DEFAULT_STORE_BASE_DIR, 2, new SystemClock());
+            DEFAULT_STORE_BASE_DIR, 2, null, new SystemClock());
   }
 
   @Test
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 8e22533..424a0d0 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -27,7 +27,7 @@ import org.apache.samza.Partition
 import org.apache.samza.config._
 import org.apache.samza.container.{SamzaContainerMetrics, TaskInstanceMetrics, 
TaskName}
 import org.apache.samza.context.{ContainerContext, JobContext}
-import org.apache.samza.job.model.{ContainerModel, TaskModel}
+import org.apache.samza.job.model.{ContainerModel, TaskMode, TaskModel}
 import org.apache.samza.serializers.{Serde, StringSerdeFactory}
 import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -37,7 +37,7 @@ import org.apache.samza.util.{FileUtil, SystemClock}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.Matchers._
-import org.mockito.Mockito
+import org.mockito.{Matchers, Mockito}
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
@@ -55,9 +55,9 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Before
   def setupTestDirs() {
-    
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
 store, taskName)
+    
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
 store, taskName, TaskMode.Active)
       .mkdirs()
-    
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName)
+    
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
       .mkdirs()
   }
 
@@ -67,6 +67,10 @@ class TestTaskStorageManager extends MockitoSugar {
     FileUtil.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
   }
 
+  def getStreamName(storeName : String): String = {
+    "testStream-"+storeName
+  }
+
   /**
     * This tests the entire TaskStorageManager lifecycle for a Persisted 
Logged Store
     * For example, a RocksDb store with changelog needs to continuously update 
the offset file on flush & stop
@@ -75,10 +79,10 @@ class TestTaskStorageManager extends MockitoSugar {
   @Test
   def testStoreLifecycleForLoggedPersistedStore(): Unit = {
     // Basic test setup of SystemStream, SystemStreamPartition for this task
-    val ss = new SystemStream("kafka", "testStream")
+    val ss = new SystemStream("kafka", getStreamName(loggedStore))
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition(ss, partition)
-    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName)
+    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
     val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
 
@@ -89,14 +93,14 @@ class TestTaskStorageManager extends MockitoSugar {
     val mockSSPMetadataCache = mock[SSPMetadataCache]
     val mockSystemConsumer = mock[SystemConsumer]
     val mockSystemAdmin = mock[SystemAdmin]
-    val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream", 
"kafka", 1)
+    val changelogSpec = 
StreamSpec.createChangeLogStreamSpec(getStreamName(loggedStore), "kafka", 1)
     doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
     doNothing().when(mockSystemConsumer).stop()
 
     // Test 1: Initial invocation - No store on disk (only changelog has data)
     // Setup initial sspMetadata
     var sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51")
-    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(partition, sspMetadata)
       }
@@ -121,18 +125,18 @@ class TestTaskStorageManager extends MockitoSugar {
     // Test 2: flush should update the offset file
     taskManager.flush()
     assertTrue(offsetFile.exists())
-    assertEquals("{\"kafka.testStream.0\":\"50\"}", 
FileUtil.readWithChecksum(offsetFile))
+    assertEquals("{\"kafka.testStream-loggedStore1.0\":\"50\"}", 
FileUtil.readWithChecksum(offsetFile))
 
     // Test 3: Update sspMetadata before shutdown and verify that offset file 
is updated correctly
     when(mockSSPMetadataCache.getMetadata(ssp)).thenReturn(new 
SystemStreamPartitionMetadata("0", "100", "101"))
     taskManager.stop()
     assertTrue(storeFile.exists())
     assertTrue(offsetFile.exists())
-    assertEquals("{\"kafka.testStream.0\":\"100\"}", 
FileUtil.readWithChecksum(offsetFile))
+    assertEquals("{\"kafka.testStream-loggedStore1.0\":\"100\"}", 
FileUtil.readWithChecksum(offsetFile))
 
     // Test 4: Initialize again with an updated sspMetadata; Verify that it 
restores from the correct offset
     sspMetadata = new SystemStreamPartitionMetadata("0", "150", "151")
-    metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    metadata = new SystemStreamMetadata(getStreamName(loggedStore), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(partition, sspMetadata)
       }
@@ -165,17 +169,17 @@ class TestTaskStorageManager extends MockitoSugar {
   @Test
   def testStoreLifecycleForLoggedInMemoryStore(): Unit = {
     // Basic test setup of SystemStream, SystemStreamPartition for this task
-    val ss = new SystemStream("kafka", "testStream")
+    val ss = new SystemStream("kafka", getStreamName(store))
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition(ss, partition)
-    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 store, taskName)
+    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 store, taskName, TaskMode.Active)
 
     val mockStorageEngine: StorageEngine = 
createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null)
 
     // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
     val mockStreamMetadataCache = mock[StreamMetadataCache]
     val mockSystemAdmin = mock[SystemAdmin]
-    val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream", 
"kafka", 1)
+    val changelogSpec = 
StreamSpec.createChangeLogStreamSpec(getStreamName(store), "kafka", 1)
     doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
 
     val mockSystemConsumer = mock[SystemConsumer]
@@ -184,7 +188,7 @@ class TestTaskStorageManager extends MockitoSugar {
     // Test 1: Initial invocation - No store data (only changelog has data)
     // Setup initial sspMetadata
     val sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51")
-    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    var metadata = new SystemStreamMetadata(getStreamName(store), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(partition, sspMetadata)
       }
@@ -208,7 +212,7 @@ class TestTaskStorageManager extends MockitoSugar {
     assertTrue(storeDirectory.list().isEmpty)
 
     // Test 3: Update sspMetadata before shutdown and verify that offset file 
is NOT created
-    metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    metadata = new SystemStreamMetadata(getStreamName(store), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(partition, new SystemStreamPartitionMetadata("0", "100", "101"))
       }
@@ -218,7 +222,7 @@ class TestTaskStorageManager extends MockitoSugar {
     assertTrue(storeDirectory.list().isEmpty)
 
     // Test 4: Initialize again with an updated sspMetadata; Verify that it 
restores from the earliest offset
-    metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    metadata = new SystemStreamMetadata(getStreamName(store), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(partition, new SystemStreamPartitionMetadata("0", "150", "151"))
       }
@@ -241,9 +245,9 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testStoreDirsWithoutOffsetFileAreDeletedInCleanBaseDirs() {
-    val checkFilePath1 = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
 store, taskName), "check")
+    val checkFilePath1 = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
 store, taskName, TaskMode.Active), "check")
     checkFilePath1.createNewFile()
-    val checkFilePath2 = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName), "check")
+    val checkFilePath2 = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active), "check")
     checkFilePath2.createNewFile()
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -259,7 +263,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName), StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -275,7 +279,7 @@ class TestTaskStorageManager extends MockitoSugar {
   def testStoreDeletedWhenOffsetFileOlderThanDeleteRetention() {
     // This test ensures that store gets deleted when lastModifiedTime of the 
offset file
     // is older than deletionRetention of the changeLog.
-    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName)
+    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
     storeDirectory.setLastModified(0)
     val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
     offsetFile.createNewFile()
@@ -294,7 +298,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName), StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -310,22 +314,22 @@ class TestTaskStorageManager extends MockitoSugar {
   def testStopCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
 
-    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName)
+    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
     val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
-    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
"testStream", partition)))
+    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
getStreamName(loggedStore), partition)))
       .thenReturn(sspMetadata)
 
-    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(partition, sspMetadata)
       }
     })
 
     val mockStreamMetadataCache = mock[StreamMetadataCache]
-    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) -> 
metadata))
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -341,7 +345,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFile.exists())
-    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
+    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream-loggedStore1.0\":\"100\"}", 
FileUtil.readWithChecksum(offsetFile))
   }
 
   /**
@@ -351,31 +355,23 @@ class TestTaskStorageManager extends MockitoSugar {
   def testFlushCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
     val anotherOffsetPath = new File(
-      StorageManagerUtil.getStorePartitionDir(
-        TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) 
+ File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+      
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 store, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
-    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
"testStream", partition)))
+    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
getStreamName(loggedStore), partition)))
+      .thenReturn(sspMetadata)
+    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
getStreamName(store), partition)))
       .thenReturn(sspMetadata)
-
-    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
-      {
-        put(partition, sspMetadata)
-      }
-    })
-
-    val mockStreamMetadataCache = mock[StreamMetadataCache]
-    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addLoggedStore(loggedStore, true)
       .addStore(store, false)
       .setSSPMetadataCache(sspMetadataCache)
-      .setStreamMetadataCache(mockStreamMetadataCache)
+      .setStreamMetadataCache(createMockStreamMetadataCache("20", "100", 
"101"))
       .setPartition(partition)
       .initializeContainerStorageManager()
       .build
@@ -385,7 +381,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream-loggedStore1.0\":\"100\"}", 
FileUtil.readWithChecksum(offsetFilePath))
 
     assertTrue("Offset file got created for a store that is not persisted to 
the disk!!", !anotherOffsetPath.exists())
   }
@@ -397,24 +393,24 @@ class TestTaskStorageManager extends MockitoSugar {
   def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
-    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
"testStream", partition)))
+    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
getStreamName(loggedStore), partition)))
       // first return some metadata
       .thenReturn(sspMetadata)
       // then return no metadata to trigger the delete
       .thenReturn(null)
 
-    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(partition, sspMetadata)
       }
     })
 
     val mockStreamMetadataCache = mock[StreamMetadataCache]
-    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) -> 
metadata))
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -430,7 +426,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream-loggedStore1.0\":\"100\"}", 
FileUtil.readWithChecksum(offsetFilePath))
 
     //Invoke test method again
     taskStorageManager.flush()
@@ -442,23 +438,23 @@ class TestTaskStorageManager extends MockitoSugar {
   @Test
   def testFlushOverwritesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
-    val ssp = new SystemStreamPartition("kafka", "testStream", partition)
+    val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), 
partition)
 
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "139", "140")
     when(sspMetadataCache.getMetadata(ssp)).thenReturn(sspMetadata)
 
-    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(partition, sspMetadata)
       }
     })
 
     val mockStreamMetadataCache = mock[StreamMetadataCache]
-    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
+    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) -> 
metadata))
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -474,7 +470,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream.0\":\"139\"}", FileUtil.readWithChecksum(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream-loggedStore1.0\":\"139\"}", 
FileUtil.readWithChecksum(offsetFilePath))
 
     // Flush again
     when(sspMetadataCache.getMetadata(ssp)).thenReturn(new 
SystemStreamPartitionMetadata("20", "193", "194"))
@@ -484,18 +480,18 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream.0\":\"193\"}", FileUtil.readWithChecksum(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", 
"{\"kafka.testStream-loggedStore1.0\":\"193\"}", 
FileUtil.readWithChecksum(offsetFilePath))
   }
 
   @Test
   def testStopShouldNotCreateOffsetFileForEmptyStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+    val offsetFilePath = new 
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active) + File.separator + 
StorageManagerUtil.OFFSET_FILE_NAME)
 
 
     val sspMetadataCache = mock[SSPMetadataCache]
-    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
"testStream", partition))).thenReturn(null)
+    when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", 
getStreamName(loggedStore), partition))).thenReturn(null)
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -569,7 +565,7 @@ class TestTaskStorageManager extends MockitoSugar {
   @Test
   def testReadOfOldOffsetFormat(): Unit = {
     // Create a file in old single-offset format, with a sample offset
-    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName)
+    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
     val offsetFile = new File(storeDirectory, 
StorageManagerUtil.OFFSET_FILE_NAME)
     val sampleOldOffset = "912321"
@@ -584,13 +580,13 @@ class TestTaskStorageManager extends MockitoSugar {
 
   private def testChangelogConsumerOffsetRegistration(oldestOffset: String, 
newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, 
fileOffset: String, writeOffsetFile: Boolean): Unit = {
     val systemName = "kafka"
-    val streamName = "testStream"
+    val streamName = getStreamName(loggedStore)
     val partitionCount = 1
     // Basic test setup of SystemStream, SystemStreamPartition for this task
     val ss = new SystemStream(systemName, streamName)
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition(ss, partition)
-    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName)
+    val storeDirectory = 
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName, TaskMode.Active  )
     val storeFile = new File(storeDirectory, "store.sst")
 
     if (writeOffsetFile) {
@@ -672,14 +668,24 @@ class TestTaskStorageManager extends MockitoSugar {
 
   private def createMockStreamMetadataCache(oldestOffset: String, 
newestOffset: String, upcomingOffset: String) = {
     // an empty store would return a SSPMetadata with oldest, newest and 
upcoming offset set to null
-    var metadata = new SystemStreamMetadata("testStream", new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+    var metadata1 = new SystemStreamMetadata(getStreamName(loggedStore), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset, 
newestOffset, upcomingOffset))
+      }
+    })
+
+    var metadata2 = new SystemStreamMetadata(getStreamName(store), new 
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
         put(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset, 
newestOffset, upcomingOffset))
       }
     })
 
     val mockStreamMetadataCache = mock[StreamMetadataCache]
-    when(mockStreamMetadataCache.getStreamMetadata(any(), 
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
+    
when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new 
SystemStream("kafka", getStreamName(loggedStore)))), any())).thenReturn(Map(new 
SystemStream("kafka", getStreamName(loggedStore)) -> metadata1))
+    
when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new 
SystemStream("kafka", getStreamName(store)))), any())).thenReturn(Map(new 
SystemStream("kafka", getStreamName(store)) -> metadata2))
+    
when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new 
SystemStream("kafka", getStreamName(store)), new SystemStream("kafka", 
getStreamName(loggedStore)))), any())).
+      thenReturn(Map(new SystemStream("kafka", getStreamName(store)) -> 
metadata2, new SystemStream("kafka", getStreamName(loggedStore)) -> metadata1))
+
     mockStreamMetadataCache
   }
 
@@ -727,10 +733,14 @@ class TaskStorageManagerBuilder extends MockitoSugar {
   def addStore(storeName: String, storageEngine: StorageEngine, 
systemConsumer: SystemConsumer): TaskStorageManagerBuilder = {
     taskStores = taskStores ++ Map(storeName -> storageEngine)
     storeConsumers = storeConsumers ++ Map("kafka" -> systemConsumer)
-    changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new 
SystemStream("kafka", "testStream"))
+    changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new 
SystemStream("kafka", getStreamName(storeName)))
     this
   }
 
+  def getStreamName(storeName : String): String = {
+    "testStream-"+storeName
+  }
+
   def addStore(storeName: String, isPersistedToDisk: Boolean): 
TaskStorageManagerBuilder = {
     val mockStorageEngine = mock[StorageEngine]
     when(mockStorageEngine.getStoreProperties)
@@ -816,10 +826,10 @@ class TaskStorageManagerBuilder extends MockitoSugar {
 
 
     containerStorageManager = new ContainerStorageManager(containerModel, 
streamMetadataCache, mockSystemAdmins,
-      changeLogSystemStreams.asJava, storageEngineFactories.asJava, 
systemFactories.asJava, mockSerdes.asJava, config,
+      changeLogSystemStreams.asJava, Map[String, 
util.Set[SystemStream]]().asJava, storageEngineFactories.asJava, 
systemFactories.asJava, mockSerdes.asJava, config,
       new HashMap[TaskName, TaskInstanceMetrics]().asJava, 
Mockito.mock(classOf[SamzaContainerMetrics]), Mockito.mock(classOf[JobContext]),
       Mockito.mock(classOf[ContainerContext]), new HashMap[TaskName, 
TaskInstanceCollector].asJava, loggedStoreBaseDir, 
TaskStorageManagerBuilder.defaultStoreBaseDir, 1,
-      new SystemClock)
+      null, new SystemClock)
     this
   }
 
diff --git 
a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java 
b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index 9e1daaf..c711e8d 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -28,12 +28,11 @@ import java.util.List;
 import org.apache.commons.io.filefilter.DirectoryFileFilter;
 import org.apache.commons.io.FileUtils;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.job.JobInstance;
-import org.apache.samza.storage.ContainerStorageManager;
 import org.apache.samza.storage.StorageManagerUtil;
-import org.apache.samza.storage.TaskStorageManager;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
@@ -101,7 +100,8 @@ public class LocalStoreMonitor implements Monitor {
               LOG.info(String.format("Local store: %s is actively used by the 
task: %s.", storeName, task.getTaskName()));
             } else {
               LOG.info(String.format("Local store: %s not used by the task: 
%s.", storeName, task.getTaskName()));
-              
markSweepTaskStore(StorageManagerUtil.getStorePartitionDir(jobDir, storeName, 
new TaskName(task.getTaskName())));
+              
markSweepTaskStore(StorageManagerUtil.getStorePartitionDir(jobDir, storeName, 
new TaskName(task.getTaskName()),
+                  TaskMode.Active));
             }
           }
         }

Reply via email to