Repository: samza
Updated Branches:
  refs/heads/master 5d50c0830 -> 0c64c54eb


SAMZA-827: fix NPE while clean-shutdown state store with host-affinity enabled


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0c64c54e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0c64c54e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0c64c54e

Branch: refs/heads/master
Commit: 0c64c54eb3597ebd40d237e57c81cb28b80047ae
Parents: 5d50c08
Author: Jagadish Venkatraman <[email protected]>
Authored: Thu Dec 3 16:05:18 2015 -0800
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu Dec 3 16:05:18 2015 -0800

----------------------------------------------------------------------
 .../samza/storage/TaskStorageManager.scala      | 12 ++++++---
 .../samza/storage/TestTaskStorageManager.scala  | 27 ++++++++++++++++++++
 2 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0c64c54e/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 9588dcd..422799a 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -206,9 +206,15 @@ class TaskStorageManager(
                           .get(partition)
       val newestOffset = sspMetadata.getNewestOffset
 
-      val offsetFile = new 
File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, 
taskName), offsetFileName)
-      Util.writeDataToFile(offsetFile, newestOffset)
-      info("Successfully stored offset %s for store %s in OFFSET file " format 
(newestOffset, store))
+      if (newestOffset != null) {
+        val offsetFile = new 
File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, 
taskName), offsetFileName)
+        Util.writeDataToFile(offsetFile, newestOffset)
+        info("Successfully stored offset %s for store %s in OFFSET file " 
format (newestOffset, store))
+      }
+      else {
+        //if newestOffset is null, then it means the store is empty. No need 
to persist the offset file
+        info("Not storing OFFSET file for taskName %s. Store %s backed by 
changelog topic : %s, partition: %s is empty. " format (taskName, store, 
systemStream.getStream, partition.getPartitionId))
+      }
     }}
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/0c64c54e/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 6491d09..f08e8dd 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -123,6 +123,33 @@ class TestTaskStorageManager extends MockitoSugar {
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
     assertEquals("Found incorrect value in offset file!", "100", 
Util.readDataFromFile(offsetFilePath))
   }
+
+  @Test
+  def testStopShouldNotCreateOffsetFileForEmptyStore() {
+    val partition = new Partition(0)
+
+    val offsetFilePath = new 
File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir,
 loggedStore, taskName) + File.separator + "OFFSET")
+
+    val mockSystemAdmin = mock[SystemAdmin]
+    val mockSspMetadata = Map("testStream" -> new 
SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, 
SystemStreamPartitionMetadata](Map(partition -> new 
SystemStreamPartitionMetadata("20", null, null)))))
+    val myMap = JavaConversions.mapAsJavaMap[String, 
SystemStreamMetadata](mockSspMetadata)
+    
when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+
+    //Build TaskStorageManager
+    val taskStorageManager = new TaskStorageManagerBuilder()
+      .addStore(loggedStore)
+      .setSystemAdmin("kafka", mockSystemAdmin)
+      .setPartition(partition)
+      .build
+
+    //Invoke test method
+    val stopMethod = taskStorageManager.getClass.getDeclaredMethod("stop", new 
Array[java.lang.Class[_]](0):_*)
+    stopMethod.setAccessible(true)
+    stopMethod.invoke(taskStorageManager, new Array[Object](0):_*)
+
+    //Check conditions
+    assertTrue("Offset file should not exist!", !offsetFilePath.exists())
+  }
 }
 
 object TaskStorageManagerBuilder {

Reply via email to