Repository: samza Updated Branches: refs/heads/master 5d50c0830 -> 0c64c54eb
SAMZA-827: fix NPE while clean-shutdown state store with host-affinity enabled Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0c64c54e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0c64c54e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0c64c54e Branch: refs/heads/master Commit: 0c64c54eb3597ebd40d237e57c81cb28b80047ae Parents: 5d50c08 Author: Jagadish Venkatraman <[email protected]> Authored: Thu Dec 3 16:05:18 2015 -0800 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Dec 3 16:05:18 2015 -0800 ---------------------------------------------------------------------- .../samza/storage/TaskStorageManager.scala | 12 ++++++--- .../samza/storage/TestTaskStorageManager.scala | 27 ++++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0c64c54e/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 9588dcd..422799a 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -206,9 +206,15 @@ class TaskStorageManager( .get(partition) val newestOffset = sspMetadata.getNewestOffset - val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, taskName), offsetFileName) - Util.writeDataToFile(offsetFile, newestOffset) - info("Successfully stored offset %s for store %s in OFFSET file " format (newestOffset, store)) + if (newestOffset != null) { + val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, taskName), offsetFileName) + Util.writeDataToFile(offsetFile, newestOffset) + info("Successfully stored offset %s for store %s in OFFSET file " format (newestOffset, store)) + } + else { + //if newestOffset is null, then it means the store is empty. No need to persist the offset file + info("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, store, systemStream.getStream, partition.getPartitionId)) + } }} } http://git-wip-us.apache.org/repos/asf/samza/blob/0c64c54e/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index 6491d09..f08e8dd 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -123,6 +123,33 @@ class TestTaskStorageManager extends MockitoSugar { assertTrue("Offset file doesn't exist!", offsetFilePath.exists()) assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath)) } + + @Test + def testStopShouldNotCreateOffsetFileForEmptyStore() { + val partition = new Partition(0) + + val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET") + + val mockSystemAdmin = mock[SystemAdmin] + val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", null, null))))) + val myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata) + when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap) + + //Build TaskStorageManager + val taskStorageManager = new TaskStorageManagerBuilder() + .addStore(loggedStore) + .setSystemAdmin("kafka", mockSystemAdmin) + .setPartition(partition) + .build + + //Invoke test method + val stopMethod = taskStorageManager.getClass.getDeclaredMethod("stop", new Array[java.lang.Class[_]](0):_*) + stopMethod.setAccessible(true) + stopMethod.invoke(taskStorageManager, new Array[Object](0):_*) + + //Check conditions + assertTrue("Offset file should not exist!", !offsetFilePath.exists()) + } } object TaskStorageManagerBuilder {
