This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 320359f SEP-19 : Refactoring sideInputs from SamzaContainer to
ContainerStorageManager (#912)
320359f is described below
commit 320359fe4aad6210f2eeb91d30e3a5117929ac4a
Author: rmatharu <[email protected]>
AuthorDate: Fri Mar 1 10:13:13 2019 -0800
SEP-19 : Refactoring sideInputs from SamzaContainer to
ContainerStorageManager (#912)
* Rocksdb bug fix
* Moving TaskSideInputStorageManagers in CSM
* Adding basic side input refactor. Consumption logic works from inside CSM.
* Cleaning up sideInput references from TaskInstances and SamzaContainer
* minor
* Adding periodic flush logic to CSM for sideInputs
* Removing sideInputs from taskInputs
* fixing test
* test fix
* test fix
* Code simplifications
* checkstyle
* Using sideInput storage manager for standby tasks
* Minor store dir integ
* Adding comment
* Adding merge function in case of duplicate keys
* fixing tests
* Addressing cosmetic review comments
* Added bound-flush and exception handling for sideInputs
* Adding documentation
* TestTaskStorageManager : updating test
* Adding taskMode to decide StorePartitionDir
* Adding taskMode to decide StorePartitionDir
minor
* minor
* minor refactor of CSM
* minor
* Addressing review comments
* minor change in CSM
---
.../apache/samza/storage/StorageManagerUtil.java | 11 +-
.../org/apache/samza/storage/StorageRecovery.java | 4 +-
.../samza/storage/TaskSideInputStorageManager.java | 23 +-
.../apache/samza/container/SamzaContainer.scala | 126 ++---
.../org/apache/samza/container/TaskInstance.scala | 73 +--
.../samza/storage/ContainerStorageManager.java | 542 ++++++++++++++++++---
.../apache/samza/storage/TaskStorageManager.scala | 5 +-
.../storage/TestTaskSideInputStorageManager.java | 3 +-
.../org/apache/samza/task/TestAsyncRunLoop.java | 2 -
.../samza/storage/TestContainerStorageManager.java | 4 +-
.../samza/storage/TestTaskStorageManager.scala | 136 +++---
.../apache/samza/monitor/LocalStoreMonitor.java | 6 +-
12 files changed, 628 insertions(+), 307 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 79e67e1..42f7f4b 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamPartition;
@@ -139,9 +140,9 @@ public class StorageManagerUtil {
* @param offsets The SSP-offset to write
* @throws IOException because of deserializing to json
*/
- public static void writeOffsetFile(File storeBaseDir, String storeName,
TaskName taskName,
+ public static void writeOffsetFile(File storeBaseDir, String storeName,
TaskName taskName, TaskMode taskMode,
Map<SystemStreamPartition, String> offsets) throws IOException {
- File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName,
taskName), OFFSET_FILE_NAME);
+ File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName,
taskName, taskMode), OFFSET_FILE_NAME);
String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
FileUtil.writeWithChecksum(offsetFile, fileContents);
}
@@ -153,7 +154,7 @@ public class StorageManagerUtil {
* @param taskName the task name which is referencing the store
*/
public static void deleteOffsetFile(File storeBaseDir, String storeName,
TaskName taskName) {
- File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName,
taskName), OFFSET_FILE_NAME);
+ File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName,
taskName, TaskMode.Active), OFFSET_FILE_NAME);
if (offsetFile.exists()) {
FileUtil.rm(offsetFile);
}
@@ -207,9 +208,11 @@ public class StorageManagerUtil {
* @param storeBaseDir the base directory to use
* @param storeName the store name to use
* @param taskName the task name which is referencing the store
+ * @param taskMode the mode of the given task
* @return the partition directory for the store
*/
- public static File getStorePartitionDir(File storeBaseDir, String storeName,
TaskName taskName) {
+ public static File getStorePartitionDir(File storeBaseDir, String storeName,
TaskName taskName, TaskMode taskMode) {
+ // TODO: use task-Mode to decide the storePartitionDir -- standby's dir
should be the same as active
return new File(storeBaseDir, (storeName + File.separator +
taskName.toString()).replace(' ', '_'));
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index cf8338a..4d01159 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -214,10 +214,10 @@ public class StorageRecovery extends CommandLine {
ContainerStorageManager containerStorageManager =
new ContainerStorageManager(containerModel, streamMetadataCache,
systemAdmins, changeLogSystemStreams,
- storageEngineFactories, systemFactories, this.getSerdes(),
jobConfig, new HashMap<>(),
+ new HashMap<>(), storageEngineFactories, systemFactories,
this.getSerdes(), jobConfig, new HashMap<>(),
new SamzaContainerMetrics(containerModel.getId(), new
MetricsRegistryMap()),
JobContextImpl.fromConfigWithDefaults(jobConfig),
containerContext, new HashMap<>(),
- storeBaseDir, storeBaseDir, maxPartitionNumber, new
SystemClock());
+ storeBaseDir, storeBaseDir, maxPartitionNumber, null, new
SystemClock());
this.containerStorageManagers.put(containerModel.getId(),
containerStorageManager);
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 2b5717a..5e9c6cf 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -39,6 +39,7 @@ import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -72,14 +73,16 @@ public class TaskSideInputStorageManager {
private final StreamMetadataCache streamMetadataCache;
private final SystemAdmins systemAdmins;
private final TaskName taskName;
+ private final TaskMode taskMode;
private final Map<SystemStreamPartition, String> lastProcessedOffsets = new
ConcurrentHashMap<>();
private Map<SystemStreamPartition, String> startingOffsets;
public TaskSideInputStorageManager(
TaskName taskName,
+ TaskMode taskMode,
StreamMetadataCache streamMetadataCache,
- String storeBaseDir,
+ File storeBaseDir,
Map<String, StorageEngine> sideInputStores,
Map<String, SideInputsProcessor> storesToProcessor,
Map<String, Set<SystemStreamPartition>> storesToSSPs,
@@ -88,11 +91,12 @@ public class TaskSideInputStorageManager {
Clock clock) {
this.clock = clock;
this.stores = sideInputStores;
- this.storeBaseDir = new File(storeBaseDir);
+ this.storeBaseDir = storeBaseDir;
this.storeToSSps = storesToSSPs;
this.streamMetadataCache = streamMetadataCache;
this.systemAdmins = systemAdmins;
this.taskName = taskName;
+ this.taskMode = taskMode;
this.storeToProcessor = storesToProcessor;
validateStoreConfiguration();
@@ -132,8 +136,9 @@ public class TaskSideInputStorageManager {
/**
* Flushes the contents of the underlying store and writes the offset file
to disk.
+ * Synchronized in order to be mutually exclusive with process()
*/
- public void flush() {
+ public synchronized void flush() {
LOG.info("Flushing the side input stores.");
stores.values().forEach(StorageEngine::flush);
writeOffsetFiles();
@@ -172,6 +177,11 @@ public class TaskSideInputStorageManager {
return startingOffsets.get(ssp);
}
+ // Get the taskName associated with this instance.
+ public TaskName getTaskName() {
+ return this.taskName;
+ }
+
/**
* Gets the last processed offset for the given side input {@link
SystemStreamPartition}.
*
@@ -192,10 +202,11 @@ public class TaskSideInputStorageManager {
/**
* Processes the incoming side input message envelope and updates the last
processed offset for its SSP.
+ * Synchronized in order to be mutually exclusive with flush().
*
* @param message incoming message to be processed
*/
- public void process(IncomingMessageEnvelope message) {
+ public synchronized void process(IncomingMessageEnvelope message) {
SystemStreamPartition ssp = message.getSystemStreamPartition();
Set<String> storeNames = sspsToStores.get(ssp);
@@ -249,7 +260,7 @@ public class TaskSideInputStorageManager {
.collect(Collectors.toMap(Function.identity(),
lastProcessedOffsets::get));
try {
- StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName,
taskName, offsets);
+ StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName,
taskName, taskMode, offsets);
} catch (Exception e) {
throw new SamzaException("Failed to write offset file for side
input store: " + storeName, e);
}
@@ -287,7 +298,7 @@ public class TaskSideInputStorageManager {
@VisibleForTesting
File getStoreLocation(String storeName) {
- return StorageManagerUtil.getStorePartitionDir(storeBaseDir, storeName,
taskName);
+ return StorageManagerUtil.getStorePartitionDir(storeBaseDir, storeName,
taskName, taskMode);
}
/**
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f8d799c..5df4678 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,7 +26,7 @@ import java.net.{URL, UnknownHostException}
import java.nio.file.Path
import java.time.Duration
import java.util
-import java.util.{Base64}
+import java.util.Base64
import java.util.concurrent.{ExecutorService, Executors,
ScheduledExecutorService, TimeUnit}
import com.google.common.annotations.VisibleForTesting
@@ -44,7 +44,7 @@ import
org.apache.samza.container.disk.DiskSpaceMonitor.Listener
import org.apache.samza.container.disk.{DiskQuotaPolicyFactory,
DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory,
PollingScanDiskSpaceMonitor}
import org.apache.samza.container.host.{StatisticsMonitorImpl,
SystemMemoryStatistics, SystemStatisticsMonitor}
import org.apache.samza.context._
-import org.apache.samza.job.model.{ContainerModel, JobModel}
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskMode}
import org.apache.samza.metadatastore.MetadataStoreFactory
import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap,
MetricsReporter}
import org.apache.samza.serializers._
@@ -174,14 +174,25 @@ object SamzaContainer extends Logging {
.flatMap(_.getSystemStreamPartitions.asScala)
.toSet
+ val sideInputStoresToSystemStreams = config.getStoreNames
+ .map { storeName => (storeName, config.getSideInputs(storeName)) }
+ .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
+ .map { case (storeName, sideInputs) => (storeName,
sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
+ .toMap
+
+ val sideInputSystemStreams =
sideInputStoresToSystemStreams.values.flatMap(sideInputs =>
sideInputs.toStream).toSet
+
+ info("Got side input store system streams: %s" format
sideInputSystemStreams)
+
val inputSystemStreams = inputSystemStreamPartitions
.map(_.getSystemStream)
- .toSet
+ .toSet.diff(sideInputSystemStreams)
val inputSystems = inputSystemStreams
.map(_.getSystem)
.toSet
+
val systemNames = config.getSystemNames
info("Got system names: %s" format systemNames)
@@ -358,14 +369,6 @@ object SamzaContainer extends Logging {
info("Got intermediate streams: %s" format intermediateStreams)
- val sideInputStoresToSystemStreams = config.getStoreNames
- .map { storeName => (storeName, config.getSideInputs(storeName)) }
- .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
- .map { case (storeName, sideInputs) => (storeName,
sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
- .toMap
-
- info("Got side input store system streams: %s" format
sideInputStoresToSystemStreams)
-
val controlMessageKeySerdes = intermediateStreams
.flatMap(streamId => {
val systemStream = config.streamIdToSystemStream(streamId)
@@ -540,18 +543,17 @@ object SamzaContainer extends Logging {
val loggedStorageBaseDir = getLoggedStorageBaseDir(config,
defaultStoreBaseDir)
info("Got base directory for logged data stores: %s" format
loggedStorageBaseDir)
- val sideInputStorageEngineFactories =
storageEngineFactories.filterKeys(storeName =>
sideInputStoresToSystemStreams.contains(storeName))
- val nonSideInputStorageEngineFactories = (storageEngineFactories.toSet
diff sideInputStorageEngineFactories.toSet).toMap
-
val containerStorageManager = new ContainerStorageManager(containerModel,
streamMetadataCache, systemAdmins,
- changeLogSystemStreams.asJava,
nonSideInputStorageEngineFactories.asJava, systemFactories.asJava,
serdes.asJava, config,
+ changeLogSystemStreams.asJava,
sideInputStoresToSystemStreams.mapValues(systemStreamSet =>
systemStreamSet.toSet.asJava).asJava,
+ storageEngineFactories.asJava, systemFactories.asJava, serdes.asJava,
config,
taskInstanceMetrics.asJava, samzaContainerMetrics, jobContext,
containerContext, taskCollectors.asJava,
- loggedStorageBaseDir, nonLoggedStorageBaseDir,
maxChangeLogStreamPartitions, new SystemClock)
+ loggedStorageBaseDir, nonLoggedStorageBaseDir,
maxChangeLogStreamPartitions, serdeManager, new SystemClock)
storeWatchPaths.addAll(containerStorageManager.getStoreDirectoryPaths)
// Create taskInstances
- val taskInstances: Map[TaskName, TaskInstance] = taskModels.map(taskModel
=> {
+ val taskInstances: Map[TaskName, TaskInstance] = taskModels
+ .filter(taskModel =>
taskModel.getTaskMode.eq(TaskMode.Active)).map(taskModel => {
debug("Setting up task instance: %s" format taskModel)
val taskName = taskModel.getTaskName
@@ -561,46 +563,6 @@ object SamzaContainer extends Logging {
case tf: StreamTaskFactory =>
tf.asInstanceOf[StreamTaskFactory].createInstance()
}
- val sideInputStores = sideInputStorageEngineFactories.map {
- case (storeName, storageEngineFactory) =>
- val changeLogSystemStreamPartition = if
(changeLogSystemStreams.contains(storeName)) {
- new SystemStreamPartition(changeLogSystemStreams(storeName),
taskModel.getChangelogPartition)
- } else {
- null
- }
-
- val keySerde = config.getStorageKeySerde(storeName) match {
- case Some(keySerde) => serdes.getOrElse(keySerde,
- throw new SamzaException("StorageKeySerde: No class defined
for serde: %s." format keySerde))
- case _ => null
- }
-
- val msgSerde = config.getStorageMsgSerde(storeName) match {
- case Some(msgSerde) => serdes.getOrElse(msgSerde,
- throw new SamzaException("StorageMsgSerde: No class defined
for serde: %s." format msgSerde))
- case _ => null
- }
-
- // We use the logged storage base directory for side input stores
since side input stores
- // dont have changelog configured.
- val storeDir =
StorageManagerUtil.getStorePartitionDir(loggedStorageBaseDir, storeName,
taskName)
- storeWatchPaths.add(storeDir.toPath)
-
- val sideInputStorageEngine = storageEngineFactory.getStorageEngine(
- storeName,
- storeDir,
- keySerde,
- msgSerde,
- taskCollectors.get(taskName).get,
- taskInstanceMetrics.get(taskName).get.registry,
- changeLogSystemStreamPartition,
- jobContext,
- containerContext, StoreMode.ReadWrite)
- (storeName, sideInputStorageEngine)
- }
-
- info("Got side input stores: %s" format sideInputStores)
-
val taskSSPs = taskModel.getSystemStreamPartitions.asScala.toSet
info("Got task SSPs: %s" format taskSSPs)
@@ -608,19 +570,8 @@ object SamzaContainer extends Logging {
taskSSPs.filter(ssp =>
sideInputSystemStreams.contains(ssp.getSystemStream)).asJava)
val taskSideInputSSPs =
sideInputStoresToSSPs.values.flatMap(_.asScala).toSet
-
info ("Got task side input SSPs: %s" format taskSideInputSSPs)
- val sideInputStoresToProcessor = sideInputStores.keys.map(storeName => {
- // serialized instances takes precedence over the factory
configuration.
-
config.getSideInputsProcessorSerializedInstance(storeName).map(serializedInstance
=>
- (storeName, SerdeUtils.deserialize("Side Inputs Processor",
serializedInstance)))
-
.orElse(config.getSideInputsProcessorFactory(storeName).map(factoryClassName =>
- (storeName, Util.getObj(factoryClassName,
classOf[SideInputsProcessorFactory])
- .getSideInputsProcessor(config,
taskInstanceMetrics.get(taskName).get.registry))))
- .get
- }).toMap
-
val storageManager = new TaskStorageManager(
taskName = taskName,
containerStorageManager = containerStorageManager,
@@ -629,20 +580,6 @@ object SamzaContainer extends Logging {
loggedStoreBaseDir = loggedStorageBaseDir,
partition = taskModel.getChangelogPartition)
- var sideInputStorageManager: TaskSideInputStorageManager = null
- if (sideInputStores.nonEmpty) {
- sideInputStorageManager = new TaskSideInputStorageManager(
- taskName,
- streamMetadataCache,
- loggedStorageBaseDir.getPath,
- sideInputStores.asJava,
- sideInputStoresToProcessor.asJava,
- sideInputStoresToSSPs.asJava,
- systemAdmins,
- config,
- new SystemClock)
- }
-
val tableManager = new TableManager(config)
info("Got table manager")
@@ -658,13 +595,11 @@ object SamzaContainer extends Logging {
storageManager = storageManager,
tableManager = tableManager,
reporters = reporters,
- systemStreamPartitions = taskSSPs,
+ systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
exceptionHandler =
TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
jobModel = jobModel,
streamMetadataCache = streamMetadataCache,
timerExecutor = timerExecutor,
- sideInputSSPs = taskSideInputSSPs,
- sideInputStorageManager = sideInputStorageManager,
jobContext = jobContext,
containerContext = containerContext,
applicationContainerContextOption =
applicationContainerContextOption,
@@ -838,7 +773,10 @@ class SamzaContainer(
containerListener.afterStart()
}
metrics.containerStartupTime.update(System.nanoTime() - startTime)
- runLoop.run
+ if (taskInstances.size > 0)
+ runLoop.run
+ else
+ Thread.sleep(Long.MaxValue)
} catch {
case e: Throwable =>
if (status.equals(SamzaContainerStatus.STARTED)) {
@@ -1014,12 +952,6 @@ class SamzaContainer(
def startStores {
info("Starting container storage manager.")
containerStorageManager.start()
-
- taskInstances.values.foreach(taskInstance => {
- val startTime = System.currentTimeMillis()
- info("Starting side inputs in task instance %s" format
taskInstance.taskName)
- taskInstance.startSideInputs
- })
}
def startTableManager: Unit = {
@@ -1056,9 +988,10 @@ class SamzaContainer(
taskInstances.values.foreach(_.registerConsumers)
- info("Starting consumer multiplexer.")
-
- consumerMultiplexer.start
+ if (taskInstances.size > 0) {
+ info("Starting consumer multiplexer.")
+ consumerMultiplexer.start
+ }
}
def startSecurityManger {
@@ -1155,9 +1088,6 @@ class SamzaContainer(
def shutdownStores {
info("Shutting down container storage manager.")
containerStorageManager.shutdown()
-
- info("Shutting down task instance side inputs.")
- taskInstances.values.foreach(_.shutdownSideInputs)
}
def shutdownTableManager: Unit = {
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index a8b8078..35ebb68 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -32,7 +32,7 @@ import org.apache.samza.job.model.{JobModel, TaskModel}
import org.apache.samza.metrics.MetricsReporter
import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
import org.apache.samza.storage.kv.KeyValueStore
-import org.apache.samza.storage.{TaskSideInputStorageManager,
TaskStorageManager}
+import org.apache.samza.storage.{TaskStorageManager}
import org.apache.samza.system._
import org.apache.samza.table.TableManager
import org.apache.samza.task._
@@ -58,8 +58,6 @@ class TaskInstance(
jobModel: JobModel = null,
streamMetadataCache: StreamMetadataCache = null,
timerExecutor : ScheduledExecutorService = null,
- sideInputSSPs: Set[SystemStreamPartition] = Set(),
- sideInputStorageManager: TaskSideInputStorageManager = null,
jobContext: JobContext,
containerContext: ContainerContext,
applicationContainerContextOption: Option[ApplicationContainerContext],
@@ -80,8 +78,6 @@ class TaskInstance(
(storeName: String) => {
if (storageManager != null &&
storageManager.getStore(storeName).isDefined) {
storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_,
_]]
- } else if (sideInputStorageManager != null &&
sideInputStorageManager.getStore(storeName) != null) {
-
sideInputStorageManager.getStore(storeName).asInstanceOf[KeyValueStore[_, _]]
} else {
null
}
@@ -117,18 +113,7 @@ class TaskInstance(
def registerOffsets {
debug("Registering offsets for taskName: %s" format taskName)
-
- val sspsToRegister = systemStreamPartitions -- sideInputSSPs
- offsetManager.register(taskName, sspsToRegister)
- }
-
- def startSideInputs {
- if (sideInputStorageManager != null) {
- debug("Starting side input storage manager for taskName: %s" format
taskName)
- sideInputStorageManager.init()
- } else {
- debug("Skipping side input storage manager initialization for taskName:
%s" format taskName)
- }
+ offsetManager.register(taskName, systemStreamPartitions)
}
def startTableManager {
@@ -168,11 +153,8 @@ class TaskInstance(
val startpoint = offsetManager.getStartpoint(taskName,
systemStreamPartition).getOrElse(null)
consumerMultiplexer.register(systemStreamPartition, startingOffset,
startpoint)
metrics.addOffsetGauge(systemStreamPartition, () =>
- if (sideInputSSPs.contains(systemStreamPartition)) {
- sideInputStorageManager.getLastProcessedOffset(systemStreamPartition)
- } else {
offsetManager.getLastProcessedOffset(taskName,
systemStreamPartition).orNull
- })
+ )
})
}
@@ -193,25 +175,22 @@ class TaskInstance(
trace("Processing incoming message envelope for taskName and SSP: %s, %s"
format (taskName, incomingMessageSsp))
- if (sideInputSSPs.contains(incomingMessageSsp) &&
!envelope.isEndOfStream) {
- sideInputStorageManager.process(envelope)
+ if (isAsyncTask) {
+ exceptionHandler.maybeHandle {
+ val callback = callbackFactory.createCallback()
+ task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector,
coordinator, callback)
+ }
} else {
- if (isAsyncTask) {
- exceptionHandler.maybeHandle {
- val callback = callbackFactory.createCallback()
- task.asInstanceOf[AsyncStreamTask].processAsync(envelope,
collector, coordinator, callback)
- }
- } else {
- exceptionHandler.maybeHandle {
- task.asInstanceOf[StreamTask].process(envelope, collector,
coordinator)
- }
+ exceptionHandler.maybeHandle {
+ task.asInstanceOf[StreamTask].process(envelope, collector,
coordinator)
+ }
- trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
- format (taskName, incomingMessageSsp, envelope.getOffset))
+ trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
+ format(taskName, incomingMessageSsp, envelope.getOffset))
- offsetManager.update(taskName, incomingMessageSsp,
envelope.getOffset)
- }
+ offsetManager.update(taskName, incomingMessageSsp, envelope.getOffset)
}
+
}
}
@@ -263,11 +242,6 @@ class TaskInstance(
tableManager.flush
}
- trace("Flushing side input stores for taskName: %s" format taskName)
- if (sideInputStorageManager != null) {
- sideInputStorageManager.flush()
- }
-
trace("Checkpointing offsets for taskName: %s" format taskName)
offsetManager.writeCheckpoint(taskName, checkpoint)
@@ -295,15 +269,6 @@ class TaskInstance(
}
}
- def shutdownSideInputs {
- if (sideInputStorageManager != null) {
- debug("Shutting down side input storage manager for taskName: %s" format
taskName)
- sideInputStorageManager.stop()
- } else {
- debug("Skipping side input storage manager shutdown for taskName: %s"
format taskName)
- }
- }
-
def shutdownTableManager {
if (tableManager != null) {
debug("Shutting down table manager for taskName: %s" format taskName)
@@ -357,13 +322,7 @@ class TaskInstance(
}
private def getStartingOffset(systemStreamPartition: SystemStreamPartition)
= {
- val offset =
- if (sideInputSSPs.contains(systemStreamPartition)) {
-
Option(sideInputStorageManager.getStartingOffset(systemStreamPartition))
- } else {
- offsetManager.getStartingOffset(taskName, systemStreamPartition)
- }
-
+ val offset = offsetManager.getStartingOffset(taskName,
systemStreamPartition)
val startingOffset = offset.getOrElse(
throw new SamzaException("No offset defined for SystemStreamPartition:
%s" format systemStreamPartition))
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 3b86a4e..ad9637d 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -19,10 +19,13 @@
package org.apache.samza.storage;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,40 +34,61 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeManager;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemConsumers;
+import org.apache.samza.system.SystemConsumersMetrics;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.apache.samza.system.chooser.DefaultChooser;
+import org.apache.samza.system.chooser.MessageChooser;
+import org.apache.samza.system.chooser.RoundRobinChooserFactory;
+import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
@@ -78,10 +102,18 @@ import scala.collection.JavaConverters;
*
* b) performing individual task stores' restores in parallel.
*
+ * and
+ * c) restoring sideInputs.
+ * It provides bootstrap semantics for sideInputs -- the main thread is
blocked until
+ * all sideInputSSPs have caught up. Side input store flushes are not in
sync with task-commit, although
+ * they happen at the same frequency.
+ * In cases where a user explicitly requests a task-commit, it will not
include committing side inputs.
*/
public class ContainerStorageManager {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerStorageManager.class);
private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
+ private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush
Thread";
+ private static final String SIDEINPUTS_METRICS_NAME =
"samza-container-%s-sideinputs";
/** Maps containing relevant per-task objects */
private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -89,10 +121,11 @@ public class ContainerStorageManager {
private final Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
private final Map<TaskName, TaskInstanceCollector> taskInstanceCollectors;
- private final Map<String, SystemConsumer> systemConsumers; // Mapping from
storeSystemNames to SystemConsumers
+ private final Map<String, SystemConsumer> storeConsumers; // Mapping from
store name to SystemConsumers
private final Map<String, StorageEngineFactory<Object, Object>>
storageEngineFactories; // Map of storageEngineFactories indexed by store name
private final Map<String, SystemStream> changelogSystemStreams; // Map of
changelog system-streams indexed by store name
private final Map<String, Serde<Object>> serdes; // Map of Serde objects
indexed by serde name (specified in config)
+ private final SystemAdmins systemAdmins;
private final StreamMetadataCache streamMetadataCache;
private final SamzaContainerMetrics samzaContainerMetrics;
@@ -109,19 +142,42 @@ public class ContainerStorageManager {
private final int parallelRestoreThreadPoolSize;
private final int maxChangeLogStreamPartitions; // The partition count of
each changelog-stream topic. This is used for validating changelog streams
before restoring.
+ /* Sideinput related parameters */
+ private final Map<String, Set<SystemStream>> sideInputSystemStreams; // Map
of side input system-streams indexed by store name
+ private final Map<TaskName, Map<String, Set<SystemStreamPartition>>>
taskSideInputSSPs;
+ private final Map<SystemStreamPartition, TaskSideInputStorageManager>
sideInputStorageManagers; // Map of sideInput storageManagers indexed by ssp,
for simpler lookup for process()
+ private final Map<String, SystemConsumer> sideInputConsumers; // Mapping
from storeSystemNames to SystemConsumers
+ private SystemConsumers sideInputSystemConsumers;
+ private final Map<SystemStreamPartition,
SystemStreamMetadata.SystemStreamPartitionMetadata> initialSideInputSSPMetadata
+ = new ConcurrentHashMap<>(); // Recorded sspMetadata of the
taskSideInputSSPs recorded at start, used to determine when sideInputs are
caughtup and container init can proceed
+ private volatile CountDownLatch sideInputsCaughtUp; // Used by the
sideInput-read thread to signal to the main thread
+ private volatile boolean shutDownSideInputRead = false;
+ private final ScheduledExecutorService sideInputsFlushExecutor =
Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(SIDEINPUTS_FLUSH_THREAD_NAME).build());
+ private ScheduledFuture sideInputsFlushFuture;
+ private static final Duration SIDE_INPUT_FLUSH_TIMEOUT =
Duration.ofMinutes(1);
+ private volatile Optional<Throwable> sideInputException = Optional.empty();
+
private final Config config;
public ContainerStorageManager(ContainerModel containerModel,
StreamMetadataCache streamMetadataCache,
SystemAdmins systemAdmins, Map<String, SystemStream>
changelogSystemStreams,
+ Map<String, Set<SystemStream>> sideInputSystemStreams,
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
Map<String, SystemFactory> systemFactories, Map<String, Serde<Object>>
serdes, Config config,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
SamzaContainerMetrics samzaContainerMetrics,
JobContext jobContext, ContainerContext containerContext,
Map<TaskName, TaskInstanceCollector> taskInstanceCollectors, File
loggedStoreBaseDirectory,
- File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions,
Clock clock) {
+ File nonLoggedStoreBaseDirectory, int maxChangeLogStreamPartitions,
SerdeManager serdeManager, Clock clock) {
this.containerModel = containerModel;
- this.changelogSystemStreams = changelogSystemStreams;
+ this.sideInputSystemStreams = new HashMap<>(sideInputSystemStreams);
+ this.taskSideInputSSPs = getTaskSideInputSSPs(containerModel,
sideInputSystemStreams);
+
+ this.changelogSystemStreams = getChangelogSystemStreams(containerModel,
changelogSystemStreams); // handling standby tasks
+
+ LOG.info("Starting with changelogSystemStreams = {} sideInputSystemStreams
= {}", this.changelogSystemStreams, this.sideInputSystemStreams);
+
this.storageEngineFactories = storageEngineFactories;
this.serdes = serdes;
this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
@@ -148,32 +204,111 @@ public class ContainerStorageManager {
this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions;
this.streamMetadataCache = streamMetadataCache;
+ this.systemAdmins = systemAdmins;
// create taskStores for all tasks in the containerModel and each store in
storageEngineFactories
- this.taskStores = createTaskStores(containerModel, jobContext,
containerContext, storageEngineFactories, changelogSystemStreams,
- serdes, taskInstanceMetrics, taskInstanceCollectors,
StorageEngineFactory.StoreMode.BulkLoad);
+ this.taskStores = createTaskStores(containerModel, jobContext,
containerContext, storageEngineFactories, serdes, taskInstanceMetrics,
taskInstanceCollectors);
- // create system consumers (1 per store system)
- this.systemConsumers = createStoreConsumers(changelogSystemStreams,
systemFactories, config, this.samzaContainerMetrics.registry());
+ // create system consumers (1 per store system in changelogSystemStreams),
and index it by storeName
+ Map<String, SystemConsumer> storeSystemConsumers =
createConsumers(this.changelogSystemStreams.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+ e -> Collections.singleton(e.getValue()))), systemFactories, config,
this.samzaContainerMetrics.registry());
+ this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams,
storeSystemConsumers);
// creating task restore managers
this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+
+ // create side input storage managers
+ sideInputStorageManagers = createSideInputStorageManagers(clock);
+
+ // create side Input consumers indexed by systemName
+ this.sideInputConsumers = createConsumers(this.sideInputSystemStreams,
systemFactories, config, this.samzaContainerMetrics.registry());
+
+ // create SystemConsumers for consuming from taskSideInputSSPs, if
sideInputs are being used
+ if (sideInputsPresent()) {
+
+ scala.collection.immutable.Map<SystemStream, SystemStreamMetadata>
inputStreamMetadata =
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
+
this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(),
false);
+
+ SystemConsumersMetrics systemConsumersMetrics = new
SystemConsumersMetrics(
+ new MetricsRegistryMap(String.format(SIDEINPUTS_METRICS_NAME,
containerModel.getId())));
+
+ MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new
RoundRobinChooserFactory(), config,
+ systemConsumersMetrics.registry(), systemAdmins);
+
+ sideInputSystemConsumers =
+ new SystemConsumers(chooser,
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
+ systemConsumersMetrics,
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(),
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+ SystemConsumers.DEFAULT_POLL_INTERVAL_MS(),
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
+ }
+
+ }
+
+ /**
+ * Add all side inputs to a map of maps, indexed first by taskName, then by
sideInput store name.
+ *
+ * @param containerModel the containerModel to use
+ * @param sideInputSystemStreams the map of store to side input system stream
+ * @return taskSideInputSSPs map
+ */
+ private Map<TaskName, Map<String, Set<SystemStreamPartition>>>
getTaskSideInputSSPs(ContainerModel containerModel, Map<String,
Set<SystemStream>> sideInputSystemStreams) {
+ Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs =
new HashMap<>();
+
+ containerModel.getTasks().forEach((taskName, taskModel) -> {
+ sideInputSystemStreams.keySet().forEach(storeName -> {
+ Set<SystemStreamPartition> taskSideInputs =
taskModel.getSystemStreamPartitions().stream().filter(ssp ->
sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
+ taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+ taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
+ });
+ });
+ return taskSideInputSSPs;
}
/**
+ * For each standby task, we remove its changeLogSSPs from changelogSSP map
and add it to the task's taskSideInputSSPs.
+ * The task's sideInputManager will consume and restore these as well.
+ *
+ * @param containerModel the container's model
+ * @param changelogSystemStreams the passed in set of changelogSystemStreams
+ * @return A map of changeLogSSP to storeName across all tasks, assuming no
two stores have the same changelogSSP
+ */
+ private Map<String, SystemStream> getChangelogSystemStreams(ContainerModel
containerModel, Map<String, SystemStream> changelogSystemStreams) {
+
+ if (MapUtils.invertMap(changelogSystemStreams).size() !=
changelogSystemStreams.size()) {
+ throw new SamzaException("Two stores cannot have the same changelog
system-stream");
+ }
+
+ Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
+ changelogSystemStreams.forEach((storeName, systemStream) ->
+ containerModel.getTasks().forEach((taskName, taskModel) -> {
changelogSSPToStore.put(new SystemStreamPartition(systemStream,
taskModel.getChangelogPartition()), storeName); })
+ );
+
+ getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel)
-> {
+ changelogSystemStreams.forEach((storeName, systemStream) -> {
+ SystemStreamPartition ssp = new
SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
+ changelogSSPToStore.remove(ssp);
+ this.taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
+ this.sideInputSystemStreams.put(storeName,
Collections.singleton(ssp.getSystemStream()));
+ this.taskSideInputSSPs.get(taskName).put(storeName,
Collections.singleton(ssp));
+ });
+ });
+
+ // changelogSystemStreams correspond only to active tasks (since those of
standby-tasks moved to side inputs above)
+ return
MapUtils.invertMap(changelogSSPToStore).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
x -> x.getValue().getSystemStream()));
+ }
+
+
+ /**
* Creates SystemConsumer objects for store restoration, creating one
consumer per system.
*/
- private static Map<String, SystemConsumer> createStoreConsumers(Map<String,
SystemStream> changelogSystemStreams,
+ private static Map<String, SystemConsumer> createConsumers(Map<String,
Set<SystemStream>> systemStreams,
Map<String, SystemFactory> systemFactories, Config config,
MetricsRegistry registry) {
// Determine the set of systems being used across all stores
Set<String> storeSystems =
-
changelogSystemStreams.values().stream().map(SystemStream::getSystem).collect(Collectors.toSet());
+
systemStreams.values().stream().flatMap(Set::stream).map(SystemStream::getSystem).collect(Collectors.toSet());
// Create one consumer for each system in use, map with one entry for each
such system
Map<String, SystemConsumer> storeSystemConsumers = new HashMap<>();
- // Map of each storeName to its respective systemConsumer
- Map<String, SystemConsumer> storeConsumers = new HashMap<>();
// Iterate over the list of storeSystems and create one sysConsumer per
system
for (String storeSystemName : storeSystems) {
@@ -185,29 +320,45 @@ public class ContainerStorageManager {
systemFactory.getConsumer(storeSystemName, config, registry));
}
+ return storeSystemConsumers;
+
+ }
+
+ private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String,
SystemStream> changelogSystemStreams,
+ Map<String, SystemConsumer> storeSystemConsumers) {
+ // Map of each storeName to its respective systemConsumer
+ Map<String, SystemConsumer> storeConsumers = new HashMap<>();
+
// Populate the map of storeName to its relevant systemConsumer
for (String storeName : changelogSystemStreams.keySet()) {
storeConsumers.put(storeName,
storeSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
}
-
return storeConsumers;
}
private Map<TaskName, TaskRestoreManager>
createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock) {
Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
- containerModel.getTasks().forEach((taskName, taskModel) ->
- taskRestoreManagers.put(taskName, new TaskRestoreManager(taskModel,
changelogSystemStreams, taskStores.get(taskName), systemAdmins, clock)));
+ containerModel.getTasks().forEach((taskName, taskModel) -> {
+ taskRestoreManagers.put(taskName,
+ new TaskRestoreManager(taskModel, changelogSystemStreams,
getNonSideInputStores(taskName), systemAdmins, clock));
+ });
return taskRestoreManagers;
}
+ // Helper method to filter active Tasks from the container model
+ private static Map<TaskName, TaskModel> getTasks(ContainerModel
containerModel, TaskMode taskMode) {
+ return containerModel.getTasks().entrySet().stream()
+ .filter(x ->
x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
/**
- * Create taskStores with the given store mode for all stores in
storageEngineFactories.
+ * Create taskStores for all stores in storageEngineFactories.
+ * The store mode is chosen as bulk-load if its a non-sideinput store, and
readWrite if its a side input store
*/
private Map<TaskName, Map<String, StorageEngine>>
createTaskStores(ContainerModel containerModel, JobContext jobContext,
ContainerContext containerContext,
- Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
- Map<String, SystemStream> changelogSystemStreams, Map<String,
Serde<Object>> serdes,
+ Map<String, StorageEngineFactory<Object, Object>>
storageEngineFactories, Map<String, Serde<Object>> serdes,
Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
- Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
StorageEngineFactory.StoreMode storeMode) {
+ Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
@@ -222,14 +373,16 @@ public class ContainerStorageManager {
for (String storeName : storageEngineFactories.keySet()) {
+ StorageEngineFactory.StoreMode storeMode =
this.sideInputSystemStreams.containsKey(storeName) ?
+ StorageEngineFactory.StoreMode.ReadWrite :
StorageEngineFactory.StoreMode.BulkLoad;
+
StorageEngine storageEngine =
- createStore(storeName, taskName, taskModel, jobContext,
containerContext, storageEngineFactories,
- changelogSystemStreams, serdes, taskInstanceMetrics,
taskInstanceCollectors, storeMode);
+ createStore(storeName, taskName, taskModel, jobContext,
containerContext, storageEngineFactories, serdes, taskInstanceMetrics,
taskInstanceCollectors, storeMode);
// add created store to map
taskStores.get(taskName).put(storeName, storageEngine);
- LOG.info("Created store {} for task {}", storeName, taskName);
+ LOG.info("Created store {} for task {} in mode {}", storeName,
taskName, storeMode);
}
}
@@ -237,31 +390,28 @@ public class ContainerStorageManager {
}
/**
- * Recreate all persistent stores in ReadWrite mode.
+ * Recreate all non-sideInput persistent stores in ReadWrite mode.
*
*/
private void recreatePersistentTaskStoresInReadWriteMode(ContainerModel
containerModel, JobContext jobContext,
ContainerContext containerContext, Map<String,
StorageEngineFactory<Object, Object>> storageEngineFactories,
- Map<String, SystemStream> changelogSystemStreams, Map<String,
Serde<Object>> serdes,
- Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+ Map<String, Serde<Object>> serdes, Map<TaskName, TaskInstanceMetrics>
taskInstanceMetrics,
Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
// iterate over each task and each storeName
for (Map.Entry<TaskName, TaskModel> task :
containerModel.getTasks().entrySet()) {
TaskName taskName = task.getKey();
TaskModel taskModel = task.getValue();
+ Map<String, StorageEngine> nonSideInputStores =
getNonSideInputStores(taskName);
- for (String storeName : storageEngineFactories.keySet()) {
+ for (String storeName : nonSideInputStores.keySet()) {
- // if this store has been already created in the taskStores, then
re-create and overwrite it only if it is a persistentStore
- if (this.taskStores.get(taskName).containsKey(storeName) &&
this.taskStores.get(taskName)
- .get(storeName)
- .getStoreProperties()
- .isPersistedToDisk()) {
+ // if this store has been already created then re-create and overwrite
it only if it is a
+ // persistentStore and a non-sideInputStore, because sideInputStores
are always created in RW mode
+ if
(nonSideInputStores.get(storeName).getStoreProperties().isPersistedToDisk()) {
StorageEngine storageEngine =
- createStore(storeName, taskName, taskModel, jobContext,
containerContext, storageEngineFactories,
- changelogSystemStreams, serdes, taskInstanceMetrics,
taskInstanceCollectors,
+ createStore(storeName, taskName, taskModel, jobContext,
containerContext, storageEngineFactories, serdes, taskInstanceMetrics,
taskInstanceCollectors,
StorageEngineFactory.StoreMode.ReadWrite);
// add created store to map
@@ -269,8 +419,7 @@ public class ContainerStorageManager {
LOG.info("Re-created store {} in read-write mode for task {} because
it a persistent store", storeName, taskName);
} else {
-
- LOG.info("Skipping re-creation of store {} for task {} because it a
non-persistent store", storeName, taskName);
+ LOG.info("Skipping re-creation of store {} for task {}", storeName,
taskName);
}
}
}
@@ -282,8 +431,7 @@ public class ContainerStorageManager {
*/
private StorageEngine createStore(String storeName, TaskName taskName,
TaskModel taskModel, JobContext jobContext,
ContainerContext containerContext, Map<String,
StorageEngineFactory<Object, Object>> storageEngineFactories,
- Map<String, SystemStream> changelogSystemStreams, Map<String,
Serde<Object>> serdes,
- Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
+ Map<String, Serde<Object>> serdes, Map<TaskName, TaskInstanceMetrics>
taskInstanceMetrics,
Map<TaskName, TaskInstanceCollector> taskInstanceCollectors,
StorageEngineFactory.StoreMode storeMode) {
StorageConfig storageConfig = new StorageConfig(config);
@@ -292,11 +440,15 @@ public class ContainerStorageManager {
(changelogSystemStreams.containsKey(storeName)) ? new
SystemStreamPartition(
changelogSystemStreams.get(storeName),
taskModel.getChangelogPartition()) : null;
- // Use the logged-store-base-directory for change logged stores, and
non-logged-store-base-dir for non logged stores
- File storeDirectory =
- (changeLogSystemStreamPartition != null) ?
StorageManagerUtil.getStorePartitionDir(this.loggedStoreBaseDirectory,
- storeName, taskName)
- :
StorageManagerUtil.getStorePartitionDir(this.nonLoggedStoreBaseDirectory,
storeName, taskName);
+ // Use the logged-store-base-directory for change logged stores and side
input stores, and non-logged-store-base-dir
+ // for non logged stores
+ File storeDirectory;
+ if (changeLogSystemStreamPartition != null ||
sideInputSystemStreams.containsKey(storeName)) {
+ storeDirectory =
StorageManagerUtil.getStorePartitionDir(this.loggedStoreBaseDirectory,
storeName, taskName, taskModel.getTaskMode());
+ } else {
+ storeDirectory =
StorageManagerUtil.getStorePartitionDir(this.nonLoggedStoreBaseDirectory,
storeName, taskName, taskModel.getTaskMode());
+ }
+
this.storeDirectoryPaths.add(storeDirectory.toPath());
if (storageConfig.getStorageKeySerde(storeName).isEmpty()) {
@@ -330,18 +482,118 @@ public class ContainerStorageManager {
storeMetricsRegistry, changeLogSystemStreamPartition, jobContext,
containerContext, storeMode);
}
+
+ // Create side input store processors, one per store per task
+ private Map<TaskName, Map<String, SideInputsProcessor>>
createSideInputProcessors(StorageConfig config, ContainerModel containerModel,
+ Map<String, Set<SystemStream>> sideInputSystemStreams, Map<TaskName,
TaskInstanceMetrics> taskInstanceMetrics) {
+
+ Map<TaskName, Map<String, SideInputsProcessor>>
sideInputStoresToProcessors = new HashMap<>();
+ getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) ->
{
+ sideInputStoresToProcessors.put(taskName, new HashMap<>());
+ for (String storeName : sideInputSystemStreams.keySet()) {
+ if
(config.getSideInputsProcessorSerializedInstance(storeName).isDefined()) {
+ sideInputStoresToProcessors.get(taskName)
+ .put(storeName, SerdeUtils.deserialize("Side Inputs Processor",
+
config.getSideInputsProcessorSerializedInstance(storeName).get()));
+ } else {
+ sideInputStoresToProcessors.get(taskName)
+ .put(storeName,
Util.getObj(config.getSideInputsProcessorFactory(storeName).get(),
+
SideInputsProcessorFactory.class).getSideInputsProcessor(config,
+ taskInstanceMetrics.get(taskName).registry()));
+ }
+ }
+ });
+
+ // creating identity sideInputProcessor for stores of standbyTasks
+ getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel)
-> {
+ sideInputStoresToProcessors.put(taskName, new HashMap<>());
+ for (String storeName : sideInputSystemStreams.keySet()) {
+
+ // have to use the right serde because the sideInput stores are
created
+ Serde keySerde = serdes.get(new
StorageConfig(config).getStorageKeySerde(storeName).get());
+ Serde msgSerde = serdes.get(new
StorageConfig(config).getStorageMsgSerde(storeName).get());
+ sideInputStoresToProcessors.get(taskName).put(storeName, new
SideInputsProcessor() {
+ @Override
+ public Collection<Entry<?, ?>> process(IncomingMessageEnvelope
message, KeyValueStore store) {
+ return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[])
message.getKey()), msgSerde.fromBytes((byte[]) message.getMessage())));
+ }
+ });
+ }
+ });
+
+ return sideInputStoresToProcessors;
+ }
+
+ // Create task side input storage managers, one per task, index by the SSP
they are responsible for consuming
+ private Map<SystemStreamPartition, TaskSideInputStorageManager>
createSideInputStorageManagers(Clock clock) {
+
+ // creating side input store processors, one per store per task
+ Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
createSideInputProcessors(new StorageConfig(config), this.containerModel,
this.sideInputSystemStreams, this.taskInstanceMetrics);
+
+ Map<SystemStreamPartition, TaskSideInputStorageManager>
sideInputStorageManagers = new HashMap<>();
+
+ if (sideInputsPresent()) {
+ containerModel.getTasks().forEach((taskName, taskModel) -> {
+
+ Map<String, StorageEngine> sideInputStores =
getSideInputStores(taskName);
+ Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new
HashMap<>();
+
+ for (String storeName : sideInputStores.keySet()) {
+ Set<SystemStreamPartition> storeSSPs =
taskSideInputSSPs.get(taskName).get(storeName);
+ sideInputStoresToSSPs.put(storeName, storeSSPs);
+ }
+
+ TaskSideInputStorageManager taskSideInputStorageManager =
+ new TaskSideInputStorageManager(taskName,
taskModel.getTaskMode(), streamMetadataCache,
+ loggedStoreBaseDirectory, sideInputStores,
taskSideInputProcessors.get(taskName), sideInputStoresToSSPs,
+ systemAdmins, config, clock);
+
+
sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
+ sideInputStorageManagers.put(ssp, taskSideInputStorageManager);
+ });
+
+ LOG.info("Created taskSideInputStorageManager for task {},
sideInputStores {} and loggedStoreBaseDirectory {}",
+ taskName, sideInputStores, loggedStoreBaseDirectory);
+ });
+ }
+ return sideInputStorageManagers;
+ }
+
+ private Map<String, StorageEngine> getSideInputStores(TaskName taskName) {
+ return taskStores.get(taskName).entrySet().stream().
+ filter(e ->
sideInputSystemStreams.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
+ private Map<String, StorageEngine> getNonSideInputStores(TaskName taskName) {
+ return taskStores.get(taskName).entrySet().stream().
+ filter(e ->
!sideInputSystemStreams.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
+ private Set<TaskSideInputStorageManager> getSideInputStorageManagers() {
+ return
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
+ }
+
+
public void start() throws SamzaException {
- LOG.info("Restore started");
+ restoreStores();
+ if (sideInputsPresent()) {
+ startSideInputs();
+ }
+ }
+
+ // Restoration of all stores, in parallel across tasks
+ private void restoreStores() {
+ LOG.info("Store Restore started");
// initialize each TaskStorageManager
this.taskRestoreManagers.values().forEach(taskStorageManager ->
taskStorageManager.initialize());
- // Start consumers
- this.systemConsumers.values().forEach(systemConsumer ->
systemConsumer.start());
+ // Start store consumers
+ this.storeConsumers.values().forEach(systemConsumer ->
systemConsumer.start());
// Create a thread pool for parallel restores (and stopping of persistent
stores)
ExecutorService executorService =
Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
- new ThreadFactoryBuilder().setNameFormat(RESTORE_THREAD_NAME).build());
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
List<Future> taskRestoreFutures = new
ArrayList<>(this.taskRestoreManagers.entrySet().size());
@@ -364,14 +616,148 @@ public class ContainerStorageManager {
executorService.shutdown();
- // Stop consumers
- this.systemConsumers.values().forEach(systemConsumer ->
systemConsumer.stop());
+ // Stop store consumers
+ this.storeConsumers.values().forEach(systemConsumer ->
systemConsumer.stop());
// Now re-create persistent stores in read-write mode, leave
non-persistent stores as-is
recreatePersistentTaskStoresInReadWriteMode(this.containerModel,
jobContext, containerContext,
- storageEngineFactories, changelogSystemStreams, serdes,
taskInstanceMetrics, taskInstanceCollectors);
+ storageEngineFactories, serdes, taskInstanceMetrics,
taskInstanceCollectors);
+
+ LOG.info("Store Restore complete");
+ }
+
+ // Read sideInputs until all sideInputStreams are caughtup, so start() can
return
+ private void startSideInputs() {
+
+ LOG.info("SideInput Restore started");
+
+ // initialize the sideInputStorageManagers
+ getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.init());
+
+ // start the checkpointing thread at the commit-ms frequency
+ sideInputsFlushFuture = sideInputsFlushExecutor.scheduleWithFixedDelay(new
Runnable() {
+ @Override
+ public void run() {
+ try {
+ getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.flush());
+ } catch (Exception e) {
+ LOG.error("Exception during flushing side inputs", e);
+ sideInputException = Optional.of(e);
+ }
+ }
+ }, 0, new TaskConfig(config).getCommitMs(), TimeUnit.MILLISECONDS);
+
+ // set the latch to the number of sideInput SSPs
+ this.sideInputsCaughtUp = new
CountDownLatch(this.sideInputStorageManagers.keySet().size());
+
+ // register all side input SSPs with the consumers
+ for (SystemStreamPartition ssp : sideInputStorageManagers.keySet()) {
+ String startingOffset =
sideInputStorageManagers.get(ssp).getStartingOffset(ssp);
+
+ if (startingOffset == null) {
+ throw new SamzaException("No offset defined for SideInput
SystemStreamPartition : " + ssp);
+ }
+
+ // register startingOffset with the sysConsumer and register a metric
for it
+ sideInputSystemConsumers.register(ssp, startingOffset, null);
+
taskInstanceMetrics.get(sideInputStorageManagers.get(ssp).getTaskName()).addOffsetGauge(
+ ssp, ScalaJavaUtil.toScalaFunction(() ->
sideInputStorageManagers.get(ssp).getLastProcessedOffset(ssp)));
+
+ SystemStreamMetadata systemStreamMetadata =
streamMetadataCache.getSystemStreamMetadata(ssp.getSystemStream(), false);
+ SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
+ (systemStreamMetadata == null) ? null :
systemStreamMetadata.getSystemStreamPartitionMetadata().get(ssp.getPartition());
+
+ // record a copy of the sspMetadata, to later check if its caught up
+ initialSideInputSSPMetadata.put(ssp, sspMetadata);
+
+ // check if the ssp is caught to upcoming, even at start
+ checkSideInputCaughtUp(ssp, startingOffset,
SystemStreamMetadata.OffsetType.UPCOMING, false);
+ }
+
+ // start the systemConsumers for consuming input
+ this.sideInputSystemConsumers.start();
+
+ // create a thread for sideInput reads
+ Thread readSideInputs = new Thread(() -> {
+ while (!shutDownSideInputRead) {
+ IncomingMessageEnvelope envelope =
sideInputSystemConsumers.choose(true);
+ if (envelope != null) {
+
+ if (!envelope.isEndOfStream())
+
sideInputStorageManagers.get(envelope.getSystemStreamPartition()).process(envelope);
+
+ checkSideInputCaughtUp(envelope.getSystemStreamPartition(),
envelope.getOffset(),
+ SystemStreamMetadata.OffsetType.NEWEST,
envelope.isEndOfStream());
- LOG.info("Restore complete");
+ } else {
+ LOG.trace("No incoming message was available");
+ }
+ }
+ });
+
+ readSideInputs.setDaemon(true);
+ readSideInputs.setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ sideInputException = Optional.of(e);
+ sideInputsCaughtUp.countDown();
+ }
+ });
+
+ try {
+ readSideInputs.start();
+ // Make the main thread wait until all sideInputs have been caughtup or
thrown an exception
+ this.sideInputsCaughtUp.await();
+
+ if (sideInputException.isPresent()) { // Throw exception if there was an
exception in catching-up sideInputs
+ // TODO: SAMZA-2113 relay exception to main thread
+ throw new SamzaException("Exception in restoring side inputs",
sideInputException.get());
+ }
+ } catch (InterruptedException e) {
+ sideInputException = Optional.of(e);
+ throw new SamzaException("Side inputs read was interrupted", e);
+ }
+
+ LOG.info("SideInput Restore complete");
+ }
+
+ private boolean sideInputsPresent() {
+ return !this.sideInputSystemStreams.isEmpty();
+ }
+
+ // Method to check if the given offset means the stream is caught up for
reads
+ private void checkSideInputCaughtUp(SystemStreamPartition ssp, String
offset, SystemStreamMetadata.OffsetType offsetType, boolean isEndOfStream) {
+
+ if (isEndOfStream) {
+ this.initialSideInputSSPMetadata.remove(ssp);
+ this.sideInputsCaughtUp.countDown();
+ LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp,
offset, offsetType);
+ return;
+ }
+
+ SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
this.initialSideInputSSPMetadata.get(ssp);
+ String offsetToCheck = sspMetadata == null ? null :
sspMetadata.getOffset(offsetType);
+ LOG.trace("Checking {} offset {} against {} for {}.", offsetType, offset,
offsetToCheck, ssp);
+
+ // Let's compare offset of the chosen message with offsetToCheck.
+ Integer comparatorResult;
+ if (offset == null || offsetToCheck == null) {
+ comparatorResult = -1;
+ } else {
+ SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
+ comparatorResult = systemAdmin.offsetComparator(offset, offsetToCheck);
+ }
+
+ // The SSP is no longer lagging if the envelope's offset is greater than
or equal to the
+ // latest offset.
+ if (comparatorResult != null && comparatorResult.intValue() >= 0) {
+
+ LOG.info("Side input ssp {} has caught up to offset {} ({}).", ssp,
offset, offsetType);
+ // if its caught up, we remove the ssp from the map, and countDown the
latch
+ this.initialSideInputSSPMetadata.remove(ssp);
+ this.sideInputsCaughtUp.countDown();
+ return;
+ }
}
/**
@@ -407,15 +793,30 @@ public class ContainerStorageManager {
}
public void shutdown() {
- this.taskRestoreManagers.forEach((taskInstance, taskRestoreManager) -> {
- if (taskRestoreManager != null) {
- LOG.debug("Shutting down task storage manager for taskName: {} ",
taskInstance);
- taskRestoreManager.stop();
- } else {
- LOG.debug("Skipping task storage manager shutdown for taskName: {}",
taskInstance);
- }
- });
+ // stop all nonsideinputstores including persistent and non-persistent
stores
+ this.containerModel.getTasks().forEach((taskName, taskModel) ->
+ getNonSideInputStores(taskName).forEach((storeName, store) ->
store.stop())
+ );
+
+ // stop all sideinput consumers and stores
+ if (sideInputsPresent()) {
+ // stop reading sideInputs
+ this.shutDownSideInputRead = true;
+
+ this.sideInputSystemConsumers.stop();
+ // cancel all future sideInput flushes, shutdown the executor, and await
for finish
+ sideInputsFlushFuture.cancel(false);
+ sideInputsFlushExecutor.shutdown();
+ try {
+
sideInputsFlushExecutor.awaitTermination(SIDE_INPUT_FLUSH_TIMEOUT.toMillis(),
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new SamzaException("Exception while shutting down side inputs",
e);
+ }
+
+ // stop all sideInputStores -- this will perform one last flush on the
KV stores, and write the offset file
+ this.getSideInputStorageManagers().forEach(sideInputStorageManager ->
sideInputStorageManager.stop());
+ }
LOG.info("Shutdown complete");
}
@@ -514,7 +915,7 @@ public class ContainerStorageManager {
taskStores.keySet().forEach(storeName -> {
File nonLoggedStorePartitionDir =
-
StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName,
taskModel.getTaskName());
+
StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName,
taskModel.getTaskName(), taskModel.getTaskMode());
LOG.info("Got non logged storage partition directory as " +
nonLoggedStorePartitionDir.toPath().toString());
if (nonLoggedStorePartitionDir.exists()) {
@@ -523,7 +924,7 @@ public class ContainerStorageManager {
}
File loggedStorePartitionDir =
-
StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName,
taskModel.getTaskName());
+
StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName,
taskModel.getTaskName(), taskModel.getTaskMode());
LOG.info("Got logged storage partition directory as " +
loggedStorePartitionDir.toPath().toString());
// Delete the logged store if it is not valid.
@@ -579,7 +980,7 @@ public class ContainerStorageManager {
if (storageEngine.getStoreProperties().isLoggedStore()) {
File loggedStorePartitionDir =
-
StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName,
taskModel.getTaskName());
+
StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName,
taskModel.getTaskName(), taskModel.getTaskMode());
LOG.info("Using logged storage partition directory: " +
loggedStorePartitionDir.toPath().toString()
+ " for store: " + storeName);
@@ -589,7 +990,7 @@ public class ContainerStorageManager {
}
} else {
File nonLoggedStorePartitionDir =
-
StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName,
taskModel.getTaskName());
+
StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName,
taskModel.getTaskName(), taskModel.getTaskMode());
LOG.info("Using non logged storage partition directory: " +
nonLoggedStorePartitionDir.toPath().toString()
+ " for store: " + storeName);
nonLoggedStorePartitionDir.mkdirs();
@@ -659,7 +1060,7 @@ public class ContainerStorageManager {
SystemStreamPartition systemStreamPartition =
new SystemStreamPartition(changelogSystemStreamEntry.getValue(),
taskModel.getChangelogPartition());
SystemAdmin systemAdmin =
systemAdmins.getSystemAdmin(changelogSystemStreamEntry.getValue().getSystem());
- SystemConsumer systemConsumer =
systemConsumers.get(changelogSystemStreamEntry.getKey());
+ SystemConsumer systemConsumer =
storeConsumers.get(changelogSystemStreamEntry.getKey());
String offset = getStartingOffset(systemStreamPartition, systemAdmin);
@@ -708,7 +1109,7 @@ public class ContainerStorageManager {
LOG.debug("Restoring stores for task: {}", taskModel.getTaskName());
for (String storeName : taskStoresToRestore) {
- SystemConsumer systemConsumer = systemConsumers.get(storeName);
+ SystemConsumer systemConsumer = storeConsumers.get(storeName);
SystemStream systemStream = changelogSystemStreams.get(storeName);
SystemStreamPartitionIterator systemStreamPartitionIterator = new
SystemStreamPartitionIterator(systemConsumer,
@@ -732,11 +1133,16 @@ public class ContainerStorageManager {
* can invoke compaction.
*/
public void stopPersistentStores() {
- this.taskStores.values().stream().filter(storageEngine -> {
- return storageEngine.getStoreProperties().isPersistedToDisk();
- }).forEach(storageEngine -> {
- storageEngine.stop();
- });
+
+ Map<String, StorageEngine> persistentStores =
this.taskStores.entrySet().stream().filter(e -> {
+ return e.getValue().getStoreProperties().isPersistedToDisk();
+ }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ persistentStores.forEach((storeName, storageEngine) -> {
+ storageEngine.stop();
+ this.taskStores.remove(storeName);
+ });
+ LOG.info("Stopped persistent stores {}", persistentStores);
}
}
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 85cbf53..c2a2c7e 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -24,6 +24,7 @@ import java.io._
import com.google.common.annotations.VisibleForTesting
import org.apache.samza.Partition
import org.apache.samza.container.TaskName
+import org.apache.samza.job.model.TaskMode
import org.apache.samza.system._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{FileUtil, Logging}
@@ -90,7 +91,9 @@ class TaskStorageManager(
if (newestOffset != null) {
debug("Storing offset for store in OFFSET file ")
- StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName,
taskName, Map(ssp -> newestOffset).asJava)
+
+ // TaskStorageManagers are only spun-up for active tasks
+ StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName,
taskName, TaskMode.Active, Map(ssp -> newestOffset).asJava)
debug("Successfully stored offset %s for store %s in OFFSET file "
format(newestOffset, storeName))
} else {
//if newestOffset is null, then it means the store is (or has
become) empty. No need to persist the offset file
diff --git
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
index 2d60f7b..a7cefa0 100644
---
a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
+++
b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
@@ -288,7 +289,7 @@ public class TestTaskSideInputStorageManager {
}
TaskSideInputStorageManager build() {
- return spy(new TaskSideInputStorageManager(taskName,
streamMetadataCache, storeBaseDir, stores,
+ return spy(new TaskSideInputStorageManager(taskName, TaskMode.Active,
streamMetadataCache, new File(storeBaseDir), stores,
storeToProcessor, storeToSSps, systemAdmins, mock(Config.class),
clock));
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 06a4ebf..acaecdb 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -103,8 +103,6 @@ public class TestAsyncRunLoop {
null,
null,
null,
- new scala.collection.immutable.HashSet<>(),
- null,
mock(JobContext.class),
mock(ContainerContext.class),
Option.apply(null),
diff --git
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 9517eec..99e133f 100644
---
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -204,10 +204,10 @@ public class TestContainerStorageManager {
// Create the container storage manager
this.containerStorageManager =
new ContainerStorageManager(new ContainerModel("samza-container-test",
tasks), mockStreamMetadataCache,
- mockSystemAdmins, changelogSystemStreams, storageEngineFactories,
systemFactories, serdes, config,
+ mockSystemAdmins, changelogSystemStreams, new HashMap<>(),
storageEngineFactories, systemFactories, serdes, config,
taskInstanceMetrics, samzaContainerMetrics,
Mockito.mock(JobContext.class),
Mockito.mock(ContainerContext.class), Mockito.mock(Map.class),
DEFAULT_LOGGED_STORE_BASE_DIR,
- DEFAULT_STORE_BASE_DIR, 2, new SystemClock());
+ DEFAULT_STORE_BASE_DIR, 2, null, new SystemClock());
}
@Test
diff --git
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 8e22533..424a0d0 100644
---
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -27,7 +27,7 @@ import org.apache.samza.Partition
import org.apache.samza.config._
import org.apache.samza.container.{SamzaContainerMetrics, TaskInstanceMetrics,
TaskName}
import org.apache.samza.context.{ContainerContext, JobContext}
-import org.apache.samza.job.model.{ContainerModel, TaskModel}
+import org.apache.samza.job.model.{ContainerModel, TaskMode, TaskModel}
import org.apache.samza.serializers.{Serde, StringSerdeFactory}
import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
import
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -37,7 +37,7 @@ import org.apache.samza.util.{FileUtil, SystemClock}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.mockito.Matchers._
-import org.mockito.Mockito
+import org.mockito.{Matchers, Mockito}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@@ -55,9 +55,9 @@ class TestTaskStorageManager extends MockitoSugar {
@Before
def setupTestDirs() {
-
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
store, taskName)
+
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
store, taskName, TaskMode.Active)
.mkdirs()
-
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName)
+
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active)
.mkdirs()
}
@@ -67,6 +67,10 @@ class TestTaskStorageManager extends MockitoSugar {
FileUtil.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
}
+ def getStreamName(storeName : String): String = {
+ "testStream-"+storeName
+ }
+
/**
* This tests the entire TaskStorageManager lifecycle for a Persisted
Logged Store
* For example, a RocksDb store with changelog needs to continuously update
the offset file on flush & stop
@@ -75,10 +79,10 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testStoreLifecycleForLoggedPersistedStore(): Unit = {
// Basic test setup of SystemStream, SystemStreamPartition for this task
- val ss = new SystemStream("kafka", "testStream")
+ val ss = new SystemStream("kafka", getStreamName(loggedStore))
val partition = new Partition(0)
val ssp = new SystemStreamPartition(ss, partition)
- val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName)
+ val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active)
val storeFile = new File(storeDirectory, "store.sst")
val offsetFile = new File(storeDirectory,
StorageManagerUtil.OFFSET_FILE_NAME)
@@ -89,14 +93,14 @@ class TestTaskStorageManager extends MockitoSugar {
val mockSSPMetadataCache = mock[SSPMetadataCache]
val mockSystemConsumer = mock[SystemConsumer]
val mockSystemAdmin = mock[SystemAdmin]
- val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream",
"kafka", 1)
+ val changelogSpec =
StreamSpec.createChangeLogStreamSpec(getStreamName(loggedStore), "kafka", 1)
doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
doNothing().when(mockSystemConsumer).stop()
// Test 1: Initial invocation - No store on disk (only changelog has data)
// Setup initial sspMetadata
var sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51")
- var metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(partition, sspMetadata)
}
@@ -121,18 +125,18 @@ class TestTaskStorageManager extends MockitoSugar {
// Test 2: flush should update the offset file
taskManager.flush()
assertTrue(offsetFile.exists())
- assertEquals("{\"kafka.testStream.0\":\"50\"}",
FileUtil.readWithChecksum(offsetFile))
+ assertEquals("{\"kafka.testStream-loggedStore1.0\":\"50\"}",
FileUtil.readWithChecksum(offsetFile))
// Test 3: Update sspMetadata before shutdown and verify that offset file
is updated correctly
when(mockSSPMetadataCache.getMetadata(ssp)).thenReturn(new
SystemStreamPartitionMetadata("0", "100", "101"))
taskManager.stop()
assertTrue(storeFile.exists())
assertTrue(offsetFile.exists())
- assertEquals("{\"kafka.testStream.0\":\"100\"}",
FileUtil.readWithChecksum(offsetFile))
+ assertEquals("{\"kafka.testStream-loggedStore1.0\":\"100\"}",
FileUtil.readWithChecksum(offsetFile))
// Test 4: Initialize again with an updated sspMetadata; Verify that it
restores from the correct offset
sspMetadata = new SystemStreamPartitionMetadata("0", "150", "151")
- metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ metadata = new SystemStreamMetadata(getStreamName(loggedStore), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(partition, sspMetadata)
}
@@ -165,17 +169,17 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testStoreLifecycleForLoggedInMemoryStore(): Unit = {
// Basic test setup of SystemStream, SystemStreamPartition for this task
- val ss = new SystemStream("kafka", "testStream")
+ val ss = new SystemStream("kafka", getStreamName(store))
val partition = new Partition(0)
val ssp = new SystemStreamPartition(ss, partition)
- val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
store, taskName)
+ val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
store, taskName, TaskMode.Active)
val mockStorageEngine: StorageEngine =
createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null)
// Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
val mockStreamMetadataCache = mock[StreamMetadataCache]
val mockSystemAdmin = mock[SystemAdmin]
- val changelogSpec = StreamSpec.createChangeLogStreamSpec("testStream",
"kafka", 1)
+ val changelogSpec =
StreamSpec.createChangeLogStreamSpec(getStreamName(store), "kafka", 1)
doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
val mockSystemConsumer = mock[SystemConsumer]
@@ -184,7 +188,7 @@ class TestTaskStorageManager extends MockitoSugar {
// Test 1: Initial invocation - No store data (only changelog has data)
// Setup initial sspMetadata
val sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51")
- var metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ var metadata = new SystemStreamMetadata(getStreamName(store), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(partition, sspMetadata)
}
@@ -208,7 +212,7 @@ class TestTaskStorageManager extends MockitoSugar {
assertTrue(storeDirectory.list().isEmpty)
// Test 3: Update sspMetadata before shutdown and verify that offset file
is NOT created
- metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ metadata = new SystemStreamMetadata(getStreamName(store), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(partition, new SystemStreamPartitionMetadata("0", "100", "101"))
}
@@ -218,7 +222,7 @@ class TestTaskStorageManager extends MockitoSugar {
assertTrue(storeDirectory.list().isEmpty)
// Test 4: Initialize again with an updated sspMetadata; Verify that it
restores from the earliest offset
- metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ metadata = new SystemStreamMetadata(getStreamName(store), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(partition, new SystemStreamPartitionMetadata("0", "150", "151"))
}
@@ -241,9 +245,9 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testStoreDirsWithoutOffsetFileAreDeletedInCleanBaseDirs() {
- val checkFilePath1 = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
store, taskName), "check")
+ val checkFilePath1 = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir,
store, taskName, TaskMode.Active), "check")
checkFilePath1.createNewFile()
- val checkFilePath2 = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName), "check")
+ val checkFilePath2 = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active), "check")
checkFilePath2.createNewFile()
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -259,7 +263,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
- val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName), StorageManagerUtil.OFFSET_FILE_NAME)
+ val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -275,7 +279,7 @@ class TestTaskStorageManager extends MockitoSugar {
def testStoreDeletedWhenOffsetFileOlderThanDeleteRetention() {
// This test ensures that store gets deleted when lastModifiedTime of the
offset file
// is older than deletionRetention of the changeLog.
- val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName)
+ val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active)
storeDirectory.setLastModified(0)
val offsetFile = new File(storeDirectory,
StorageManagerUtil.OFFSET_FILE_NAME)
offsetFile.createNewFile()
@@ -294,7 +298,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
- val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName), StorageManagerUtil.OFFSET_FILE_NAME)
+ val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -310,22 +314,22 @@ class TestTaskStorageManager extends MockitoSugar {
def testStopCreatesOffsetFileForLoggedStore() {
val partition = new Partition(0)
- val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName)
+ val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active)
val offsetFile = new File(storeDirectory,
StorageManagerUtil.OFFSET_FILE_NAME)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
- when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
"testStream", partition)))
+ when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
getStreamName(loggedStore), partition)))
.thenReturn(sspMetadata)
- var metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(partition, sspMetadata)
}
})
val mockStreamMetadataCache = mock[StreamMetadataCache]
- when(mockStreamMetadataCache.getStreamMetadata(any(),
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
+ when(mockStreamMetadataCache.getStreamMetadata(any(),
any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) ->
metadata))
//Build TaskStorageManager
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -341,7 +345,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFile.exists())
- assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
+ assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream-loggedStore1.0\":\"100\"}",
FileUtil.readWithChecksum(offsetFile))
}
/**
@@ -351,31 +355,23 @@ class TestTaskStorageManager extends MockitoSugar {
def testFlushCreatesOffsetFileForLoggedStore() {
val partition = new Partition(0)
- val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+ val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active) + File.separator +
StorageManagerUtil.OFFSET_FILE_NAME)
val anotherOffsetPath = new File(
- StorageManagerUtil.getStorePartitionDir(
- TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName)
+ File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
store, taskName, TaskMode.Active) + File.separator +
StorageManagerUtil.OFFSET_FILE_NAME)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
- when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
"testStream", partition)))
+ when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
getStreamName(loggedStore), partition)))
+ .thenReturn(sspMetadata)
+ when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
getStreamName(store), partition)))
.thenReturn(sspMetadata)
-
- var metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
- {
- put(partition, sspMetadata)
- }
- })
-
- val mockStreamMetadataCache = mock[StreamMetadataCache]
- when(mockStreamMetadataCache.getStreamMetadata(any(),
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
//Build TaskStorageManager
val taskStorageManager = new TaskStorageManagerBuilder()
.addLoggedStore(loggedStore, true)
.addStore(store, false)
.setSSPMetadataCache(sspMetadataCache)
- .setStreamMetadataCache(mockStreamMetadataCache)
+ .setStreamMetadataCache(createMockStreamMetadataCache("20", "100",
"101"))
.setPartition(partition)
.initializeContainerStorageManager()
.build
@@ -385,7 +381,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream-loggedStore1.0\":\"100\"}",
FileUtil.readWithChecksum(offsetFilePath))
assertTrue("Offset file got created for a store that is not persisted to
the disk!!", !anotherOffsetPath.exists())
}
@@ -397,24 +393,24 @@ class TestTaskStorageManager extends MockitoSugar {
def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
val partition = new Partition(0)
- val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+ val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active) + File.separator +
StorageManagerUtil.OFFSET_FILE_NAME)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
- when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
"testStream", partition)))
+ when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
getStreamName(loggedStore), partition)))
// first return some metadata
.thenReturn(sspMetadata)
// then return no metadata to trigger the delete
.thenReturn(null)
- var metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(partition, sspMetadata)
}
})
val mockStreamMetadataCache = mock[StreamMetadataCache]
- when(mockStreamMetadataCache.getStreamMetadata(any(),
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
+ when(mockStreamMetadataCache.getStreamMetadata(any(),
any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) ->
metadata))
//Build TaskStorageManager
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -430,7 +426,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream-loggedStore1.0\":\"100\"}",
FileUtil.readWithChecksum(offsetFilePath))
//Invoke test method again
taskStorageManager.flush()
@@ -442,23 +438,23 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testFlushOverwritesOffsetFileForLoggedStore() {
val partition = new Partition(0)
- val ssp = new SystemStreamPartition("kafka", "testStream", partition)
+ val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore),
partition)
- val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+ val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active) + File.separator +
StorageManagerUtil.OFFSET_FILE_NAME)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "139", "140")
when(sspMetadataCache.getMetadata(ssp)).thenReturn(sspMetadata)
- var metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(partition, sspMetadata)
}
})
val mockStreamMetadataCache = mock[StreamMetadataCache]
- when(mockStreamMetadataCache.getStreamMetadata(any(),
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
+ when(mockStreamMetadataCache.getStreamMetadata(any(),
any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) ->
metadata))
//Build TaskStorageManager
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -474,7 +470,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream.0\":\"139\"}", FileUtil.readWithChecksum(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream-loggedStore1.0\":\"139\"}",
FileUtil.readWithChecksum(offsetFilePath))
// Flush again
when(sspMetadataCache.getMetadata(ssp)).thenReturn(new
SystemStreamPartitionMetadata("20", "193", "194"))
@@ -484,18 +480,18 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream.0\":\"193\"}", FileUtil.readWithChecksum(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!",
"{\"kafka.testStream-loggedStore1.0\":\"193\"}",
FileUtil.readWithChecksum(offsetFilePath))
}
@Test
def testStopShouldNotCreateOffsetFileForEmptyStore() {
val partition = new Partition(0)
- val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
+ val offsetFilePath = new
File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active) + File.separator +
StorageManagerUtil.OFFSET_FILE_NAME)
val sspMetadataCache = mock[SSPMetadataCache]
- when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
"testStream", partition))).thenReturn(null)
+ when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka",
getStreamName(loggedStore), partition))).thenReturn(null)
//Build TaskStorageManager
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -569,7 +565,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testReadOfOldOffsetFormat(): Unit = {
// Create a file in old single-offset format, with a sample offset
- val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName)
+ val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active)
val storeFile = new File(storeDirectory, "store.sst")
val offsetFile = new File(storeDirectory,
StorageManagerUtil.OFFSET_FILE_NAME)
val sampleOldOffset = "912321"
@@ -584,13 +580,13 @@ class TestTaskStorageManager extends MockitoSugar {
private def testChangelogConsumerOffsetRegistration(oldestOffset: String,
newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String,
fileOffset: String, writeOffsetFile: Boolean): Unit = {
val systemName = "kafka"
- val streamName = "testStream"
+ val streamName = getStreamName(loggedStore)
val partitionCount = 1
// Basic test setup of SystemStream, SystemStreamPartition for this task
val ss = new SystemStream(systemName, streamName)
val partition = new Partition(0)
val ssp = new SystemStreamPartition(ss, partition)
- val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName)
+ val storeDirectory =
StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
loggedStore, taskName, TaskMode.Active )
val storeFile = new File(storeDirectory, "store.sst")
if (writeOffsetFile) {
@@ -672,14 +668,24 @@ class TestTaskStorageManager extends MockitoSugar {
private def createMockStreamMetadataCache(oldestOffset: String,
newestOffset: String, upcomingOffset: String) = {
// an empty store would return a SSPMetadata with oldest, newest and
upcoming offset set to null
- var metadata = new SystemStreamMetadata("testStream", new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ var metadata1 = new SystemStreamMetadata(getStreamName(loggedStore), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ {
+ put(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset,
newestOffset, upcomingOffset))
+ }
+ })
+
+ var metadata2 = new SystemStreamMetadata(getStreamName(store), new
java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
{
put(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset,
newestOffset, upcomingOffset))
}
})
val mockStreamMetadataCache = mock[StreamMetadataCache]
- when(mockStreamMetadataCache.getStreamMetadata(any(),
any())).thenReturn(Map(new SystemStream("kafka", "testStream") -> metadata))
+
when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new
SystemStream("kafka", getStreamName(loggedStore)))), any())).thenReturn(Map(new
SystemStream("kafka", getStreamName(loggedStore)) -> metadata1))
+
when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new
SystemStream("kafka", getStreamName(store)))), any())).thenReturn(Map(new
SystemStream("kafka", getStreamName(store)) -> metadata2))
+
when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new
SystemStream("kafka", getStreamName(store)), new SystemStream("kafka",
getStreamName(loggedStore)))), any())).
+ thenReturn(Map(new SystemStream("kafka", getStreamName(store)) ->
metadata2, new SystemStream("kafka", getStreamName(loggedStore)) -> metadata1))
+
mockStreamMetadataCache
}
@@ -727,10 +733,14 @@ class TaskStorageManagerBuilder extends MockitoSugar {
def addStore(storeName: String, storageEngine: StorageEngine,
systemConsumer: SystemConsumer): TaskStorageManagerBuilder = {
taskStores = taskStores ++ Map(storeName -> storageEngine)
storeConsumers = storeConsumers ++ Map("kafka" -> systemConsumer)
-    changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", "testStream"))
+    changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", getStreamName(storeName)))
this
}
+  def getStreamName(storeName : String): String = {
+    "testStream-"+storeName
+  }
+
def addStore(storeName: String, isPersistedToDisk: Boolean):
TaskStorageManagerBuilder = {
val mockStorageEngine = mock[StorageEngine]
when(mockStorageEngine.getStoreProperties)
@@ -816,10 +826,10 @@ class TaskStorageManagerBuilder extends MockitoSugar {
     containerStorageManager = new ContainerStorageManager(containerModel, streamMetadataCache, mockSystemAdmins,
-      changeLogSystemStreams.asJava, storageEngineFactories.asJava, systemFactories.asJava, mockSerdes.asJava, config,
+      changeLogSystemStreams.asJava, Map[String, util.Set[SystemStream]]().asJava, storageEngineFactories.asJava, systemFactories.asJava, mockSerdes.asJava, config,
       new HashMap[TaskName, TaskInstanceMetrics]().asJava, Mockito.mock(classOf[SamzaContainerMetrics]), Mockito.mock(classOf[JobContext]),
       Mockito.mock(classOf[ContainerContext]), new HashMap[TaskName, TaskInstanceCollector].asJava, loggedStoreBaseDir,
       TaskStorageManagerBuilder.defaultStoreBaseDir, 1,
-      new SystemClock)
+      null, new SystemClock)
this
}
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index 9e1daaf..c711e8d 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -28,12 +28,11 @@ import java.util.List;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.FileUtils;
import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.job.JobInstance;
-import org.apache.samza.storage.ContainerStorageManager;
import org.apache.samza.storage.StorageManagerUtil;
-import org.apache.samza.storage.TaskStorageManager;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
import org.slf4j.Logger;
@@ -101,7 +100,8 @@ public class LocalStoreMonitor implements Monitor {
                LOG.info(String.format("Local store: %s is actively used by the task: %s.", storeName, task.getTaskName()));
             } else {
                LOG.info(String.format("Local store: %s not used by the task: %s.", storeName, task.getTaskName()));
-                markSweepTaskStore(StorageManagerUtil.getStorePartitionDir(jobDir, storeName, new TaskName(task.getTaskName())));
+                markSweepTaskStore(StorageManagerUtil.getStorePartitionDir(jobDir, storeName, new TaskName(task.getTaskName()),
+                    TaskMode.Active));
}
}
}