This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 17be8671ba317e01ed3abcae704bff82aac41aca
Author: bkonold <[email protected]>
AuthorDate: Tue Jan 28 15:56:44 2020 -0800

    SAMZA-2447: Checkpoint dir removal should only search in valid store dirs 
(#1261)
---
 .../TransactionalStateTaskStorageManager.scala     | 12 ++++++----
 .../TestTransactionalStateTaskStorageManager.java  | 27 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
 
b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
index 20c7271..0335710 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
@@ -90,11 +90,13 @@ class TransactionalStateTaskStorageManager(
             val fileFilter: FileFilter = new WildcardFileFilter(taskStoreName 
+ "-*")
             val checkpointDirs = storeDir.listFiles(fileFilter)
 
-            checkpointDirs
-              .filter(!_.getName.contains(latestCheckpointId.toString))
-              .foreach(checkpointDir => {
-                FileUtils.deleteDirectory(checkpointDir)
-              })
+            if (checkpointDirs != null) {
+              checkpointDirs
+                .filter(!_.getName.contains(latestCheckpointId.toString))
+                .foreach(checkpointDir => {
+                  FileUtils.deleteDirectory(checkpointDir)
+                })
+            }
           })
       }
     }
diff --git 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
index f2d4972..244a35b 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
+++ 
b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
@@ -21,6 +21,7 @@ package org.apache.samza.storage;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.FileFilter;
 import scala.Option;
 import scala.collection.immutable.Map;
 
@@ -492,6 +493,32 @@ public class TestTransactionalStateTaskStorageManager {
     fail("Should have thrown an exception if no changelog offset found for 
checkpointed store");
   }
 
+  @Test
+  public void testRemoveOldCheckpointsWhenBaseDirContainsRegularFiles() {
+    TaskName taskName = new TaskName("Partition 0");
+    ContainerStorageManager containerStorageManager = 
mock(ContainerStorageManager.class);
+    Map<String, SystemStream> changelogSystemStreams = mock(Map.class);
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    File loggedStoreBaseDir = mock(File.class);
+    Partition changelogPartition = new Partition(0);
+    TaskMode taskMode = TaskMode.Active;
+    StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
+
+    File mockStoreDir = mock(File.class);
+    String mockStoreDirName = "notDirectory";
+
+    when(loggedStoreBaseDir.listFiles()).thenReturn(new File[] {mockStoreDir});
+    when(mockStoreDir.getName()).thenReturn(mockStoreDirName);
+    when(storageManagerUtil.getTaskStoreDir(eq(loggedStoreBaseDir), 
eq(mockStoreDirName), eq(taskName), eq(taskMode))).thenReturn(mockStoreDir);
+    // null here can happen if listFiles is called on a non-directory
+    when(mockStoreDir.listFiles(any(FileFilter.class))).thenReturn(null);
+
+    TransactionalStateTaskStorageManager tsm = new 
TransactionalStateTaskStorageManager(taskName, containerStorageManager,
+        changelogSystemStreams, systemAdmins, loggedStoreBaseDir, 
changelogPartition, taskMode, storageManagerUtil);
+
+    tsm.removeOldCheckpoints(CheckpointId.create());
+  }
+
   private TransactionalStateTaskStorageManager 
buildTSM(ContainerStorageManager csm, Partition changelogPartition,
       StorageManagerUtil smu) {
     TaskName taskName = new TaskName("Partition 0");

Reply via email to