Repository: samza
Updated Branches:
  refs/heads/master c91da7842 -> 0feb5c2da


SAMZA-1229; Disk space monitor should only count data in use by the container

Author: Prateek Maheshwari <pmahe...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #134 from prateekm/disk-space-monitor


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0feb5c2d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0feb5c2d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0feb5c2d

Branch: refs/heads/master
Commit: 0feb5c2dae2ee5d071f33ae215e549e127b2e8fe
Parents: c91da78
Author: Prateek Maheshwari <pmahe...@linkedin.com>
Authored: Mon Apr 24 15:03:00 2017 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon Apr 24 15:03:00 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 16 ++++----
 .../samza/storage/TaskStorageManager.scala      | 39 ++++++++++----------
 2 files changed, 29 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0feb5c2d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index aba0d17..a30b793 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -413,7 +413,6 @@ object SamzaContainer extends Logging {
     info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
 
     val storeWatchPaths = new util.HashSet[Path]()
-    storeWatchPaths.add(defaultStoreBaseDir.toPath)
 
     val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)
@@ -455,8 +454,6 @@ object SamzaContainer extends Logging {
         loggedStorageBaseDir = defaultStoreBaseDir
       }
 
-      storeWatchPaths.add(loggedStorageBaseDir.toPath)
-
       info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
 
       val taskStores = storageEngineFactories
@@ -467,25 +464,30 @@ object SamzaContainer extends Logging {
             } else {
               null
             }
+
             val keySerde = config.getStorageKeySerde(storeName) match {
               case Some(keySerde) => serdes.getOrElse(keySerde,
                 throw new SamzaException("StorageKeySerde: No class defined for serde: %s." format keySerde))
               case _ => null
             }
+
             val msgSerde = config.getStorageMsgSerde(storeName) match {
               case Some(msgSerde) => serdes.getOrElse(msgSerde,
                 throw new SamzaException("StorageMsgSerde: No class defined for serde: %s." format msgSerde))
               case _ => null
             }
-            val storeBaseDir = if(changeLogSystemStreamPartition != null) {
+
+            val storeDir = if (changeLogSystemStreamPartition != null) {
               TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
-            }
-            else {
+            } else {
               TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName)
             }
+
+            storeWatchPaths.add(storeDir.toPath)
+
             val storageEngine = storageEngineFactory.getStorageEngine(
               storeName,
-              storeBaseDir,
+              storeDir,
               keySerde,
               msgSerde,
               collector,

http://git-wip-us.apache.org/repos/asf/samza/blob/0feb5c2d/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 491f77b..977ac5b 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -93,24 +93,24 @@ class TaskStorageManager(
     debug("Cleaning base directories for stores.")
 
     taskStores.keys.foreach(storeName => {
-      val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
-      info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString)
+      val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
+      info("Got default storage partition directory as %s" format storePartitionDir.toPath.toString)
 
-      if(storagePartitionDir.exists()) {
-        info("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString)
-        Util.rm(storagePartitionDir)
+      if(storePartitionDir.exists()) {
+        info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString)
+        Util.rm(storePartitionDir)
       }
 
-      val loggedStoreDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
-      info("Got logged storage partition directory as %s" format loggedStoreDir.toPath.toString)
+      val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+      info("Got logged storage partition directory as %s" format loggedStorePartitionDir.toPath.toString)
 
       // Delete the logged store if it is not valid.
-      if (!isLoggedStoreValid(storeName, loggedStoreDir)) {
-        info("Deleting logged storage partition directory %s." format loggedStoreDir.toPath.toString)
-        Util.rm(loggedStoreDir)
+      if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
+        info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString)
+        Util.rm(loggedStorePartitionDir)
       } else {
-        val offset = readOffsetFile(loggedStoreDir)
-        info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStoreDir))
+        val offset = readOffsetFile(loggedStorePartitionDir)
+        info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
         fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
       }
     })
@@ -182,13 +182,13 @@ class TaskStorageManager(
     taskStores.foreach {
       case (storeName, storageEngine) =>
         if (storageEngine.getStoreProperties.isLoggedStore) {
-          val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
-          info("Using logged storage partition directory: %s for store: %s." format(loggedStoragePartitionDir.toPath.toString, storeName))
-          if (!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs()
+          val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+          info("Using logged storage partition directory: %s for store: %s." format(loggedStorePartitionDir.toPath.toString, storeName))
+          if (!loggedStorePartitionDir.exists()) loggedStorePartitionDir.mkdirs()
         } else {
-          val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
-          info("Using storage partition directory: %s for store: %s." format(storagePartitionDir.toPath.toString, storeName))
-          storagePartitionDir.mkdirs()
+          val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
+          info("Using storage partition directory: %s for store: %s." format(storePartitionDir.toPath.toString, storeName))
+          storePartitionDir.mkdirs()
         }
     }
   }
@@ -322,7 +322,8 @@ class TaskStorageManager(
         }
         debug("Got offset %s for store %s" format(newestOffset, storeName))
 
-        val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName), offsetFileName)
+        val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+        val offsetFile = new File(loggedStorePartitionDir, offsetFileName)
         if (newestOffset != null) {
           debug("Storing offset for store in OFFSET file ")
           Util.writeDataToFile(offsetFile, newestOffset)

Reply via email to