mynameborat commented on code in PR #1655:
URL: https://github.com/apache/samza/pull/1655#discussion_r1128978158


##########
samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java:
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import scala.collection.JavaConversions;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.application.ApplicationUtil;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.RunLoop;
+import org.apache.samza.container.RunLoopTask;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstanceMetrics;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeManager;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.system.SystemConsumersMetrics;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.chooser.DefaultChooser;
+import org.apache.samza.system.chooser.MessageChooser;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.apache.samza.table.utils.SerdeUtils;
+import org.apache.samza.task.TaskInstanceCollector;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ReflectionUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SideInputsManager {
  private static final Logger LOG = LoggerFactory.getLogger(SideInputsManager.class);

  // Name format for the single daemon thread (sideInputsExecutor) that runs the side-input RunLoop
  private static final String SIDE_INPUTS_THREAD_NAME = "SideInputs Thread";
  // We use a prefix to differentiate the SystemConsumersMetrics for sideInputs from the ones in SamzaContainer
  private static final String SIDE_INPUTS_METRICS_PREFIX = "side-inputs-";

  // Timeout with which the sideInput wait checks for exceptions and for whether SSPs are caught up
  private static final int SIDE_INPUT_CHECK_TIMEOUT_SECONDS = 10;
  // NOTE(review): not used in the visible part of the file; presumably bounds the wait in shutdown() — confirm
  private static final int SIDE_INPUT_SHUTDOWN_TIMEOUT_SECONDS = 60;
  // Elasticity factor passed to the side-input RunLoop in start()
  private static final int DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR = 1;

  // Container metrics; their source and registry are reused (with SIDE_INPUTS_METRICS_PREFIX) for side-input metrics
  private final SamzaContainerMetrics samzaContainerMetrics;
  // Per-task metrics; start() registers an offset gauge here for every side-input SSP
  private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
  private final Config config;

  /* Sideinput related parameters */
  // true iff at least one side-input SSP is assigned to some task in this container
  private final boolean hasSideInputs;
  // side input stores indexed first by task, then store name
  private final Map<TaskName, Map<String, StorageEngine>> sideInputStores;
  // side inputs indexed first by task, then store name
  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputStoreSSPs;
  private final Set<String> sideInputStoreNames;
  // Handler responsible for each side-input SSP; the same handler may appear for several SSPs of one task
  private final Map<SystemStreamPartition, TaskSideInputHandler> sspSideInputHandlers;
  // Consumers for the side-input streams; only created when hasSideInputs is true, otherwise null
  private SystemConsumers sideInputSystemConsumers;

  // Used by the sideInput-read thread to signal to the main thread. Map's contents are mutated.
  private final Map<TaskName, CountDownLatch> sideInputTaskLatches;
  // Single daemon thread on which start() runs sideInputRunLoop
  private final ExecutorService sideInputsExecutor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDE_INPUTS_THREAD_NAME).build());
  private RunLoop sideInputRunLoop; // created in start()

  // Set when side-input restoration should stop waiting (e.g. on interrupt); read by the wait loop in start()
  private volatile boolean shouldShutdown = false;
  // Failure reported by the side-input thread; checked by the main thread in start()
  private volatile Throwable sideInputException = null;
+
+  public SideInputsManager(Map<String, Set<SystemStream>> 
sideInputSystemStreams,
+      Map<String, SystemFactory> systemFactories,
+      Map<String, SystemStream> changelogSystemStreams,
+      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Set<Path> storeDirectoryPaths,
+      ContainerModel containerModel, JobContext jobContext, ContainerContext 
containerContext,
+      SamzaContainerMetrics samzaContainerMetrics,
+      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
+      StreamMetadataCache streamMetadataCache,
+      SystemAdmins systemAdmins,
+      SerdeManager serdeManager, Map<String, Serde<Object>> serdes,
+      StorageManagerUtil storageManagerUtil,
+      File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
+      Config config, Clock clock) {
+    this.taskSideInputStoreSSPs = getTaskSideInputSSPs(sideInputSystemStreams, 
changelogSystemStreams, containerModel);
+    this.sideInputStoreNames = 
ContainerStorageManagerUtil.getSideInputStoreNames(
+        sideInputSystemStreams, changelogSystemStreams, containerModel);
+    this.sideInputTaskLatches = new HashMap<>();
+    this.hasSideInputs = this.taskSideInputStoreSSPs.values().stream()
+        .flatMap(m -> m.values().stream())
+        .flatMap(Collection::stream)
+        .findAny()
+        .isPresent();
+
+    this.taskInstanceMetrics = taskInstanceMetrics;
+    this.samzaContainerMetrics = samzaContainerMetrics;
+    this.config = config;
+
+    // create side input taskStores for all tasks in the containerModel and 
each store in storageEngineFactories
+    this.sideInputStores = 
ContainerStorageManagerUtil.createTaskStores(sideInputStoreNames, 
storageEngineFactories, sideInputStoreNames, changelogSystemStreams, 
storeDirectoryPaths, containerModel,
+        jobContext, containerContext, serdes, taskInstanceMetrics, 
taskInstanceCollectors,
+        storageManagerUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
+
+    this.sspSideInputHandlers = createSideInputHandlers(hasSideInputs, 
sideInputStores, taskSideInputStoreSSPs, sideInputTaskLatches, 
taskInstanceMetrics, containerModel, streamMetadataCache, systemAdmins, serdes, 
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock
+    );
+
+    // create SystemConsumers for consuming from taskSideInputSSPs, if 
sideInputs are being used
+    if (this.hasSideInputs) {
+      Set<SystemStream> containerSideInputSystemStreams = 
this.taskSideInputStoreSSPs.values().stream()
+          .flatMap(map -> map.values().stream())
+          .flatMap(Set::stream)
+          .map(SystemStreamPartition::getSystemStream)
+          .collect(Collectors.toSet());
+
+      Set<String> containerSideInputSystems = 
containerSideInputSystemStreams.stream()
+          .map(SystemStream::getSystem)
+          .collect(Collectors.toSet());
+
+      // create sideInput consumers indexed by systemName
+      // Mapping from storeSystemNames to SystemConsumers
+      Map<String, SystemConsumer> sideInputConsumers =
+          
ContainerStorageManagerUtil.createSystemConsumers(containerSideInputSystems, 
systemFactories,
+              samzaContainerMetrics.registry(), config);
+
+      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> 
inputStreamMetadata =
+          
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(),
 false);
+
+      // we use the same registry as samza-container-metrics
+      SystemConsumersMetrics sideInputSystemConsumersMetrics =
+          new SystemConsumersMetrics(samzaContainerMetrics.registry(), 
SIDE_INPUTS_METRICS_PREFIX);
+
+      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new 
RoundRobinChooserFactory(), config,
+          sideInputSystemConsumersMetrics.registry(), systemAdmins);
+
+      ApplicationConfig applicationConfig = new ApplicationConfig(config);
+
+      this.sideInputSystemConsumers =
+          new SystemConsumers(chooser, 
ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
+              sideInputSystemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
+              SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+              TaskConfig.DEFAULT_POLL_INTERVAL_MS, 
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
+              JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, 
applicationConfig.getRunId());
+    }
+  }
+
+  // read sideInputs until all sideInputStreams are caught up, then return
+  public void start() {
+    if (this.hasSideInputs) {
+      LOG.info("SideInput Restore started");
+
+      // initialize the sideInputStorageManagers
+      this.sspSideInputHandlers.values().forEach(TaskSideInputHandler::init);
+
+      Map<TaskName, TaskSideInputHandler> taskSideInputHandlers = 
this.sspSideInputHandlers.values().stream()
+          .distinct()
+          .collect(Collectors.toMap(TaskSideInputHandler::getTaskName, 
Function.identity()));
+
+      Map<TaskName, TaskInstanceMetrics> sideInputTaskMetrics = new 
HashMap<>();
+      Map<TaskName, RunLoopTask> sideInputTasks = new HashMap<>();
+      this.taskSideInputStoreSSPs.forEach((taskName, storesToSSPs) -> {
+        Set<SystemStreamPartition> taskSSPs = 
this.taskSideInputStoreSSPs.get(taskName).values().stream()
+            .flatMap(Set::stream)
+            .collect(Collectors.toSet());
+
+        if (!taskSSPs.isEmpty()) {
+          String sideInputSource = SIDE_INPUTS_METRICS_PREFIX + 
this.taskInstanceMetrics.get(taskName).source();
+          TaskInstanceMetrics sideInputMetrics = new TaskInstanceMetrics(
+              sideInputSource, 
this.taskInstanceMetrics.get(taskName).registry(), SIDE_INPUTS_METRICS_PREFIX);
+          sideInputTaskMetrics.put(taskName, sideInputMetrics);
+
+          RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs,
+              taskSideInputHandlers.get(taskName), 
sideInputTaskMetrics.get(taskName));
+          sideInputTasks.put(taskName, sideInputTask);
+        }
+      });
+
+      // register all sideInput SSPs with the consumers
+      for (SystemStreamPartition ssp : this.sspSideInputHandlers.keySet()) {
+        String startingOffset = 
this.sspSideInputHandlers.get(ssp).getStartingOffset(ssp);
+
+        if (startingOffset == null) {
+          throw new SamzaException(
+              "No starting offset could be obtained for SideInput 
SystemStreamPartition : " + ssp + ". Consumer cannot start.");
+        }
+
+        // register startingOffset with the sysConsumer and register a metric 
for it
+        sideInputSystemConsumers.register(ssp, startingOffset);
+        
taskInstanceMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+            ssp, ScalaJavaUtil.toScalaFunction(() -> 
this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+        
sideInputTaskMetrics.get(this.sspSideInputHandlers.get(ssp).getTaskName()).addOffsetGauge(
+            ssp, ScalaJavaUtil.toScalaFunction(() -> 
this.sspSideInputHandlers.get(ssp).getLastProcessedOffset(ssp)));
+      }
+
+      // start the systemConsumers for consuming input
+      this.sideInputSystemConsumers.start();
+
+      TaskConfig taskConfig = new TaskConfig(this.config);
+      SamzaContainerMetrics sideInputContainerMetrics =
+          new SamzaContainerMetrics(SIDE_INPUTS_METRICS_PREFIX + 
this.samzaContainerMetrics.source(),
+              this.samzaContainerMetrics.registry(), 
SIDE_INPUTS_METRICS_PREFIX);
+
+      final ApplicationConfig applicationConfig = new 
ApplicationConfig(config);
+
+      this.sideInputRunLoop = new RunLoop(sideInputTasks,
+          null, // all operations are executed in the main runloop thread
+          this.sideInputSystemConsumers,
+          1, // single message in flight per task
+          -1, // no windowing
+          taskConfig.getCommitMs(),
+          taskConfig.getCallbackTimeoutMs(),
+          taskConfig.getDrainCallbackTimeoutMs(),
+          // TODO consolidate these container configs SAMZA-2275
+          this.config.getLong("container.disk.quota.delay.max.ms", 
TimeUnit.SECONDS.toMillis(1)),
+          taskConfig.getMaxIdleMs(),
+          sideInputContainerMetrics,
+          System::nanoTime,
+          false,
+          DEFAULT_SIDE_INPUT_ELASTICITY_FACTOR,
+          applicationConfig.getRunId(),
+          ApplicationUtil.isHighLevelApiJob(config)
+      ); // commit must be synchronous to ensure integrity of state flush
+
+      try {
+        sideInputsExecutor.submit(() -> {
+          try {
+            sideInputRunLoop.run();
+          } catch (Exception e) {
+            LOG.error("Exception in reading sideInputs", e);
+            sideInputException = e;
+          }
+        });
+
+        // Make the main thread wait until all sideInputs have been caughtup 
or an exception was thrown
+        while (!shouldShutdown && sideInputException == null &&
+            !awaitSideInputTasks(sideInputTaskLatches)) {
+          LOG.debug("Waiting for SideInput bootstrap to complete");
+        }
+
+        if (sideInputException != null) { // Throw exception if there was an 
exception in catching-up sideInputs
+          throw new SamzaException("Exception in restoring sideInputs", 
sideInputException);
+        }
+
+      } catch (InterruptedException e) {
+        LOG.warn("Received an interrupt during side inputs store restoration."
+            + " Exiting prematurely without completing store restore.");
+        /*
+         * We want to stop side input restoration and rethrow the exception 
upstream. Container should handle the
+         * interrupt exception and shutdown the components and cleaning up the 
resource. We don't want to clean up the
+         * resources prematurely here.
+         */
+        shouldShutdown = true; // todo: should we cancel the flush future 
right away or wait for container to handle it as part of shutdown sequence?
+        throw new SamzaException("Side inputs read was interrupted", e);
+      }
+
+      LOG.info("SideInput Restore complete");
+    }
+  }
+
+  public Map<TaskName, Map<String, StorageEngine>> getSideInputStores() {
+    return ImmutableMap.copyOf(this.sideInputStores);
+  }
+
+  public void shutdown() {
+    // stop all side input consumers and stores

Review Comment:
   Should we also set `shouldShutdown = true` here, to preserve behavior parity with the pre-refactor code?



##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -273,491 +200,52 @@ public ContainerStorageManager(
         );
     this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
         new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
-
-    this.sspSideInputHandlers = createSideInputHandlers(clock);
-
-    // create SystemConsumers for consuming from taskSideInputSSPs, if 
sideInputs are being used
-    if (this.hasSideInputs) {
-      Set<SystemStream> containerSideInputSystemStreams = 
this.taskSideInputStoreSSPs.values().stream()
-          .flatMap(map -> map.values().stream())
-          .flatMap(Set::stream)
-          .map(SystemStreamPartition::getSystemStream)
-          .collect(Collectors.toSet());
-
-      Set<String> containerSideInputSystems = 
containerSideInputSystemStreams.stream()
-          .map(SystemStream::getSystem)
-          .collect(Collectors.toSet());
-
-      // create sideInput consumers indexed by systemName
-      // Mapping from storeSystemNames to SystemConsumers
-      Map<String, SystemConsumer> sideInputConsumers =
-          createConsumers(containerSideInputSystems, systemFactories, config, 
this.samzaContainerMetrics.registry());
-
-      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> 
inputStreamMetadata = 
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(),
 false);
-
-      SystemConsumersMetrics sideInputSystemConsumersMetrics = new 
SystemConsumersMetrics(samzaContainerMetrics.registry(), 
SIDEINPUTS_METRICS_PREFIX);
-      // we use the same registry as samza-container-metrics
-
-      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new 
RoundRobinChooserFactory(), config,
-          sideInputSystemConsumersMetrics.registry(), systemAdmins);
-
-      ApplicationConfig applicationConfig = new ApplicationConfig(config);
-
-      sideInputSystemConsumers =
-          new SystemConsumers(chooser, 
ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
-              sideInputSystemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), 
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
-              TaskConfig.DEFAULT_POLL_INTERVAL_MS, 
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
-              JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, 
applicationConfig.getRunId());
-    }
-
-  }
-
-  /**
-   * Remove changeLogSSPs that are associated with standby tasks from 
changelogSSP map and only return changelogSSPs
-   * associated with the active tasks.
-   * The standby changelogs will be consumed and restored as side inputs.
-   *
-   * @param containerModel the container's model
-   * @param changelogSystemStreams the passed in set of changelogSystemStreams
-   * @return A map of changeLogSSP to storeName across all tasks, assuming no 
two stores have the same changelogSSP
-   */
-  @VisibleForTesting
-  Map<String, SystemStream> getActiveTaskChangelogSystemStreams(ContainerModel 
containerModel,
-      Map<String, SystemStream> changelogSystemStreams) {
-    if (MapUtils.invertMap(changelogSystemStreams).size() != 
changelogSystemStreams.size()) {
-      throw new SamzaException("Two stores cannot have the same changelog 
system-stream");
-    }
-
-    Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
-    changelogSystemStreams.forEach((storeName, systemStream) ->
-        containerModel.getTasks().forEach((taskName, taskModel) ->
-            changelogSSPToStore.put(new SystemStreamPartition(systemStream, 
taskModel.getChangelogPartition()), storeName))
-    );
-
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) 
-> {
-      changelogSystemStreams.forEach((storeName, systemStream) -> {
-        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, 
taskModel.getChangelogPartition());
-        changelogSSPToStore.remove(ssp);
-      });
-    });
-
-    // changelogSystemStreams correspond only to active tasks (since those of 
standby-tasks moved to sideInputs above)
-    return MapUtils.invertMap(changelogSSPToStore).entrySet().stream()
-        .collect(Collectors.toMap(Map.Entry::getKey, x -> 
x.getValue().getSystemStream()));
-  }
-
-  /**
-   * Fetch the side input stores. For active containers, the stores correspond 
to the side inputs and for standbys, they
-   * include the durable stores.
-   * @param containerModel the container's model
-   * @param sideInputSystemStreams the map of store to side input system 
streams
-   * @param changelogSystemStreams the map of store to changelog system streams
-   * @return A set of side input stores
-   */
-  @VisibleForTesting
-  Set<String> getSideInputStores(ContainerModel containerModel,
-      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, 
SystemStream> changelogSystemStreams) {
-    // add all the side input stores by default regardless of active vs standby
-    Set<String> sideInputStores = new 
HashSet<>(sideInputSystemStreams.keySet());
-
-    // In case of standby tasks, we treat the stores that have changelogs as 
side input stores for bootstrapping state
-    if (getTasks(containerModel, TaskMode.Standby).size() > 0) {
-      sideInputStores.addAll(changelogSystemStreams.keySet());
-    }
-    return sideInputStores;
-  }
-
-  /**
-   * Add all sideInputs to a map of maps, indexed first by taskName, then by 
sideInput store name.
-   *
-   * @param containerModel the containerModel to use
-   * @param sideInputSystemStreams the map of store to sideInput system stream
-   * @param changelogSystemStreams the map of store to changelog system stream
-   * @return taskSideInputSSPs map
-   */
-  @VisibleForTesting
-  Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
getTaskSideInputSSPs(ContainerModel containerModel,
-      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, 
SystemStream> changelogSystemStreams) {
-    Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs = 
new HashMap<>();
-
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      sideInputSystemStreams.keySet().forEach(storeName -> {
-        Set<SystemStreamPartition> taskSideInputs = 
taskModel.getSystemStreamPartitions().stream().filter(ssp -> 
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
-        taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
-      });
-    });
-
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) 
-> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      changelogSystemStreams.forEach((storeName, systemStream) -> {
-        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, 
taskModel.getChangelogPartition());
-        taskSideInputSSPs.get(taskName).put(storeName, 
Collections.singleton(ssp));
-      });
-    });
-
-    return taskSideInputSSPs;
-  }
-
-  /**
-   *  Creates SystemConsumer objects for store restoration, creating one 
consumer per system.
-   */
-  private static Map<String, SystemConsumer> createConsumers(Set<String> 
storeSystems,
-      Map<String, SystemFactory> systemFactories, Config config, 
MetricsRegistry registry) {
-    // Create one consumer for each system in use, map with one entry for each 
such system
-    Map<String, SystemConsumer> consumers = new HashMap<>();
-
-    // Iterate over the list of storeSystems and create one sysConsumer per 
system
-    for (String storeSystemName : storeSystems) {
-      SystemFactory systemFactory = systemFactories.get(storeSystemName);
-      if (systemFactory == null) {
-        throw new SamzaException("System " + storeSystemName + " does not 
exist in config");
-      }
-      consumers.put(storeSystemName, 
systemFactory.getConsumer(storeSystemName, config, registry));
-    }
-
-    return consumers;
-  }
-
-  private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, 
SystemStream> changelogSystemStreams,
-      Map<String, SystemConsumer> systemNameToSystemConsumers) {
-    // Map of each storeName to its respective systemConsumer
-    Map<String, SystemConsumer> storeConsumers = new HashMap<>();
-
-    // Populate the map of storeName to its relevant systemConsumer
-    for (String storeName : changelogSystemStreams.keySet()) {
-      storeConsumers.put(storeName, 
systemNameToSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
-    }
-    return storeConsumers;
-  }
-
-  private Map<String, TaskRestoreManager> 
createTaskRestoreManagers(Map<String, StateBackendFactory> factories,
-      Map<String, Set<String>> backendFactoryStoreNames, Clock clock, 
SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
-      TaskModel taskModel) {
-    // Get the factories for the task based on the stores of the tasks to be 
restored from the factory
-    Map<String, TaskRestoreManager> backendFactoryRestoreManagers = new 
HashMap<>(); // backendFactoryName -> restoreManager
-    MetricsRegistry taskMetricsRegistry =
-        taskInstanceMetrics.get(taskName) != null ? 
taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-
-    backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
-      StateBackendFactory factory = factories.get(factoryName);
-      if (factory == null) {
-        throw new SamzaException(
-            String.format("Required restore state backend factory: %s not 
found in configured factories %s",
-                factoryName, String.join(", ", factories.keySet())));
-      }
-      KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new 
KafkaChangelogRestoreParams(storeConsumers,
-          inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), 
storageEngineFactories, serdes,
-          taskInstanceCollectors.get(taskName));
-      TaskRestoreManager restoreManager = 
factory.getRestoreManager(jobContext, containerContext, taskModel, 
restoreExecutor,
-          taskMetricsRegistry, storeNames, config, clock, 
loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
-          kafkaChangelogRestoreParams);
-
-      backendFactoryRestoreManagers.put(factoryName, restoreManager);
-    });
-    samzaContainerMetrics.addStoresRestorationGauge(taskName);
-    return backendFactoryRestoreManagers;
-  }
-
-  /**
-   * Return a map of backend factory names to set of stores that should be 
restored using it
-   */
-  @VisibleForTesting
-  Map<String, Set<String>> getBackendFactoryStoreNames(Checkpoint checkpoint, 
Set<String> storeNames,
-      StorageConfig storageConfig) {
-    Map<String, Set<String>> backendFactoryStoreNames = new HashMap<>(); // 
backendFactoryName -> set(storeNames)
-
-    if (checkpoint != null && checkpoint.getVersion() == 1) {
-      // Only restore stores with changelog streams configured
-      Set<String> changelogStores = storeNames.stream()
-          .filter(storeName -> 
storageConfig.getChangelogStream(storeName).isPresent())
-          .collect(Collectors.toSet());
-      // Default to changelog backend factory when using checkpoint v1 for 
backwards compatibility
-      if (!changelogStores.isEmpty()) {
-        
backendFactoryStoreNames.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, 
changelogStores);
-      }
-      if (storeNames.size() > changelogStores.size()) {
-        Set<String> nonChangelogStores = storeNames.stream()
-            .filter(storeName -> !changelogStores.contains(storeName))
-            .collect(Collectors.toSet());
-        LOG.info("non-Side input stores: {}, do not have a configured store 
changelogs for checkpoint V1,"
-                + "restore for the store will be skipped",
-            nonChangelogStores);
-      }
-    } else if (checkpoint == null ||  checkpoint.getVersion() == 2) {
-      // Extract the state checkpoint markers if checkpoint exists
-      Map<String, Map<String, String>> stateCheckpointMarkers = checkpoint == 
null ? Collections.emptyMap() :
-          ((CheckpointV2) checkpoint).getStateCheckpointMarkers();
-
-      // Find stores associated to each state backend factory
-      storeNames.forEach(storeName -> {
-        List<String> storeFactories = 
storageConfig.getStoreRestoreFactories(storeName);
-
-        if (storeFactories.isEmpty()) {
-          // If the restore factory is not configured for the store and the 
store does not have a changelog topic
-          LOG.info("non-Side input store: {}, does not have a configured 
restore factories nor store changelogs,"
-                  + "restore for the store will be skipped",
-              storeName);
-        } else {
-          // Search the ordered list for the first matched state backend 
factory in the checkpoint
-          // If the checkpoint does not exist or state checkpoint markers does 
not exist, we match the first configured
-          // restore manager
-          Optional<String> factoryNameOpt = storeFactories.stream()
-              .filter(factoryName -> 
stateCheckpointMarkers.containsKey(factoryName) &&
-                  
stateCheckpointMarkers.get(factoryName).containsKey(storeName))
-              .findFirst();
-          String factoryName;
-          if (factoryNameOpt.isPresent()) {
-            factoryName = factoryNameOpt.get();
-          } else { // Restore factories configured but no checkpoints found
-            // Use first configured restore factory
-            factoryName = storeFactories.get(0);
-            LOG.warn("No matching checkpoints found for configured factories: 
{}, " +
-                "defaulting to using the first configured factory with no 
checkpoints", storeFactories);
-          }
-          if (!backendFactoryStoreNames.containsKey(factoryName)) {
-            backendFactoryStoreNames.put(factoryName, new HashSet<>());
-          }
-          backendFactoryStoreNames.get(factoryName).add(storeName);
-        }
-      });
-    } else {
-      throw new SamzaException(String.format("Unsupported checkpoint version 
%s", checkpoint.getVersion()));
-    }
-    return backendFactoryStoreNames;
-  }
-
-  // Helper method to filter active Tasks from the container model
-  private static Map<TaskName, TaskModel> getTasks(ContainerModel 
containerModel, TaskMode taskMode) {
-    return containerModel.getTasks().entrySet().stream()
-        .filter(x -> 
x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
   }
 
-  /**
-   * Create taskStores for all stores in storesToCreate.
-   * The store mode is chosen as read-write mode.
-   */
-  private Map<TaskName, Map<String, StorageEngine>> 
createTaskStores(Set<String> storesToCreate,
-      ContainerModel containerModel, JobContext jobContext, ContainerContext 
containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> 
storageEngineFactories, Map<String, Serde<Object>> serdes,
-      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
-      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
-    Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
-    StorageConfig storageConfig = new StorageConfig(config);
-
-    // iterate over each task and each storeName
-    for (Map.Entry<TaskName, TaskModel> task : 
containerModel.getTasks().entrySet()) {
-      TaskName taskName = task.getKey();
-      TaskModel taskModel = task.getValue();
-      if (!taskStores.containsKey(taskName)) {
-        taskStores.put(taskName, new HashMap<>());
-      }
-
-      for (String storeName : storesToCreate) {
-        List<String> storeBackupManagers = 
storageConfig.getStoreBackupFactories(storeName);
-        // A store is considered durable if it is backed by a changelog or 
another backupManager factory
-        boolean isDurable = changelogSystemStreams.containsKey(storeName) || 
!storeBackupManagers.isEmpty();
-        boolean isSideInput = this.sideInputStoreNames.contains(storeName);
-        // Use the logged-store-base-directory for change logged stores and 
sideInput stores, and non-logged-store-base-dir
-        // for non logged stores
-        File storeBaseDir = isDurable || isSideInput ? 
this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
-        File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, 
storeName, taskName,
-            taskModel.getTaskMode());
-        this.storeDirectoryPaths.add(storeDirectory.toPath());
-
-        // if taskInstanceMetrics are specified use those for store metrics,
-        // otherwise (in case of StorageRecovery) use a blank 
MetricsRegistryMap
-        MetricsRegistry storeMetricsRegistry =
-            taskInstanceMetrics.get(taskName) != null ? 
taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-
-        StorageEngine storageEngine =
-            createStore(storeName, storeDirectory, taskModel, jobContext, 
containerContext, storageEngineFactories,
-                serdes, storeMetricsRegistry, 
taskInstanceCollectors.get(taskName),
-                StorageEngineFactory.StoreMode.ReadWrite, 
this.changelogSystemStreams, this.config);
-
-        // add created store to map
-        taskStores.get(taskName).put(storeName, storageEngine);
-
-        LOG.info("Created task store {} in read-write mode for task {} in path 
{}", storeName, taskName, storeDirectory.getAbsolutePath());
-      }
-    }
-    return taskStores;
-  }
-
-  /**
-   * Method to instantiate a StorageEngine with the given parameters, and 
populate the storeDirectory paths (used to monitor
-   * disk space).
-   */
-  public static StorageEngine createStore(
-      String storeName,
-      File storeDirectory,
-      TaskModel taskModel,
-      JobContext jobContext,
-      ContainerContext containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
-      Map<String, Serde<Object>> serdes,
-      MetricsRegistry storeMetricsRegistry,
-      MessageCollector messageCollector,
-      StorageEngineFactory.StoreMode storeMode,
-      Map<String, SystemStream> changelogSystemStreams,
-      Config config) {
-
-    StorageConfig storageConfig = new StorageConfig(config);
-    SystemStreamPartition changeLogSystemStreamPartition = 
changelogSystemStreams.containsKey(storeName) ?
-        new SystemStreamPartition(changelogSystemStreams.get(storeName), 
taskModel.getChangelogPartition()) : null;
-
-    Optional<String> storageKeySerde = 
storageConfig.getStorageKeySerde(storeName);
-    Serde keySerde = null;
-    if (storageKeySerde.isPresent()) {
-      keySerde = serdes.get(storageKeySerde.get());
-    }
-    Optional<String> storageMsgSerde = 
storageConfig.getStorageMsgSerde(storeName);
-    Serde messageSerde = null;
-    if (storageMsgSerde.isPresent()) {
-      messageSerde = serdes.get(storageMsgSerde.get());
-    }
-
-    return storageEngineFactories.get(storeName)
-        .getStorageEngine(storeName, storeDirectory, keySerde, messageSerde, 
messageCollector,
-            storeMetricsRegistry, changeLogSystemStreamPartition, jobContext, 
containerContext, storeMode);
-  }
-
-
-  // Create sideInput store processors, one per store per task
-  private Map<TaskName, Map<String, SideInputsProcessor>> 
createSideInputProcessors(StorageConfig config,
-      ContainerModel containerModel, Map<TaskName, TaskInstanceMetrics> 
taskInstanceMetrics) {
-
-    Map<TaskName, Map<String, SideInputsProcessor>> 
sideInputStoresToProcessors = new HashMap<>();
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      sideInputStoresToProcessors.put(taskName, new HashMap<>());
-      TaskMode taskMode = taskModel.getTaskMode();
-
-      for (String storeName : 
this.taskSideInputStoreSSPs.get(taskName).keySet()) {
-
-        SideInputsProcessor sideInputsProcessor;
-        Optional<String> sideInputsProcessorSerializedInstance =
-            config.getSideInputsProcessorSerializedInstance(storeName);
-
-        if (sideInputsProcessorSerializedInstance.isPresent()) {
-
-          sideInputsProcessor = SerdeUtils.deserialize("Side Inputs 
Processor", sideInputsProcessorSerializedInstance.get());
-          LOG.info("Using serialized side-inputs-processor for store: {}, 
task: {}", storeName, taskName);
-
-        } else if 
(config.getSideInputsProcessorFactory(storeName).isPresent()) {
-          String sideInputsProcessorFactoryClassName = 
config.getSideInputsProcessorFactory(storeName).get();
-          SideInputsProcessorFactory sideInputsProcessorFactory =
-              ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, 
SideInputsProcessorFactory.class);
-          sideInputsProcessor = 
sideInputsProcessorFactory.getSideInputsProcessor(config, 
taskInstanceMetrics.get(taskName).registry());
-          LOG.info("Using side-inputs-processor from factory: {} for store: 
{}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), 
storeName, taskName);
-
-        } else {
-          // if this is a active-task with a side-input store but no 
sideinput-processor-factory defined in config, we rely on upstream validations 
to fail the deploy
-
-          // if this is a standby-task and the store is a non-side-input 
changelog store
-          // we creating identity sideInputProcessor for stores of standbyTasks
-          // have to use the right serde because the sideInput stores are 
created
-
-          Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage 
key serde for store: " + storeName)));
-          Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage 
msg serde for store: " + storeName)));
-          sideInputsProcessor = new SideInputsProcessor() {
-            @Override
-            public Collection<Entry<?, ?>> process(IncomingMessageEnvelope 
message, KeyValueStore store) {
-              // Ignore message if the key is null
-              if (message.getKey() == null) {
-                return ImmutableList.of();
-              } else {
-                // Skip serde if the message is null
-                return ImmutableList.of(new 
Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
-                    message.getMessage() == null ? null : 
msgSerde.fromBytes((byte[]) message.getMessage())));
-              }
-            }
-          };
-          LOG.info("Using identity side-inputs-processor for store: {}, task: 
{}", storeName, taskName);
-        }
-
-        sideInputStoresToProcessors.get(taskName).put(storeName, 
sideInputsProcessor);
-      }
-    });
-
-    return sideInputStoresToProcessors;
-  }
-
-  // Create task sideInput storage managers, one per task, index by the SSP 
they are responsible for consuming
-  private Map<SystemStreamPartition, TaskSideInputHandler> 
createSideInputHandlers(Clock clock) {
-    // creating sideInput store processors, one per store per task
-    Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
-        createSideInputProcessors(new StorageConfig(config), 
this.containerModel, this.taskInstanceMetrics);
-
-    Map<SystemStreamPartition, TaskSideInputHandler> handlers = new 
HashMap<>();
-
-    if (this.hasSideInputs) {
-      containerModel.getTasks().forEach((taskName, taskModel) -> {
-
-        Map<String, StorageEngine> taskSideInputStores = 
sideInputStores.get(taskName);
-        Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new 
HashMap<>();
-        boolean taskHasSideInputs = false;
-        for (String storeName : taskSideInputStores.keySet()) {
-          Set<SystemStreamPartition> storeSSPs = 
this.taskSideInputStoreSSPs.get(taskName).get(storeName);
-          taskHasSideInputs = taskHasSideInputs || !storeSSPs.isEmpty();
-          sideInputStoresToSSPs.put(storeName, storeSSPs);
-        }
-
-        if (taskHasSideInputs) {
-          CountDownLatch taskCountDownLatch = new CountDownLatch(1);
-          this.sideInputTaskLatches.put(taskName, taskCountDownLatch);
-
-          TaskSideInputHandler taskSideInputHandler = new 
TaskSideInputHandler(taskName,
-              taskModel.getTaskMode(),
-              loggedStoreBaseDirectory,
-              taskSideInputStores,
-              sideInputStoresToSSPs,
-              taskSideInputProcessors.get(taskName),
-              this.systemAdmins,
-              this.streamMetadataCache,
-              taskCountDownLatch,
-              clock);
-
-          
sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
-            handlers.put(ssp, taskSideInputHandler);
-          });
-
-          LOG.info("Created TaskSideInputHandler for task {}, 
taskSideInputStores {} and loggedStoreBaseDirectory {}",
-              taskName, taskSideInputStores, loggedStoreBaseDirectory);
-        }
-      });
-    }
-    return handlers;
-  }
-
-  private Set<TaskSideInputHandler> getSideInputHandlers() {
-    return 
this.sspSideInputHandlers.values().stream().collect(Collectors.toSet());
-  }
 
   public void start() throws SamzaException, InterruptedException {
-    // Restores and recreates
+    // Restores and recreates stores.
     restoreStores();
+
     // Shutdown restore executor since it will no longer be used
     try {
       restoreExecutor.shutdown();
       if 
(restoreExecutor.awaitTermination(RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, 
TimeUnit.MILLISECONDS)) {
         restoreExecutor.shutdownNow();
       }
     } catch (Exception e) {
-      LOG.error(e.getMessage());
-    }
-    if (this.hasSideInputs) {
-      startSideInputs();
+      LOG.error("Error shutting down restore executor", e);
     }
+
+    // create and restore side input stores
+    this.sideInputsManager = new SideInputsManager(
+        sideInputSystemStreams, systemFactories, 
activeTaskChangelogSystemStreams, storageEngineFactories, storeDirectoryPaths, 
containerModel, jobContext, containerContext, samzaContainerMetrics, 
taskInstanceMetrics, taskInstanceCollectors, streamMetadataCache, systemAdmins, 
serdeManager,

Review Comment:
   Shouldn't `activeTaskChangelogSystemStreams` be replaced by the
`changelogSystemStreams` passed to the constructor of
`ContainerStorageManager`?
   
   e.g., 
   ```
     */
     @VisibleForTesting
     Map<TaskName, Map<String, Set<SystemStreamPartition>>> 
getTaskSideInputSSPs(ContainerModel containerModel,
         Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, 
SystemStream> changelogSystemStreams) {
   ```
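   This is the method CSM used before the refactor. Its call site passed the full `changelogSystemStreams` map rather than only the active-task changelogs. That matters because standby tasks treat their changelog-backed stores as side inputs, so restricting the map to active tasks may drop those SSPs: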
   ```this.taskSideInputStoreSSPs = getTaskSideInputSSPs(containerModel, 
sideInputSystemStreams, changelogSystemStreams);```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to