Repository: samza
Updated Branches:
  refs/heads/master c91da7842 -> 0feb5c2da
SAMZA-1229; Disk space monitor should only count data in use by the container Author: Prateek Maheshwari <pmahe...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #134 from prateekm/disk-space-monitor Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0feb5c2d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0feb5c2d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0feb5c2d Branch: refs/heads/master Commit: 0feb5c2dae2ee5d071f33ae215e549e127b2e8fe Parents: c91da78 Author: Prateek Maheshwari <pmahe...@linkedin.com> Authored: Mon Apr 24 15:03:00 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Mon Apr 24 15:03:00 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 16 ++++---- .../samza/storage/TaskStorageManager.scala | 39 ++++++++++---------- 2 files changed, 29 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0feb5c2d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index aba0d17..a30b793 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -413,7 +413,6 @@ object SamzaContainer extends Logging { info("Got default storage engine base directory: %s" format defaultStoreBaseDir) val storeWatchPaths = new util.HashSet[Path]() - storeWatchPaths.add(defaultStoreBaseDir.toPath) val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => { debug("Setting up task 
instance: %s" format taskModel) @@ -455,8 +454,6 @@ object SamzaContainer extends Logging { loggedStorageBaseDir = defaultStoreBaseDir } - storeWatchPaths.add(loggedStorageBaseDir.toPath) - info("Got base directory for logged data stores: %s" format loggedStorageBaseDir) val taskStores = storageEngineFactories @@ -467,25 +464,30 @@ object SamzaContainer extends Logging { } else { null } + val keySerde = config.getStorageKeySerde(storeName) match { case Some(keySerde) => serdes.getOrElse(keySerde, throw new SamzaException("StorageKeySerde: No class defined for serde: %s." format keySerde)) case _ => null } + val msgSerde = config.getStorageMsgSerde(storeName) match { case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("StorageMsgSerde: No class defined for serde: %s." format msgSerde)) case _ => null } - val storeBaseDir = if(changeLogSystemStreamPartition != null) { + + val storeDir = if (changeLogSystemStreamPartition != null) { TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName) - } - else { + } else { TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName) } + + storeWatchPaths.add(storeDir.toPath) + val storageEngine = storageEngineFactory.getStorageEngine( storeName, - storeBaseDir, + storeDir, keySerde, msgSerde, collector, http://git-wip-us.apache.org/repos/asf/samza/blob/0feb5c2d/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 491f77b..977ac5b 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -93,24 +93,24 @@ class TaskStorageManager( debug("Cleaning base directories for stores.") 
taskStores.keys.foreach(storeName => { - val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) - info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString) + val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) + info("Got default storage partition directory as %s" format storePartitionDir.toPath.toString) - if(storagePartitionDir.exists()) { - info("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString) - Util.rm(storagePartitionDir) + if(storePartitionDir.exists()) { + info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString) + Util.rm(storePartitionDir) } - val loggedStoreDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) - info("Got logged storage partition directory as %s" format loggedStoreDir.toPath.toString) + val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + info("Got logged storage partition directory as %s" format loggedStorePartitionDir.toPath.toString) // Delete the logged store if it is not valid. - if (!isLoggedStoreValid(storeName, loggedStoreDir)) { - info("Deleting logged storage partition directory %s." format loggedStoreDir.toPath.toString) - Util.rm(loggedStoreDir) + if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) { + info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString) + Util.rm(loggedStorePartitionDir) } else { - val offset = readOffsetFile(loggedStoreDir) - info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStoreDir)) + val offset = readOffsetFile(loggedStorePartitionDir) + info("Read offset %s for the store %s from logged storage partition directory %s." 
format(offset, storeName, loggedStorePartitionDir)) fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset) } }) @@ -182,13 +182,13 @@ class TaskStorageManager( taskStores.foreach { case (storeName, storageEngine) => if (storageEngine.getStoreProperties.isLoggedStore) { - val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) - info("Using logged storage partition directory: %s for store: %s." format(loggedStoragePartitionDir.toPath.toString, storeName)) - if (!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs() + val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + info("Using logged storage partition directory: %s for store: %s." format(loggedStorePartitionDir.toPath.toString, storeName)) + if (!loggedStorePartitionDir.exists()) loggedStorePartitionDir.mkdirs() } else { - val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) - info("Using storage partition directory: %s for store: %s." format(storagePartitionDir.toPath.toString, storeName)) - storagePartitionDir.mkdirs() + val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) + info("Using storage partition directory: %s for store: %s." 
format(storePartitionDir.toPath.toString, storeName)) + storePartitionDir.mkdirs() } } } @@ -322,7 +322,8 @@ class TaskStorageManager( } debug("Got offset %s for store %s" format(newestOffset, storeName)) - val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName), offsetFileName) + val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + val offsetFile = new File(loggedStorePartitionDir, offsetFileName) if (newestOffset != null) { debug("Storing offset for store in OFFSET file ") Util.writeDataToFile(offsetFile, newestOffset)