This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9346522  SAMZA-2530: Split out processing logic from 
TaskSideInputStorageManager (#1367)
9346522 is described below

commit 9346522e4af3259d3e85e317839ff96e278409f9
Author: bkonold <[email protected]>
AuthorDate: Tue Jun 9 15:46:07 2020 -0700

    SAMZA-2530: Split out processing logic from TaskSideInputStorageManager 
(#1367)
---
 .../apache/samza/storage/TaskSideInputHandler.java | 273 +++++++++++++++++++++
 .../samza/storage/TaskSideInputStorageManager.java | 233 ++----------------
 .../samza/storage/ContainerStorageManager.java     | 150 +++++------
 .../samza/storage/TestTaskSideInputHandler.java    | 202 +++++++++++++++
 .../storage/TestTaskSideInputStorageManager.java   | 145 ++---------
 5 files changed, 596 insertions(+), 407 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java 
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
new file mode 100644
index 0000000..56aeb85
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputHandler.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * This class encapsulates all processing logic / state for all side input 
SSPs within a task.
+ */
+public class TaskSideInputHandler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TaskSideInputHandler.class);
+
+  private final StorageManagerUtil storageManagerUtil = new 
StorageManagerUtil();
+  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new 
ConcurrentHashMap<>();
+
+  private final TaskName taskName;
+  private final TaskSideInputStorageManager taskSideInputStorageManager;
+  private final Map<SystemStreamPartition, Set<String>> sspToStores;
+  private final Map<String, SideInputsProcessor> storeToProcessor;
+  private final SystemAdmins systemAdmins;
+  private final StreamMetadataCache streamMetadataCache;
+
+  private Map<SystemStreamPartition, String> startingOffsets;
+
+  public TaskSideInputHandler(TaskName taskName, TaskMode taskMode, File 
storeBaseDir,
+      Map<String, StorageEngine> storeToStorageEngines, Map<String, 
Set<SystemStreamPartition>> storeToSSPs,
+      Map<String, SideInputsProcessor> storeToProcessor, SystemAdmins 
systemAdmins,
+      StreamMetadataCache streamMetadataCache, Clock clock) {
+    validateProcessorConfiguration(storeToSSPs.keySet(), storeToProcessor);
+
+    this.taskName = taskName;
+    this.systemAdmins = systemAdmins;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeToProcessor = storeToProcessor;
+
+    this.sspToStores = new HashMap<>();
+    storeToSSPs.forEach((store, ssps) -> {
+        for (SystemStreamPartition ssp: ssps) {
+          this.sspToStores.computeIfAbsent(ssp, key -> new HashSet<>());
+          this.sspToStores.computeIfPresent(ssp, (key, value) -> {
+              value.add(store);
+              return value;
+            });
+        }
+      });
+
+    this.taskSideInputStorageManager = new 
TaskSideInputStorageManager(taskName,
+        taskMode,
+        storeBaseDir,
+        storeToStorageEngines,
+        storeToSSPs,
+        clock);
+  }
+
+  /**
+   * The {@link TaskName} associated with this {@link TaskSideInputHandler}
+   *
+   * @return the task name for this handler
+   */
+  public TaskName getTaskName() {
+    return this.taskName;
+  }
+
+  /**
+   * Initializes the underlying {@link TaskSideInputStorageManager} and 
determines starting offsets for each SSP.
+   */
+  public void init() {
+    this.taskSideInputStorageManager.init();
+
+    Map<SystemStreamPartition, String> fileOffsets = 
this.taskSideInputStorageManager.getFileOffsets();
+    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
+
+    this.lastProcessedOffsets.putAll(fileOffsets);
+    LOG.info("Last processed offsets for the task {}: {}", taskName, 
lastProcessedOffsets);
+
+    this.startingOffsets = getStartingOffsets(fileOffsets, getOldestOffsets());
+    LOG.info("Starting offsets for the task {}: {}", taskName, 
startingOffsets);
+  }
+
+  /**
+   * Processes the incoming side input message envelope and updates the last 
processed offset for its SSP.
+   * Synchronized in order to be exclusive with flush().
+   *
+   * @param envelope incoming envelope to be processed
+   */
+  public synchronized void process(IncomingMessageEnvelope envelope) {
+    SystemStreamPartition envelopeSSP = envelope.getSystemStreamPartition();
+    String envelopeOffset = envelope.getOffset();
+
+    for (String store: this.sspToStores.get(envelopeSSP)) {
+      SideInputsProcessor storeProcessor = this.storeToProcessor.get(store);
+      KeyValueStore keyValueStore = (KeyValueStore) 
this.taskSideInputStorageManager.getStore(store);
+      Collection<Entry<?, ?>> entriesToBeWritten = 
storeProcessor.process(envelope, keyValueStore);
+
+      // TODO: SAMZA-2255: optimize writes to side input stores
+      for (Entry entry : entriesToBeWritten) {
+        // If the key is null we ignore, if the value is null, we issue a 
delete, else we issue a put
+        if (entry.getKey() != null) {
+          if (entry.getValue() != null) {
+            keyValueStore.put(entry.getKey(), entry.getValue());
+          } else {
+            keyValueStore.delete(entry.getKey());
+          }
+        }
+      }
+    }
+
+    this.lastProcessedOffsets.put(envelopeSSP, envelopeOffset);
+  }
+
+  /**
+   * Flushes the underlying {@link TaskSideInputStorageManager}.
+   * Synchronized in order to be exclusive with process().
+   */
+  public synchronized void flush() {
+    this.taskSideInputStorageManager.flush(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Gets the starting offset for the given side input {@link 
SystemStreamPartition}.
+   *
+   * Note: The method doesn't respect {@link 
org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
+   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET} 
configurations. It will use the local offset
+   * file if it is valid, else it will fall back to oldest offset in the 
stream.
+   *
+   * @param ssp side input system stream partition to get the starting offset 
for
+   * @return the starting offset
+   */
+  public String getStartingOffset(SystemStreamPartition ssp) {
+    return this.startingOffsets.get(ssp);
+  }
+
+  /**
+   * Gets the last processed offset for the given side input {@link 
SystemStreamPartition}.
+   *
+   * @param ssp side input system stream partition to get the last processed 
offset for
+   * @return the last processed offset
+   */
+  public String getLastProcessedOffset(SystemStreamPartition ssp) {
+    return this.lastProcessedOffsets.get(ssp);
+  }
+
+  /**
+   * Stops the underlying storage manager at the last processed offsets. Any 
pending and upcoming invocations
+   * of {@link #process} and {@link #flush} are assumed to have completed or 
ceased prior to calling this method.
+   */
+  public void stop() {
+    this.taskSideInputStorageManager.stop(this.lastProcessedOffsets);
+  }
+
+  /**
+   * Gets the starting offsets for the {@link SystemStreamPartition}s 
belonging to all the side input stores.
+   * If the local file offset is available and is greater than the oldest 
available offset from source, uses it,
+   * else falls back to oldest offset in the source.
+   *
+   * @param fileOffsets offsets from the local offset file
+   * @param oldestOffsets oldest offsets from the source
+   * @return a {@link Map} of {@link SystemStreamPartition} to offset
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getStartingOffsets(
+      Map<SystemStreamPartition, String> fileOffsets, 
Map<SystemStreamPartition, String> oldestOffsets) {
+    Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();
+
+    this.sspToStores.keySet().forEach(ssp -> {
+        String fileOffset = fileOffsets.get(ssp);
+        String oldestOffset = oldestOffsets.get(ssp);
+
+        startingOffsets.put(ssp,
+            this.storageManagerUtil.getStartingOffset(
+                ssp, this.systemAdmins.getSystemAdmin(ssp.getSystem()), 
fileOffset, oldestOffset));
+      });
+
+    return startingOffsets;
+  }
+
+  /**
+   * Gets the oldest offset for the {@link SystemStreamPartition}s associated 
with all the store side inputs.
+   *   1. Groups the list of the SSPs based on system stream
+   *   2. Fetches the {@link SystemStreamMetadata} from {@link 
StreamMetadataCache}
+   *   3. Fetches the partition metadata for each system stream and populates the
+   *      oldest offset for the SSPs belonging to that system stream.
+   *
+   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest 
offset. If partitionMetadata could not be
+   * obtained for any {@link SystemStreamPartition} the offset for it is 
populated as null.
+   */
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getOldestOffsets() {
+    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
+
+    // Step 1
+    Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = 
this.sspToStores.keySet().stream()
+        
.collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
+
+    // Step 2
+    Map<SystemStream, SystemStreamMetadata> metadata = 
JavaConverters.mapAsJavaMapConverter(
+        this.streamMetadataCache.getStreamMetadata(
+            
JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(),
 false)).asJava();
+
+    // Step 3
+    metadata.forEach((systemStream, systemStreamMetadata) -> {
+
+        // get the partition metadata for each system stream
+        Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata =
+            systemStreamMetadata.getSystemStreamPartitionMetadata();
+
+        // For SSPs belonging to the system stream, use the partition metadata 
to get the oldest offset
+        // if partitionMetadata was not obtained for any SSP, populate 
oldest-offset as null
+        // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using 
lambda will NPE when getOldestOffset() is null
+        for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
+          oldestOffsets.put(ssp, 
partitionMetadata.get(ssp.getPartition()).getOldestOffset());
+        }
+      });
+
+    return oldestOffsets;
+  }
+
+  /**
+   * Validates that each store has an associated {@link SideInputsProcessor}
+   */
+  private void validateProcessorConfiguration(Set<String> stores, Map<String, 
SideInputsProcessor> storeToProcessor) {
+    stores.forEach(storeName -> {
+        if (!storeToProcessor.containsKey(storeName)) {
+          throw new SamzaException(
+              String.format("Side inputs processor missing for store: %s.", 
storeName));
+        }
+      });
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 9cd888a..0a4e763 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -20,34 +20,20 @@
 package org.apache.samza.storage;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.FileUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 
 import java.io.File;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -62,53 +48,28 @@ public class TaskSideInputStorageManager {
   private static final long STORE_DELETE_RETENTION_MS = 
TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
 
   private final Clock clock;
-  private final Map<String, SideInputsProcessor> storeToProcessor;
   private final Map<String, StorageEngine> stores;
   private final File storeBaseDir;
   private final Map<String, Set<SystemStreamPartition>> storeToSSps;
-  private final Map<SystemStreamPartition, Set<String>> sspsToStores;
-  private final StreamMetadataCache streamMetadataCache;
-  private final SystemAdmins systemAdmins;
   private final TaskName taskName;
   private final TaskMode taskMode;
-  private final Map<SystemStreamPartition, String> lastProcessedOffsets = new 
ConcurrentHashMap<>();
   private final StorageManagerUtil storageManagerUtil = new 
StorageManagerUtil();
 
-  private Map<SystemStreamPartition, String> startingOffsets;
+  public TaskSideInputStorageManager(TaskName taskName, TaskMode taskMode, 
File storeBaseDir, Map<String, StorageEngine> sideInputStores,
+      Map<String, Set<SystemStreamPartition>> storesToSSPs, Clock clock) {
+    validateStoreConfiguration(sideInputStores);
 
-  public TaskSideInputStorageManager(
-      TaskName taskName,
-      TaskMode taskMode,
-      StreamMetadataCache streamMetadataCache,
-      File storeBaseDir,
-      Map<String, StorageEngine> sideInputStores,
-      Map<String, SideInputsProcessor> storesToProcessor,
-      Map<String, Set<SystemStreamPartition>> storesToSSPs,
-      SystemAdmins systemAdmins,
-      Config config,
-      Clock clock) {
     this.clock = clock;
     this.stores = sideInputStores;
     this.storeBaseDir = storeBaseDir;
     this.storeToSSps = storesToSSPs;
-    this.streamMetadataCache = streamMetadataCache;
-    this.systemAdmins = systemAdmins;
     this.taskName = taskName;
     this.taskMode = taskMode;
-    this.storeToProcessor = storesToProcessor;
-
-    validateStoreConfiguration();
+  }
 
-    this.sspsToStores = new HashMap<>();
-    storesToSSPs.forEach((store, ssps) -> {
-        for (SystemStreamPartition ssp: ssps) {
-          sspsToStores.computeIfAbsent(ssp, key -> new HashSet<>());
-          sspsToStores.computeIfPresent(ssp, (key, value) -> {
-              value.add(store);
-              return value;
-            });
-        }
-      });
+  // Get the taskName associated with this instance.
+  public TaskName getTaskName() {
+    return this.taskName;
   }
 
   /**
@@ -117,38 +78,32 @@ public class TaskSideInputStorageManager {
   public void init() {
     LOG.info("Initializing side input stores.");
 
-    Map<SystemStreamPartition, String> fileOffsets = getFileOffsets();
-    LOG.info("File offsets for the task {}: {}", taskName, fileOffsets);
-
-    Map<SystemStreamPartition, String> oldestOffsets = getOldestOffsets();
-    LOG.info("Oldest offsets for the task {}: {}", taskName, oldestOffsets);
-
-    startingOffsets = getStartingOffsets(fileOffsets, oldestOffsets);
-    LOG.info("Starting offsets for the task {}: {}", taskName, 
startingOffsets);
-
-    lastProcessedOffsets.putAll(fileOffsets);
-    LOG.info("Last processed offsets for the task {}: {}", taskName, 
lastProcessedOffsets);
-
     initializeStoreDirectories();
   }
 
   /**
    * Flushes the contents of the underlying store and writes the offset file 
to disk.
    * Synchronized inorder to be exclusive with process()
+   *
+   * @param lastProcessedOffsets The last processed offsets for each SSP. 
These will be used when writing offsets files
+   *                             for each store.
    */
-  public synchronized void flush() {
+  public void flush(Map<SystemStreamPartition, String> lastProcessedOffsets) {
     LOG.info("Flushing the side input stores.");
     stores.values().forEach(StorageEngine::flush);
-    writeOffsetFiles();
+    writeFileOffsets(lastProcessedOffsets);
   }
 
   /**
    * Stops the storage engines for all the stores and writes the offset file 
to disk.
+   *
+   * @param lastProcessedOffsets The last processed offsets for each SSP. 
These will be used when writing offsets files
+   *                             for each store.
    */
-  public void stop() {
+  public void stop(Map<SystemStreamPartition, String> lastProcessedOffsets) {
     LOG.info("Stopping the side input stores.");
     stores.values().forEach(StorageEngine::stop);
-    writeOffsetFiles();
+    writeFileOffsets(lastProcessedOffsets);
   }
 
   /**
@@ -162,77 +117,6 @@ public class TaskSideInputStorageManager {
   }
 
   /**
-   * Gets the starting offset for the given side input {@link 
SystemStreamPartition}.
-   *
-   * Note: The method doesn't respect {@link 
org.apache.samza.config.StreamConfig#CONSUMER_OFFSET_DEFAULT} and
-   * {@link org.apache.samza.config.StreamConfig#CONSUMER_RESET_OFFSET} 
configurations. It will use the local offset
-   * file if it is valid, else it will fall back to oldest offset in the 
stream.
-   *
-   * @param ssp side input system stream partition to get the starting offset 
for
-   * @return the starting offset
-   */
-  public String getStartingOffset(SystemStreamPartition ssp) {
-    return startingOffsets.get(ssp);
-  }
-
-  // Get the taskName associated with this instance.
-  public TaskName getTaskName() {
-    return this.taskName;
-  }
-
-  /**
-   * Gets the last processed offset for the given side input {@link 
SystemStreamPartition}.
-   *
-   * @param ssp side input system stream partition to get the last processed 
offset for
-   * @return the last processed offset
-   */
-  public String getLastProcessedOffset(SystemStreamPartition ssp) {
-    return lastProcessedOffsets.get(ssp);
-  }
-
-  /**
-   * For unit testing only
-   */
-  @VisibleForTesting
-  void updateLastProcessedOffset(SystemStreamPartition ssp, String offset) {
-    lastProcessedOffsets.put(ssp, offset);
-  }
-
-  /**
-   * Processes the incoming side input message envelope and updates the last 
processed offset for its SSP.
-   * Synchronized inorder to be exclusive with flush().
-   *
-   * @param message incoming message to be processed
-   */
-  public synchronized void process(IncomingMessageEnvelope message) {
-    SystemStreamPartition ssp = message.getSystemStreamPartition();
-    Set<String> storeNames = sspsToStores.get(ssp);
-
-    for (String storeName : storeNames) {
-      SideInputsProcessor sideInputsProcessor = 
storeToProcessor.get(storeName);
-
-      KeyValueStore keyValueStore = (KeyValueStore) stores.get(storeName);
-      Collection<Entry<?, ?>> entriesToBeWritten = 
sideInputsProcessor.process(message, keyValueStore);
-
-      // Iterate over the list to be written.
-      // TODO: SAMZA-2255: Optimize value writes in TaskSideInputStorageManager
-      for (Entry entry : entriesToBeWritten) {
-        // If the key is null we ignore, if the value is null, we issue a 
delete, else we issue a put
-        if (entry.getKey() != null) {
-          if (entry.getValue() != null) {
-            keyValueStore.put(entry.getKey(), entry.getValue());
-          } else {
-            keyValueStore.delete(entry.getKey());
-          }
-        }
-      }
-    }
-
-    // update the last processed offset
-    lastProcessedOffsets.put(ssp, message.getOffset());
-  }
-
-  /**
    * Initializes the store directories for all the stores:
    *  1. Cleans up the directories for invalid stores.
    *  2. Ensures that the directories exist.
@@ -258,9 +142,10 @@ public class TaskSideInputStorageManager {
   /**
    * Writes the offset files for all side input stores one by one. There is 
one offset file per store.
    * Its contents are a JSON encoded mapping from each side input SSP to its 
last processed offset, and a checksum.
+   *
+   * @param lastProcessedOffsets The offset per SSP to write
    */
-  @VisibleForTesting
-  void writeOffsetFiles() {
+  public void writeFileOffsets(Map<SystemStreamPartition, String> 
lastProcessedOffsets) {
     storeToSSps.entrySet().stream()
         .filter(entry -> isPersistedStore(entry.getKey())) // filter out 
in-memory side input stores
         .forEach((entry) -> {
@@ -283,9 +168,7 @@ public class TaskSideInputStorageManager {
    *
    * @return a {@link Map} of {@link SystemStreamPartition} to offset in the 
offset files.
    */
-  @SuppressWarnings("unchecked")
-  @VisibleForTesting
-  Map<SystemStreamPartition, String> getFileOffsets() {
+  public Map<SystemStreamPartition, String> getFileOffsets() {
     LOG.info("Loading initial offsets from the file for side input stores.");
     Map<SystemStreamPartition, String> fileOffsets = new HashMap<>();
 
@@ -313,73 +196,6 @@ public class TaskSideInputStorageManager {
     return storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, 
taskName, taskMode);
   }
 
-  /**
-   * Gets the starting offsets for the {@link SystemStreamPartition}s 
belonging to all the side input stores.
-   * If the local file offset is available and is greater than the oldest 
available offset from source, uses it,
-   * else falls back to oldest offset in the source.
-   *
-   * @param fileOffsets offsets from the local offset file
-   * @param oldestOffsets oldest offsets from the source
-   * @return a {@link Map} of {@link SystemStreamPartition} to offset
-   */
-  @VisibleForTesting
-  Map<SystemStreamPartition, String> getStartingOffsets(
-      Map<SystemStreamPartition, String> fileOffsets, 
Map<SystemStreamPartition, String> oldestOffsets) {
-    Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();
-
-    sspsToStores.keySet().forEach(ssp -> {
-        String fileOffset = fileOffsets.get(ssp);
-        String oldestOffset = oldestOffsets.get(ssp);
-
-        startingOffsets.put(ssp,
-          storageManagerUtil.getStartingOffset(
-            ssp, systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffset, 
oldestOffset));
-      });
-
-    return startingOffsets;
-  }
-
-  /**
-   * Gets the oldest offset for the {@link SystemStreamPartition}s associated 
with all the store side inputs.
-   *   1. Groups the list of the SSPs based on system stream
-   *   2. Fetches the {@link SystemStreamMetadata} from {@link 
StreamMetadataCache}
-   *   3. Fetches the partition metadata for each system stream and fetch the 
corresponding partition metadata
-   *      and populates the oldest offset for SSPs belonging to the system 
stream.
-   *
-   * @return a {@link Map} of {@link SystemStreamPartition} to their oldest 
offset. If partitionMetadata could not be
-   * obtained for any {@link SystemStreamPartition} the offset for it is 
populated as null.
-   */
-  @VisibleForTesting
-  Map<SystemStreamPartition, String> getOldestOffsets() {
-    Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
-
-    // Step 1
-    Map<SystemStream, List<SystemStreamPartition>> systemStreamToSsp = 
sspsToStores.keySet().stream()
-        
.collect(Collectors.groupingBy(SystemStreamPartition::getSystemStream));
-
-    // Step 2
-    Map<SystemStream, SystemStreamMetadata> metadata = 
JavaConverters.mapAsJavaMapConverter(
-        streamMetadataCache.getStreamMetadata(
-            
JavaConverters.asScalaSetConverter(systemStreamToSsp.keySet()).asScala().toSet(),
 false)).asJava();
-
-    // Step 3
-    metadata.forEach((systemStream, systemStreamMetadata) -> {
-
-        // get the partition metadata for each system stream
-        Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata =
-          systemStreamMetadata.getSystemStreamPartitionMetadata();
-
-        // For SSPs belonging to the system stream, use the partition metadata 
to get the oldest offset
-        // if partitionMetadata was not obtained for any SSP, populate 
oldest-offset as null
-        // Because of https://bugs.openjdk.java.net/browse/JDK-8148463 using 
lambda will NPE when getOldestOffset() is null
-        for (SystemStreamPartition ssp : systemStreamToSsp.get(systemStream)) {
-          oldestOffsets.put(ssp, 
partitionMetadata.get(ssp.getPartition()).getOldestOffset());
-        }
-      });
-
-    return oldestOffsets;
-  }
-
   private boolean isValidSideInputStore(String storeName, File storeLocation) {
     return isPersistedStore(storeName)
         && !storageManagerUtil.isStaleStore(storeLocation, 
STORE_DELETE_RETENTION_MS, clock.currentTimeMillis(), true)
@@ -393,13 +209,8 @@ public class TaskSideInputStorageManager {
         .orElse(false);
   }
 
-  private void validateStoreConfiguration() {
+  private void validateStoreConfiguration(Map<String, StorageEngine> stores) {
     stores.forEach((storeName, storageEngine) -> {
-        if (!storeToProcessor.containsKey(storeName)) {
-          throw new SamzaException(
-              String.format("Side inputs processor missing for store: %s.", 
storeName));
-        }
-
         if (storageEngine.getStoreProperties().isLoggedStore()) {
           throw new SamzaException(
               String.format("Cannot configure both side inputs and a changelog 
for store: %s.", storeName));
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 6e59e55..3087eb4 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -149,10 +149,10 @@ public class ContainerStorageManager {
   private final int maxChangeLogStreamPartitions; // The partition count of 
each changelog-stream topic. This is used for validating changelog streams 
before restoring.
 
   /* Sideinput related parameters */
-  private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map 
of sideInput system-streams indexed by store name
-  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
taskSideInputSSPs;
-  private final Map<SystemStreamPartition, TaskSideInputStorageManager> 
sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp, 
for simpler lookup for process()
-  private final Map<String, SystemConsumer> sideInputConsumers; // Mapping 
from storeSystemNames to SystemConsumers
+  private final boolean hasSideInputs;
+  // side inputs indexed first by task, then store name
+  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
taskSideInputStoreSSPs;
+  private final Map<SystemStreamPartition, TaskSideInputHandler> 
sspSideInputHandlers;
   private SystemConsumers sideInputSystemConsumers;
   private final Map<SystemStreamPartition, 
SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
       = new ConcurrentHashMap<>(); // Recorded sspMetadata of the 
taskSideInputSSPs recorded at start, used to determine when sideInputs are 
caughtup and container init can proceed
@@ -194,12 +194,16 @@ public class ContainerStorageManager {
       Clock clock) {
     this.checkpointManager = checkpointManager;
     this.containerModel = containerModel;
-    this.sideInputSystemStreams = new HashMap<>(sideInputSystemStreams);
-    this.taskSideInputSSPs = getTaskSideInputSSPs(containerModel, 
sideInputSystemStreams);
+    this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel, 
sideInputSystemStreams);
+    this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
+        .flatMap(m -> m.values().stream())
+        .flatMap(Collection::stream)
+        .findAny()
+        .isPresent();
     this.sspMetadataCache = sspMetadataCache;
     this.changelogSystemStreams = getChangelogSystemStreams(containerModel, 
changelogSystemStreams); // handling standby tasks
 
-    LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams 
= {}", this.changelogSystemStreams, this.sideInputSystemStreams);
+    LOG.info("Starting with changelogSystemStreams = {} taskSideInputStoreSSPs 
= {}", this.changelogSystemStreams, this.taskSideInputStoreSSPs);
 
     this.storageEngineFactories = storageEngineFactories;
     this.serdes = serdes;
@@ -238,25 +242,38 @@ public class ContainerStorageManager {
     // create taskStores for all tasks in the containerModel and each store in 
storageEngineFactories
     this.taskStores = createTaskStores(containerModel, jobContext, 
containerContext, storageEngineFactories, serdes, taskInstanceMetrics, 
taskInstanceCollectors);
 
+    Set<String> containerChangelogSystems = 
this.changelogSystemStreams.values().stream()
+        .map(SystemStream::getSystem)
+        .collect(Collectors.toSet());
+
     // create system consumers (1 per store system in changelogSystemStreams), 
and index it by storeName
-    Map<String, SystemConsumer> storeSystemConsumers = 
createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
-        e -> Collections.singleton(e.getValue()))), systemFactories, config, 
this.samzaContainerMetrics.registry());
+    Map<String, SystemConsumer> storeSystemConsumers = createConsumers(
+        containerChangelogSystems, systemFactories, config, 
this.samzaContainerMetrics.registry());
     this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, 
storeSystemConsumers);
 
     // creating task restore managers
     this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock, 
this.samzaContainerMetrics);
 
-    // create sideInput storage managers
-    sideInputStorageManagers = createSideInputStorageManagers(clock);
-
-    // create sideInput consumers indexed by systemName
-    this.sideInputConsumers = createConsumers(this.sideInputSystemStreams, 
systemFactories, config, this.samzaContainerMetrics.registry());
+    this.sspSideInputHandlers = createSideInputHandlers(clock);
 
     // create SystemConsumers for consuming from taskSideInputSSPs, if 
sideInputs are being used
-    if (sideInputsPresent()) {
+    if (this.hasSideInputs) {
+      Set<SystemStream> containerSideInputSystemStreams = 
this.taskSideInputStoreSSPs.values().stream()
+          .flatMap(map -> map.values().stream())
+          .flatMap(Set::stream)
+          .map(SystemStreamPartition::getSystemStream)
+          .collect(Collectors.toSet());
+
+      Set<String> containerSideInputSystems = 
containerSideInputSystemStreams.stream()
+          .map(SystemStream::getSystem)
+          .collect(Collectors.toSet());
 
-      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> 
inputStreamMetadata = 
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
-          
this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(),
 false);
+      // create sideInput consumers indexed by systemName
+      // Mapping from storeSystemNames to SystemConsumers
+      Map<String, SystemConsumer> sideInputConsumers =
+          createConsumers(containerSideInputSystems, systemFactories, config, 
this.samzaContainerMetrics.registry());
+
+      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> 
inputStreamMetadata = 
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(),
 false);
 
       SystemConsumersMetrics sideInputSystemConsumersMetrics = new 
SystemConsumersMetrics(samzaContainerMetrics.registry(), 
SIDEINPUTS_METRICS_PREFIX);
       // we use the same registry as samza-container-metrics
@@ -265,7 +282,7 @@ public class ContainerStorageManager {
           sideInputSystemConsumersMetrics.registry(), systemAdmins);
 
       sideInputSystemConsumers =
-          new SystemConsumers(chooser, 
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), systemAdmins, serdeManager,
+          new SystemConsumers(chooser, 
ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
               sideInputSystemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), 
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
               TaskConfig.DEFAULT_POLL_INTERVAL_MS, 
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
     }
@@ -283,9 +300,9 @@ public class ContainerStorageManager {
     Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs = 
new HashMap<>();
 
     containerModel.getTasks().forEach((taskName, taskModel) -> {
+        taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
         sideInputSystemStreams.keySet().forEach(storeName -> {
             Set<SystemStreamPartition> taskSideInputs = 
taskModel.getSystemStreamPartitions().stream().filter(ssp -> 
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
-            taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
             taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
           });
       });
@@ -312,12 +329,11 @@ public class ContainerStorageManager {
     );
 
     getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) 
-> {
+        this.taskSideInputStoreSSPs.putIfAbsent(taskName, new HashMap<>());
         changelogSystemStreams.forEach((storeName, systemStream) -> {
             SystemStreamPartition ssp = new 
SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
             changelogSSPToStore.remove(ssp);
-            this.taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-            this.sideInputSystemStreams.put(storeName, 
Collections.singleton(ssp.getSystemStream()));
-            this.taskSideInputSSPs.get(taskName).put(storeName, 
Collections.singleton(ssp));
+            this.taskSideInputStoreSSPs.get(taskName).put(storeName, 
Collections.singleton(ssp));
           });
       });
 
@@ -329,28 +345,21 @@ public class ContainerStorageManager {
   /**
    *  Creates SystemConsumer objects for store restoration, creating one 
consumer per system.
    */
-  private static Map<String, SystemConsumer> createConsumers(Map<String, 
Set<SystemStream>> systemStreams,
+  private static Map<String, SystemConsumer> createConsumers(Set<String> 
storeSystems,
       Map<String, SystemFactory> systemFactories, Config config, 
MetricsRegistry registry) {
-    // Determine the set of systems being used across all stores
-    Set<String> storeSystems =
-        
systemStreams.values().stream().flatMap(Set::stream).map(SystemStream::getSystem).collect(Collectors.toSet());
-
     // Create one consumer for each system in use, map with one entry for each 
such system
-    Map<String, SystemConsumer> storeSystemConsumers = new HashMap<>();
-
+    Map<String, SystemConsumer> consumers = new HashMap<>();
 
     // Iterate over the list of storeSystems and create one sysConsumer per 
system
     for (String storeSystemName : storeSystems) {
       SystemFactory systemFactory = systemFactories.get(storeSystemName);
       if (systemFactory == null) {
-        throw new SamzaException("Changelog system " + storeSystemName + " 
does not exist in config");
+        throw new SamzaException("System " + storeSystemName + " does not 
exist in config");
       }
-      storeSystemConsumers.put(storeSystemName,
-          systemFactory.getConsumer(storeSystemName, config, registry));
+      consumers.put(storeSystemName, 
systemFactory.getConsumer(storeSystemName, config, registry));
     }
 
-    return storeSystemConsumers;
-
+    return consumers;
   }
 
   private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, 
SystemStream> changelogSystemStreams,
@@ -406,7 +415,7 @@ public class ContainerStorageManager {
 
       for (String storeName : storageEngineFactories.keySet()) {
 
-        StorageEngineFactory.StoreMode storeMode = 
this.sideInputSystemStreams.containsKey(storeName) ?
+        StorageEngineFactory.StoreMode storeMode = 
this.taskSideInputStoreSSPs.get(taskName).containsKey(storeName) ?
             StorageEngineFactory.StoreMode.ReadWrite : 
StorageEngineFactory.StoreMode.BulkLoad;
 
         StorageEngine storageEngine =
@@ -476,7 +485,7 @@ public class ContainerStorageManager {
     // Use the logged-store-base-directory for change logged stores and 
sideInput stores, and non-logged-store-base-dir
     // for non logged stores
     File storeDirectory;
-    if (changeLogSystemStreamPartition != null || 
sideInputSystemStreams.containsKey(storeName)) {
+    if (changeLogSystemStreamPartition != null || 
this.taskSideInputStoreSSPs.get(taskName).containsKey(storeName)) {
       storeDirectory = 
storageManagerUtil.getTaskStoreDir(this.loggedStoreBaseDirectory, storeName, 
taskName,
           taskModel.getTaskMode());
     } else {
@@ -522,15 +531,14 @@ public class ContainerStorageManager {
 
   // Create sideInput store processors, one per store per task
   private Map<TaskName, Map<String, SideInputsProcessor>> 
createSideInputProcessors(StorageConfig config,
-      ContainerModel containerModel, Map<String, Set<SystemStream>> 
sideInputSystemStreams,
-      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
+      ContainerModel containerModel, Map<TaskName, TaskInstanceMetrics> 
taskInstanceMetrics) {
 
     Map<TaskName, Map<String, SideInputsProcessor>> 
sideInputStoresToProcessors = new HashMap<>();
     containerModel.getTasks().forEach((taskName, taskModel) -> {
         sideInputStoresToProcessors.put(taskName, new HashMap<>());
         TaskMode taskMode = taskModel.getTaskMode();
 
-        for (String storeName : sideInputSystemStreams.keySet()) {
+        for (String storeName : 
this.taskSideInputStoreSSPs.get(taskName).keySet()) {
 
           SideInputsProcessor sideInputsProcessor;
           Optional<String> sideInputsProcessorSerializedInstance =
@@ -583,53 +591,57 @@ public class ContainerStorageManager {
   }
 
   // Create task sideInput storage managers, one per task, index by the SSP 
they are responsible for consuming
-  private Map<SystemStreamPartition, TaskSideInputStorageManager> 
createSideInputStorageManagers(Clock clock) {
+  private Map<SystemStreamPartition, TaskSideInputHandler> 
createSideInputHandlers(Clock clock) {
     // creating sideInput store processors, one per store per task
     Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
-        createSideInputProcessors(new StorageConfig(config), 
this.containerModel, this.sideInputSystemStreams,
-            this.taskInstanceMetrics);
+        createSideInputProcessors(new StorageConfig(config), 
this.containerModel, this.taskInstanceMetrics);
 
-    Map<SystemStreamPartition, TaskSideInputStorageManager> 
sideInputStorageManagers = new HashMap<>();
+    Map<SystemStreamPartition, TaskSideInputHandler> handlers = new 
HashMap<>();
 
-    if (sideInputsPresent()) {
+    if (this.hasSideInputs) {
       containerModel.getTasks().forEach((taskName, taskModel) -> {
 
           Map<String, StorageEngine> sideInputStores = 
getSideInputStores(taskName);
           Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new 
HashMap<>();
 
           for (String storeName : sideInputStores.keySet()) {
-            Set<SystemStreamPartition> storeSSPs = 
taskSideInputSSPs.get(taskName).get(storeName);
+            Set<SystemStreamPartition> storeSSPs = 
this.taskSideInputStoreSSPs.get(taskName).get(storeName);
             sideInputStoresToSSPs.put(storeName, storeSSPs);
           }
 
-          TaskSideInputStorageManager taskSideInputStorageManager =
-              new TaskSideInputStorageManager(taskName, 
taskModel.getTaskMode(), streamMetadataCache,
-                  loggedStoreBaseDirectory, sideInputStores, 
taskSideInputProcessors.get(taskName), sideInputStoresToSSPs,
-                  systemAdmins, config, clock);
+          TaskSideInputHandler taskSideInputHandler = new 
TaskSideInputHandler(taskName,
+              taskModel.getTaskMode(),
+              loggedStoreBaseDirectory,
+              sideInputStores,
+              sideInputStoresToSSPs,
+              taskSideInputProcessors.get(taskName),
+              this.systemAdmins,
+              this.streamMetadataCache,
+              clock);
 
           
sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
-              sideInputStorageManagers.put(ssp, taskSideInputStorageManager);
+              handlers.put(ssp, taskSideInputHandler);
             });
 
-          LOG.info("Created taskSideInputStorageManager for task {}, 
sideInputStores {} and loggedStoreBaseDirectory {}",
+          LOG.info("Created TaskSideInputHandler for task {}, sideInputStores 
{} and loggedStoreBaseDirectory {}",
               taskName, sideInputStores, loggedStoreBaseDirectory);
         });
     }
-    return sideInputStorageManagers;
+    return handlers;
   }
 
   private Map<String, StorageEngine> getSideInputStores(TaskName taskName) {
     return taskStores.get(taskName).entrySet().stream().
-        filter(e -> 
sideInputSystemStreams.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
+        filter(e -> 
this.taskSideInputStoreSSPs.get(taskName).containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
   }
 
   private Map<String, StorageEngine> getNonSideInputStores(TaskName taskName) {
     return taskStores.get(taskName).entrySet().stream().
-        filter(e -> 
!sideInputSystemStreams.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
+        filter(e -> 
!this.taskSideInputStoreSSPs.get(taskName).containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
   }
 
-  private Set<TaskSideInputStorageManager> getSideInputStorageManagers() {
-    return 
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
+  private Set<TaskSideInputHandler> getSideInputHandlers() {
+    return 
this.sspSideInputHandlers.values().stream().collect(Collectors.toSet());
   }
 
   public void start() throws SamzaException, InterruptedException {
@@ -651,7 +663,7 @@ public class ContainerStorageManager {
     }
     LOG.info("Checkpointed changelog ssp offsets: {}", 
checkpointedChangelogSSPOffsets);
     restoreStores(checkpointedChangelogSSPOffsets);
-    if (sideInputsPresent()) {
+    if (this.hasSideInputs) {
       startSideInputs();
     }
   }
@@ -714,7 +726,7 @@ public class ContainerStorageManager {
     LOG.info("SideInput Restore started");
 
     // initialize the sideInputStorageManagers
-    getSideInputStorageManagers().forEach(sideInputStorageManager -> 
sideInputStorageManager.init());
+    getSideInputHandlers().forEach(TaskSideInputHandler::init);
 
     // start the checkpointing thread at the commit-ms frequency
     TaskConfig taskConfig = new TaskConfig(config);
@@ -722,7 +734,7 @@ public class ContainerStorageManager {
       @Override
       public void run() {
         try {
-          getSideInputStorageManagers().forEach(sideInputStorageManager -> 
sideInputStorageManager.flush());
+          getSideInputHandlers().forEach(TaskSideInputHandler::flush);
         } catch (Exception e) {
           LOG.error("Exception during flushing sideInputs", e);
           sideInputException = e;
@@ -731,11 +743,11 @@ public class ContainerStorageManager {
     }, 0, taskConfig.getCommitMs(), TimeUnit.MILLISECONDS);
 
     // set the latch to the number of sideInput SSPs
-    this.sideInputsCaughtUp = new 
CountDownLatch(this.sideInputStorageManagers.keySet().size());
+    this.sideInputsCaughtUp = new 
CountDownLatch(this.sspSideInputHandlers.keySet().size());
 
     // register all sideInput SSPs with the consumers
-    for (SystemStreamPartition ssp : sideInputStorageManagers.keySet()) {
-      String startingOffset = 
sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
+    for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) {
+      String startingOffset = 
this.sspSideInputHandlers.get(ssp).getStartingOffset(ssp);
 
       if (startingOffset == null) {
         throw new SamzaException(
@@ -744,8 +756,8 @@ public class ContainerStorageManager {
 
       // register startingOffset with the sysConsumer and register a metric 
for it
       sideInputSystemConsumers.register(ssp, startingOffset);
-      
taskInstanceMetrics.get(sideInputStorageManagers.get(ssp).getTaskName()).addOffsetGauge(
-          ssp, ScalaJavaUtil.toScalaFunction(() -> 
sideInputStorageManagers.get(ssp).getLastProcessedOffset(ssp)));
+      
taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+          ssp, ScalaJavaUtil.toScalaFunction(() -> 
this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
 
       SystemStreamMetadata systemStreamMetadata = 
streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
       SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
@@ -772,7 +784,7 @@ public class ContainerStorageManager {
 
               if (envelope != null) {
                 if (!envelope.isEndOfStream()) {
-                  
sideInputStorageManagers.get(envelope.getSystemStreamPartition()).process(envelope);
+                  
this.sspSideInputHandlers.get(envelope.getSystemStreamPartition()).process(envelope);
                 }
 
                 checkSideInputCaughtUp(envelope.getSystemStreamPartition(), 
envelope.getOffset(),
@@ -812,10 +824,6 @@ public class ContainerStorageManager {
     LOG.info("SideInput Restore complete");
   }
 
-  private boolean sideInputsPresent() {
-    return !this.sideInputSystemStreams.isEmpty();
-  }
-
   // Method to check if the given offset means the stream is caught up for 
reads
   private void checkSideInputCaughtUp(SystemStreamPartition ssp, String 
offset, SystemStreamMetadata.OffsetType offsetType, boolean isEndOfStream) {
 
@@ -892,7 +900,7 @@ public class ContainerStorageManager {
     this.shouldShutdown = true;
 
     // stop all sideinput consumers and stores
-    if (sideInputsPresent()) {
+    if (this.hasSideInputs) {
       sideInputsReadExecutor.shutdownNow();
 
       this.sideInputSystemConsumers.stop();
@@ -907,7 +915,7 @@ public class ContainerStorageManager {
       }
 
       // stop all sideInputStores -- this will perform one last flush on the 
KV stores, and write the offset file
-      this.getSideInputStorageManagers().forEach(sideInputStorageManager -> 
sideInputStorageManager.stop());
+      this.getSideInputHandlers().forEach(TaskSideInputHandler::stop);
     }
     LOG.info("Shutdown complete");
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
 
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
new file mode 100644
index 0000000..75562db
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputHandler.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+
+public class TestTaskSideInputHandler {
+  private static final String TEST_SYSTEM = "test-system";
+  private static final String TEST_STORE = "test-store";
+  private static final String TEST_STREAM = "test-stream";
+
+    /**
+   * This test covers the case where a call to the systemAdmin's (e.g., 
KafkaSystemAdmin's) get-stream-metadata method returns null.
+   */
+  @Test
+  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
new Partition(idx)))
+        .collect(Collectors.toSet());
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> 
partitionMetadata = ssps.stream()
+        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
+            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, 
"1", "2")));
+
+
+    TaskSideInputHandler handler = new 
MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStreamMetadata(Collections.singletonMap(new 
SystemStream(TEST_SYSTEM, TEST_STREAM),
+            new SystemStreamMetadata(TEST_STREAM, partitionMetadata)))
+        .addStore(TEST_STORE, ssps)
+        .build();
+
+    handler.init();
+
+    ssps.forEach(ssp -> {
+        String startingOffset = handler.getStartingOffset(
+            new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
ssp.getPartition()));
+        Assert.assertNull("Starting offset should be null", startingOffset);
+      });
+  }
+
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, 
new Partition(idx)))
+        .collect(Collectors.toSet());
+
+
+    TaskSideInputHandler handler = new 
MockTaskSideInputHandlerBuilder(taskName, TaskMode.Active)
+        .addStore(storeName, ssps)
+        .build();
+
+    // set up file and oldest offsets. for even partitions, fileOffsets will 
be larger; for odd partitions oldestOffsets will be larger
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+            return String.valueOf(offset);
+          }));
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+
+            return String.valueOf(offset);
+          }));
+
+    doCallRealMethod().when(handler).getStartingOffsets(fileOffsets, 
oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets = 
handler.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertTrue("Failed to get starting offsets for all ssps", 
startingOffsets.size() == 5);
+    startingOffsets.forEach((ssp, offset) -> {
+        int partitionId = ssp.getPartition().getPartitionId();
+        String expectedOffset = partitionId % 2 == 0
+            // 1 + fileOffset
+            ? 
getOffsetAfter(String.valueOf(ssp.getPartition().getPartitionId() + 10))
+            // oldestOffset
+            : String.valueOf(ssp.getPartition().getPartitionId() + 10);
+        assertEquals("Larger of fileOffsets and oldestOffsets should always be 
chosen", expectedOffset, offset);
+      });
+  }
+
+  private static final class MockTaskSideInputHandlerBuilder {
+    final TaskName taskName;
+    final TaskMode taskMode;
+    final File storeBaseDir;
+
+    final Map<String, StorageEngine> stores = new HashMap<>();
+    final Map<String, Set<SystemStreamPartition>> storeToSSPs = new 
HashMap<>();
+    final Clock clock = mock(Clock.class);
+    final Map<String, SideInputsProcessor> storeToProcessor = new HashMap<>();
+    final StreamMetadataCache streamMetadataCache = 
mock(StreamMetadataCache.class);
+    final SystemAdmins systemAdmins = mock(SystemAdmins.class);
+
+    public MockTaskSideInputHandlerBuilder(String taskName, TaskMode taskMode) 
{
+      this.taskName = new TaskName(taskName);
+      this.taskMode = taskMode;
+      this.storeBaseDir = mock(File.class);
+
+      initializeMocks();
+    }
+
+    private void initializeMocks() {
+      SystemAdmin admin = mock(SystemAdmin.class);
+      doAnswer(invocation -> {
+          String offset1 = invocation.getArgumentAt(0, String.class);
+          String offset2 = invocation.getArgumentAt(1, String.class);
+
+          return Long.compare(Long.parseLong(offset1), 
Long.parseLong(offset2));
+        }).when(admin).offsetComparator(any(), any());
+      doAnswer(invocation -> {
+          Map<SystemStreamPartition, String> sspToOffsets = 
invocation.getArgumentAt(0, Map.class);
+
+          return sspToOffsets.entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey,
+                  entry -> getOffsetAfter(entry.getValue())));
+        }).when(admin).getOffsetsAfter(any());
+      doReturn(admin).when(systemAdmins).getSystemAdmin(TEST_SYSTEM);
+      doReturn(ScalaJavaUtil.toScalaMap(new 
HashMap<>())).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
+    }
+
+
+    MockTaskSideInputHandlerBuilder addStreamMetadata(Map<SystemStream, 
SystemStreamMetadata> streamMetadata) {
+      
doReturn(ScalaJavaUtil.toScalaMap(streamMetadata)).when(streamMetadataCache).getStreamMetadata(any(),
 anyBoolean());
+      return this;
+    }
+
+    MockTaskSideInputHandlerBuilder addStore(String storeName, 
Set<SystemStreamPartition> storeSSPs) {
+      storeToSSPs.put(storeName, storeSSPs);
+      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
+      return this;
+    }
+
+    TaskSideInputHandler build() {
+      return spy(new TaskSideInputHandler(taskName,
+          taskMode,
+          storeBaseDir,
+          stores,
+          storeToSSPs,
+          storeToProcessor,
+          systemAdmins,
+          streamMetadataCache,
+          clock));
+    }
+  }
+
+  private static String getOffsetAfter(String offset) {
+    return String.valueOf(Long.parseLong(offset) + 1);
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
 
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
index 6761702..9c41e85 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.storage;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.File;
 import java.util.Collections;
@@ -29,18 +30,10 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
-import org.apache.samza.util.ScalaJavaUtil;
-import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -72,6 +65,7 @@ public class TestTaskSideInputStorageManager {
     final String taskName = "test-flush-task";
     final SystemStreamPartition ssp = new SystemStreamPartition("test-system", 
"test-stream", new Partition(0));
     final String offset = "123";
+    final ImmutableMap<SystemStreamPartition, String> processedOffsets = 
ImmutableMap.of(ssp, offset);
 
     TaskSideInputStorageManager testSideInputStorageManager = new 
MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
         .addLoggedStore(storeName, ImmutableSet.of(ssp))
@@ -79,14 +73,13 @@ public class TestTaskSideInputStorageManager {
     Map<String, StorageEngine> stores = new HashMap<>();
 
     initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
-    testSideInputStorageManager.flush();
+    testSideInputStorageManager.flush(processedOffsets);
 
     for (StorageEngine storageEngine : stores.values()) {
       verify(storageEngine).flush();
     }
 
-    verify(testSideInputStorageManager).writeOffsetFiles();
+    verify(testSideInputStorageManager).writeFileOffsets(eq(processedOffsets));
 
     File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
     assertTrue("Store directory: " + storeDir.getPath() + " is missing.", 
storeDir.exists());
@@ -100,16 +93,19 @@ public class TestTaskSideInputStorageManager {
   public void testStop() {
     final String storeName = "test-stop-store";
     final String taskName = "test-stop-task";
+    final SystemStreamPartition ssp = new SystemStreamPartition("test-system", 
"test-stream", new Partition(0));
+    final String offset = "123";
+    final ImmutableMap<SystemStreamPartition, String> processedOffsets = 
ImmutableMap.of(ssp, offset);
 
     TaskSideInputStorageManager testSideInputStorageManager = new 
MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR)
         .addInMemoryStore(storeName, ImmutableSet.of())
         .build();
 
     initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.stop();
+    testSideInputStorageManager.stop(processedOffsets);
 
     verify(testSideInputStorageManager.getStore(storeName)).stop();
-    verify(testSideInputStorageManager).writeOffsetFiles();
+    verify(testSideInputStorageManager).writeFileOffsets(eq(processedOffsets));
   }
 
   @Test
@@ -122,7 +118,7 @@ public class TestTaskSideInputStorageManager {
         .build();
 
     initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.writeOffsetFiles(); // should be no-op
+    testSideInputStorageManager.writeFileOffsets(Collections.emptyMap()); // 
should be no-op
     File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
 
     assertFalse("Store directory: " + storeDir.getPath() + " should not be 
created for non-persisted store", storeDir.exists());
@@ -138,15 +134,15 @@ public class TestTaskSideInputStorageManager {
     final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
     final SystemStreamPartition ssp2 = new SystemStreamPartition("test-system2", "test-stream2", new Partition(0));
 
+    Map<SystemStreamPartition, String> processedOffsets = ImmutableMap.of(ssp, offset, ssp2, offset);
+
     TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
         .addLoggedStore(storeName, ImmutableSet.of(ssp))
         .addLoggedStore(storeName2, ImmutableSet.of(ssp2))
         .build();
 
     initializeSideInputStorageManager(testSideInputStorageManager);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
-    testSideInputStorageManager.updateLastProcessedOffset(ssp2, offset);
-    testSideInputStorageManager.writeOffsetFiles();
+    testSideInputStorageManager.writeFileOffsets(processedOffsets);
     File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
 
     assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
@@ -175,87 +171,19 @@ public class TestTaskSideInputStorageManager {
         .build();
 
     initializeSideInputStorageManager(testSideInputStorageManager);
-    ssps.forEach(ssp -> testSideInputStorageManager.updateLastProcessedOffset(ssp, offset));
-    testSideInputStorageManager.writeOffsetFiles();
+    Map<SystemStreamPartition, String> processedOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> offset));
 
-    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
+    testSideInputStorageManager.writeFileOffsets(processedOffsets);
 
+    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
     ssps.forEach(ssp -> {
         assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
         assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
       });
   }
 
-  /**
-   * This test is for cases, when calls to systemAdmin (e.g., KafkaSystemAdmin's) get-stream-metadata method return null.
-   */
-  @Test
-  public void testGetStartingOffsetsWhenStreamMetadataIsNull() {
-    final String storeName = "test-get-starting-offset-store";
-    final String taskName = "test-get-starting-offset-task";
-
-    Set<SystemStreamPartition> ssps = IntStream.range(1, 2)
-        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
-        .collect(Collectors.toSet());
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = ssps.stream()
-        .collect(Collectors.toMap(SystemStreamPartition::getPartition,
-            x -> new SystemStreamMetadata.SystemStreamPartitionMetadata(null, "1", "2")));
-
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ssps)
-        .addStreamMetadata(Collections.singletonMap(new SystemStream("test-system", "test-stream"),
-            new SystemStreamMetadata("test-stream", partitionMetadata)))
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    ssps.forEach(ssp -> {
-        String startingOffset = testSideInputStorageManager.getStartingOffset(
-            new SystemStreamPartition("test-system", "test-stream", ssp.getPartition()));
-        Assert.assertNull("Starting offset should be null", startingOffset);
-      });
-  }
-
-  @Test
-  public void testGetStartingOffsets() {
-    final String storeName = "test-get-starting-offset-store";
-    final String taskName = "test-get-starting-offset-task";
-
-    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
-        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
-        .collect(Collectors.toSet());
-
-
-    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
-        .addLoggedStore(storeName, ssps)
-        .build();
-
-    initializeSideInputStorageManager(testSideInputStorageManager);
-    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
-        .collect(Collectors.toMap(Function.identity(), ssp -> {
-            int partitionId = ssp.getPartition().getPartitionId();
-            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
-            return String.valueOf(offset);
-          }));
-
-    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
-        .collect(Collectors.toMap(Function.identity(), ssp -> {
-            int partitionId = ssp.getPartition().getPartitionId();
-            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
-
-            return String.valueOf(offset);
-          }));
-
-    doCallRealMethod().when(testSideInputStorageManager).getStartingOffsets(fileOffsets, oldestOffsets);
-
-    Map<SystemStreamPartition, String> startingOffsets =
-        testSideInputStorageManager.getStartingOffsets(fileOffsets, oldestOffsets);
-
-    assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5);
-  }
-
   private void initializeSideInputStorageManager(TaskSideInputStorageManager testSideInputStorageManager) {
-    doReturn(new HashMap<>()).when(testSideInputStorageManager).getStartingOffsets(any(), any());
     testSideInputStorageManager.init();
   }
 
@@ -263,39 +191,13 @@ public class TestTaskSideInputStorageManager {
     private final TaskName taskName;
     private final String storeBaseDir;
 
-    private Clock clock = mock(Clock.class);
-    private Map<String, SideInputsProcessor> storeToProcessor = new HashMap<>();
     private Map<String, StorageEngine> stores = new HashMap<>();
     private Map<String, Set<SystemStreamPartition>> storeToSSps = new HashMap<>();
-    private StreamMetadataCache streamMetadataCache = mock(StreamMetadataCache.class);
-    private SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    private Clock clock = mock(Clock.class);
 
     public MockTaskSideInputStorageManagerBuilder(String taskName, String storeBaseDir) {
       this.taskName = new TaskName(taskName);
       this.storeBaseDir = storeBaseDir;
-
-      initializeMocks();
-    }
-
-    private void initializeMocks() {
-      SystemAdmin admin = mock(SystemAdmin.class);
-      doAnswer(invocation -> {
-          String offset1 = invocation.getArgumentAt(0, String.class);
-          String offset2 = invocation.getArgumentAt(1, String.class);
-
-          return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
-        }).when(admin).offsetComparator(any(), any());
-      doAnswer(invocation -> {
-          Map<SystemStreamPartition, String> sspToOffsets = invocation.getArgumentAt(0, Map.class);
-
-          return sspToOffsets.entrySet()
-              .stream()
-              .collect(Collectors.toMap(Map.Entry::getKey,
-                  entry -> String.valueOf(Long.parseLong(entry.getValue()) + 1)));
-        }).when(admin).getOffsetsAfter(any());
-      doReturn(admin).when(systemAdmins).getSystemAdmin("test-system");
-
-      doReturn(ScalaJavaUtil.toScalaMap(new HashMap<>())).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
     }
 
     MockTaskSideInputStorageManagerBuilder addInMemoryStore(String storeName, Set<SystemStreamPartition> ssps) {
@@ -304,32 +206,25 @@ public class TestTaskSideInputStorageManager {
           new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(false).build());
 
       stores.put(storeName, storageEngine);
-      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
       storeToSSps.put(storeName, ssps);
 
       return this;
     }
 
-    MockTaskSideInputStorageManagerBuilder addStreamMetadata(Map<SystemStream, SystemStreamMetadata> streamMetadata) {
-      doReturn(ScalaJavaUtil.toScalaMap(streamMetadata)).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
-      return this;
-    }
-
     MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName, Set<SystemStreamPartition> ssps) {
       StorageEngine storageEngine = mock(StorageEngine.class);
       when(storageEngine.getStoreProperties()).thenReturn(
           new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(true).build());
 
       stores.put(storeName, storageEngine);
-      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
       storeToSSps.put(storeName, ssps);
 
       return this;
     }
 
     TaskSideInputStorageManager build() {
-      return spy(new TaskSideInputStorageManager(taskName, TaskMode.Active, streamMetadataCache, new File(storeBaseDir), stores,
-          storeToProcessor, storeToSSps, systemAdmins, mock(Config.class), clock));
+      return spy(new TaskSideInputStorageManager(taskName, TaskMode.Active, new File(storeBaseDir), stores, storeToSSps,
+          clock));
     }
   }
 }
\ No newline at end of file

Reply via email to