Repository: samza Updated Branches: refs/heads/master e57b4a342 -> 38b1dc38d
SAMZA-1083 Do not load task stores which are older than delete tombstones during container startup Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/38b1dc38 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/38b1dc38 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/38b1dc38 Branch: refs/heads/master Commit: 38b1dc38dfc0b71df5dd5ca983c4c668a6cc20be Parents: e57b4a3 Author: Shanthoosh Venkataraman <[email protected]> Authored: Wed Feb 8 16:07:27 2017 -0800 Committer: Jacob Maes <[email protected]> Committed: Wed Feb 8 16:07:27 2017 -0800 ---------------------------------------------------------------------- .../apache/samza/storage/StorageRecovery.java | 9 +- .../org/apache/samza/config/StorageConfig.scala | 17 ++- .../apache/samza/container/SamzaContainer.scala | 8 +- .../samza/storage/TaskStorageManager.scala | 130 +++++++++++++++---- .../samza/storage/TestTaskStorageManager.scala | 30 ++++- .../org/apache/samza/config/KafkaConfig.scala | 1 + 6 files changed, 161 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/38b1dc38/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index 9329edf..9471a23 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -30,6 +30,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.StorageConfig; import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.ContainerModel; @@ -238,7 +239,6 @@ public class StorageRecovery extends CommandLine { taskStores.put(storeName, storageEngine); } } - TaskStorageManager taskStorageManager = new TaskStorageManager( taskModel.getTaskName(), Util.javaMapAsScalaMap(taskStores), @@ -247,8 +247,11 @@ public class StorageRecovery extends CommandLine { maxPartitionNumber, streamMetadataCache, storeBaseDir, - storeBaseDir, taskModel.getChangelogPartition(), - Util.javaMapAsScalaMap(systemAdmins)); + storeBaseDir, + taskModel.getChangelogPartition(), + Util.javaMapAsScalaMap(systemAdmins), + new StorageConfig(jobConfig).getChangeLogDeleteRetentionsInMs(), + new SystemClock()); taskStorageManagers.add(taskStorageManager); } http://git-wip-us.apache.org/repos/asf/samza/blob/38b1dc38/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala index a3587d0..3785011 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala @@ -20,8 +20,8 @@ package org.apache.samza.config +import java.util.concurrent.TimeUnit import org.apache.samza.SamzaException - import scala.collection.JavaConversions._ import org.apache.samza.util.Logging import org.apache.samza.util.Util @@ -33,6 +33,8 @@ object StorageConfig { val MSG_SERDE = "stores.%s.msg.serde" val CHANGELOG_STREAM = "stores.%s.changelog" val CHANGELOG_SYSTEM = "job.changelog.system" + val CHANGELOG_DELETE_RETENTION_MS = "stores.%s.changelog.delete.retention.ms" + val DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1) implicit def Config2Storage(config: Config) = new StorageConfig(config) } @@ -42,6 +44,7 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging def getStorageFactoryClassName(name: String) = getOption(FACTORY.format(name)) def getStorageKeySerde(name: String) = getOption(StorageConfig.KEY_SERDE format name) def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name) + def getChangelogStream(name: String) = { // If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence. // If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> - @@ -63,12 +66,24 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging systemStreamRes } + def getChangeLogDeleteRetentionInMs(storeName: String) = { + getLong(CHANGELOG_DELETE_RETENTION_MS format storeName, DEFAULT_CHANGELOG_DELETE_RETENTION_MS) + } + def getStoreNames: Seq[String] = { val conf = config.subset("stores.", true) conf.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq } /** + * Build a map of storeName to changeLogDeleteRetention for all of the stores. + * @return a map from storeName to the changeLogDeleteRetention of the store in ms. + */ + def getChangeLogDeleteRetentionsInMs: Map[String, Long] = { + Map(getStoreNames map {storeName => (storeName, getChangeLogDeleteRetentionInMs(storeName))} : _*) + } + + /** * Helper method to check if a system has a changelog attached to it. */ def isChangelogSystem(systemName: String) = { http://git-wip-us.apache.org/repos/asf/samza/blob/38b1dc38/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index c3308bf..89522dc 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -30,7 +30,7 @@ import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer -import org.apache.samza.config.{Config, ShellCommandConfig} +import org.apache.samza.config.{Config, ShellCommandConfig, StorageConfig} import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SystemConfig.Config2System @@ -76,6 +76,8 @@ import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.Logging import org.apache.samza.util.Throttleable import org.apache.samza.util.MetricsReporterLoader +import org.apache.samza.util.ThrottlingExecutor +import org.apache.samza.util.SystemClock import org.apache.samza.util.Util import org.apache.samza.util.Util.asScalaClock @@ -582,7 +584,9 @@ object SamzaContainer extends Logging { storeBaseDir = defaultStoreBaseDir, loggedStoreBaseDir = loggedStorageBaseDir, partition = taskModel.getChangelogPartition, - systemAdmins = systemAdmins) + systemAdmins = systemAdmins, + new StorageConfig(config).getChangeLogDeleteRetentionsInMs, + new SystemClock) val systemStreamPartitions = taskModel .getSystemStreamPartitions http://git-wip-us.apache.org/repos/asf/samza/blob/38b1dc38/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 0b7bcdd..695f53a 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -22,10 +22,20 @@ package org.apache.samza.storage import java.io._ import java.util +import org.apache.samza.config.StorageConfig import org.apache.samza.{Partition, SamzaException} import org.apache.samza.container.TaskName -import org.apache.samza.system._ -import org.apache.samza.util.{Logging, Util} +import org.apache.samza.system.StreamMetadataCache +import org.apache.samza.system.SystemAdmin +import org.apache.samza.system.SystemConsumer +import org.apache.samza.system.SystemStream +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.SystemStreamPartitionIterator +import org.apache.samza.system.ExtendedSystemAdmin +import org.apache.samza.system.SystemStreamMetadata +import org.apache.samza.util.Logging +import org.apache.samza.util.Util +import org.apache.samza.util.Clock import scala.collection.{JavaConversions, Map} @@ -53,7 +63,9 @@ class TaskStorageManager( storeBaseDir: File = new File(System.getProperty("user.dir"), "state"), loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"), partition: Partition, - systemAdmins: Map[String, SystemAdmin]) extends Logging { + systemAdmins: Map[String, SystemAdmin], + changeLogDeleteRetentionsInMs: Map[String, Long], + clock: Clock) extends Logging { var taskStoresToRestore = taskStores.filter{ case (storeName, storageEngine) => storageEngine.getStoreProperties.isLoggedStore @@ -82,57 +94,121 @@ class TaskStorageManager( taskStores.keys.foreach(storeName => { val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) - info("Got storage partition directory as %s" format storagePartitionDir.toPath.toString) + info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString) if(storagePartitionDir.exists()) { - debug("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString) + info("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString) Util.rm(storagePartitionDir) } - val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) - info("Got logged storage partition directory as %s" format loggedStoragePartitionDir.toPath.toString) - // If we find valid offsets s.t. we can restore the state, keep the disk files. Otherwise, delete them. - if (!persistedStores.contains(storeName) || - (loggedStoragePartitionDir.exists() && !readOffsetFile(storeName, loggedStoragePartitionDir))) { - Util.rm(loggedStoragePartitionDir) + val loggedStoreDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + info("Got logged storage partition directory as %s" format loggedStoreDir.toPath.toString) + + // Delete the logged store if it is not valid. + if (!isLoggedStoreValid(storeName, loggedStoreDir)) { + info("Deleting logged storage partition directory %s." format loggedStoreDir.toPath.toString) + Util.rm(loggedStoreDir) + } else { + val offset = readOffsetFile(loggedStoreDir) + info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStoreDir)) + fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset) } }) } + /** + * Directory {@code loggedStoreDir} associated with the logged store {@code storeName} is valid, + * if all of the following conditions are true. + * a) If the store has to be persisted to disk. + * b) If there is a valid offset file associated with the logged store. + * c) If the logged store has not gone stale. + * + * @return true if the logged store is valid, false otherwise. + */ + private def isLoggedStoreValid(storeName: String, loggedStoreDir: File): Boolean = { + val changeLogDeleteRetentionInMs = changeLogDeleteRetentionsInMs.getOrElse(storeName, + StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS) + persistedStores.contains(storeName) && isOffsetFileValid(loggedStoreDir) && + !isStaleLoggedStore(loggedStoreDir, changeLogDeleteRetentionInMs) + } + + /** + * Determines if the logged store directory {@code loggedStoreDir} is stale. A store is stale if the following condition is true. + * + * ((CurrentTime) - (LastModifiedTime of the Offset file) is greater than the changelog's tombstone retention). + * + * @param loggedStoreDir the base directory of the local change-logged store. + * @param changeLogDeleteRetentionInMs the delete retention of the changelog in milli seconds. + * @return true if the store is stale, false otherwise. + * + */ + private def isStaleLoggedStore(loggedStoreDir: File, changeLogDeleteRetentionInMs: Long): Boolean = { + var isStaleStore = false + val storePath = loggedStoreDir.toPath.toString + if (loggedStoreDir.exists()) { + val offsetFileRef = new File(loggedStoreDir, offsetFileName) + val offsetFileLastModifiedTime = offsetFileRef.lastModified() + if ((clock.currentTimeMillis() - offsetFileLastModifiedTime) >= changeLogDeleteRetentionInMs) { + info ("Store: %s is stale since lastModifiedTime of offset file: %s, " + + "is older than changelog deleteRetentionMs: %s." format(storePath, offsetFileLastModifiedTime, changeLogDeleteRetentionInMs)) + isStaleStore = true + } + } else { + info("Logged storage partition directory: %s does not exist." format storePath) + } + isStaleStore + } + + /** + * An offset file associated with logged store {@code loggedStoreDir} is valid if it exists and is not empty. + * + * @return true if the offset file is valid. false otherwise. + */ + private def isOffsetFileValid(loggedStoreDir: File): Boolean = { + var hasValidOffsetFile = false + if (loggedStoreDir.exists()) { + val offsetContents = readOffsetFile(loggedStoreDir) + if (offsetContents != null && !offsetContents.isEmpty) { + hasValidOffsetFile = true + } else { + info("Offset file is not valid for store: %s." format loggedStoreDir.toPath.toString) + } + } + hasValidOffsetFile + } + private def setupBaseDirs() { debug("Setting up base directories for stores.") taskStores.foreach { case (storeName, storageEngine) => if (storageEngine.getStoreProperties.isLoggedStore) { val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + info("Using logged storage partition directory: %s for store: %s." format(loggedStoragePartitionDir.toPath.toString, storeName)) if (!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs() } else { - TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName).mkdirs() + val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) + info("Using storage partition directory: %s for store: %s." format(storagePartitionDir.toPath.toString, storeName)) + storagePartitionDir.mkdirs() } } } /** - * Attempts to read the offset file and returns {@code true} if the offsets were read successfully. + * Read and return the contents of the offset file. * - * @param storeName the name of the store for which the offsets are needed - * @param loggedStoragePartitionDir the directory for the store - * @return true if the offsets were read successfully, false otherwise. + * @param loggedStoragePartitionDir the base directory of the store + * @return the content of the offset file if it exists for the store, null otherwise. */ - private def readOffsetFile(storeName: String, loggedStoragePartitionDir: File): Boolean = { - var offsetsRead = false + private def readOffsetFile(loggedStoragePartitionDir: File): String = { + var offset : String = null val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName) - if(offsetFileRef.exists()) { - debug("Found offset file in partition directory: %s" format offsetFileRef.toPath.toString) - val offset = Util.readDataFromFile(offsetFileRef) - if(offset != null && !offset.isEmpty) { - fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset) - offsetsRead = true - } + if (offsetFileRef.exists()) { + info("Found offset file in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString) + offset = Util.readDataFromFile(offsetFileRef) } else { info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString) } - offsetsRead + offset } private def validateChangelogStreams() = { @@ -256,7 +332,7 @@ class TaskStorageManager( if (offsetFile.exists()) { Util.rm(offsetFile) } - debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId)) + debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic: %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId)) } } catch { case e: Exception => error("Exception storing offset for store %s. Skipping." format(storeName), e) http://git-wip-us.apache.org/repos/asf/samza/blob/38b1dc38/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index 4d40f52..c82e6b1 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -24,10 +24,13 @@ import java.io.File import java.util import org.apache.samza.Partition +import org.apache.samza.config.MapConfig +import org.apache.samza.config.StorageConfig import org.apache.samza.container.TaskName import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import org.apache.samza.system._ +import org.apache.samza.util.SystemClock import org.apache.samza.util.Util import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -307,6 +310,29 @@ class TestTaskStorageManager extends MockitoSugar { } @Test + def testStoreDeletedWhenOffsetFileOlderThanDeleteRetention() { + // This test ensures that store gets deleted when lastModifiedTime of the offset file + // is older than deletionRetention of the changeLog. + val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + val offsetFile = new File(storeDirectory, "OFFSET") + offsetFile.createNewFile() + Util.writeDataToFile(offsetFile, "Test Offset Data") + offsetFile.setLastModified(0) + val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false) + .addStore(loggedStore, true) + .build + + val cleanDirMethod = taskStorageManager.getClass + .getDeclaredMethod("cleanBaseDirs", + new Array[java.lang.Class[_]](0):_*) + cleanDirMethod.setAccessible(true) + cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*) + + assertTrue("Offset file was found in store partition directory. Clean up failed!", !offsetFile.exists()) + assertTrue("Store directory exists. Clean up failed!", !storeDirectory.exists()) + } + + @Test def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() { val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET") Util.writeDataToFile(offsetFilePath, "100") @@ -556,7 +582,9 @@ class TaskStorageManagerBuilder extends MockitoSugar { storeBaseDir = storeBaseDir, loggedStoreBaseDir = loggedStoreBaseDir, partition = partition, - systemAdmins = systemAdmins + systemAdmins = systemAdmins, + new StorageConfig(new MapConfig()).getChangeLogDeleteRetentionsInMs, + SystemClock.instance ) } } http://git-wip-us.apache.org/repos/asf/samza/blob/38b1dc38/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 9320cf7..770220c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -146,6 +146,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val kafkaChangeLogProperties = new Properties kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE) + kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name))) filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2)} kafkaChangeLogProperties }
