This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f3b8e  SAMZA-2419: removing duplicate cleaning of stores (#1236)
66f3b8e is described below

commit 66f3b8ec9dfcf88e30577d7724ab8da5976943f1
Author: bkonold <[email protected]>
AuthorDate: Wed Feb 19 16:54:55 2020 -0800

    SAMZA-2419: removing duplicate cleaning of stores (#1236)
---
 .../TransactionalStateTaskRestoreManager.java      | 27 ++++++++++------------
 .../TestTransactionalStateTaskRestoreManager.java  | 15 ++++++++++--
 2 files changed, 25 insertions(+), 17 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
 
b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 8133fac..c578d9a 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -243,30 +243,27 @@ public class TransactionalStateTaskRestoreManager 
implements TaskRestoreManager
           timeSinceLastCheckpointInMs = System.currentTimeMillis() -
               checkpointedChangelogOffset.getCheckpointId().getMillis();
         }
-      
-        // if the clean.store.start config is set, delete the currentDir, 
restore from oldest offset to checkpointed
-        if (storageEngine.getStoreProperties().isPersistedToDisk() && new 
StorageConfig(
-          config).getCleanLoggedStoreDirsOnStart(storeName)) {
-          File currentDir = 
storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, 
taskName, taskMode);
-          LOG.info("Marking current directory: {} for store: {} in task: {}.", 
currentDir, storeName, taskName);
-          storeDirsToDelete.put(storeName, currentDir);
-          LOG.info("Marking restore offsets for store: {} in task: {} to {}, 
{} ", storeName, taskName, oldestOffset, checkpointedOffset);
-          storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, 
checkpointedOffset));
-          return;
-        }
 
-        // if the clean.store.start config is set, delete the currentDir, 
restore from oldest offset to checkpointed
+        // if the clean.store.start config is set, delete current and 
checkpoint dirs, restore from oldest offset to checkpointed
         if (storageEngine.getStoreProperties().isPersistedToDisk() && new 
StorageConfig(
           config).getCleanLoggedStoreDirsOnStart(storeName)) {
-          File currentDir = 
storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, 
taskName, taskMode);
-          LOG.info("Marking current directory: {} for store: {} in task: {}.", 
currentDir, storeName, taskName);
+          File currentDir = 
storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, 
taskName, taskMode);
+          LOG.info("Marking current directory: {} for store: {} in task: {} 
for deletion due to clean.on.container.start config.",
+              currentDir, storeName, taskName);
           storeDirsToDelete.put(storeName, currentDir);
+
+          
storageManagerUtil.getTaskStoreCheckpointDirs(loggedStoreBaseDirectory, 
storeName, taskName, taskMode)
+              .forEach(checkpointDir -> {
+                  LOG.info("Marking checkpoint directory: {} for store: {} in 
task: {} for deletion due to clean.on.container.start config.",
+                      checkpointDir, storeName, taskName);
+                  storeDirsToDelete.put(storeName, checkpointDir);
+                });
+
           LOG.info("Marking restore offsets for store: {} in task: {} to {}, 
{} ", storeName, taskName, oldestOffset, checkpointedOffset);
           storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, 
checkpointedOffset));
           return;
         }
 
-
         Optional<File> currentDirOptional;
         Optional<List<File>> checkpointDirsOptional;
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
 
b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
index 0bb88d5..c37cca3 100644
--- 
a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
@@ -38,6 +38,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import 
org.apache.samza.storage.TransactionalStateTaskRestoreManager.RestoreOffsets;
 import 
org.apache.samza.storage.TransactionalStateTaskRestoreManager.StoreActions;
@@ -166,6 +167,7 @@ public class TestTransactionalStateTaskRestoreManager {
     when(mockTaskModel.getTaskName()).thenReturn(taskName);
     Partition taskChangelogPartition = new Partition(0);
     
when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+    when(mockTaskModel.getTaskMode()).thenReturn(TaskMode.Active);
 
     String store1Name = "store1";
     StorageEngine store1Engine = mock(StorageEngine.class);
@@ -208,13 +210,22 @@ public class TestTransactionalStateTaskRestoreManager {
             return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
           });
 
+    File dummyCurrentDir = new File("currentDir");
+    File dummyCheckpointDir = new File("checkpointDir1");
+    when(mockStorageManagerUtil.getTaskStoreDir(mockLoggedStoreBaseDir, 
store1Name, taskName, TaskMode.Active))
+        .thenReturn(dummyCurrentDir);
+    
when(mockStorageManagerUtil.getTaskStoreCheckpointDirs(mockLoggedStoreBaseDir, 
store1Name, taskName, TaskMode.Active))
+        .thenReturn(ImmutableList.of(dummyCheckpointDir));
+
     StoreActions storeActions = 
TransactionalStateTaskRestoreManager.getStoreActions(
         mockTaskModel, mockStoreEngines, mockStoreChangelogs, 
mockCheckpointedChangelogOffset,
         mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
         mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, 
mockClock);
 
-    // ensure that there is one directory to delete
-    assertEquals(1, storeActions.storeDirsToDelete.size());
+    // ensure that current and checkpoint directories are marked for deletion
+    assertEquals(2, storeActions.storeDirsToDelete.size());
+    
assertTrue(storeActions.storeDirsToDelete.containsValue(dummyCheckpointDir));
+    assertTrue(storeActions.storeDirsToDelete.containsValue(dummyCurrentDir));
     // ensure that we restore from the oldest changelog offset to checkpointed 
changelog offset
     assertEquals("0", 
storeActions.storesToRestore.get(store1Name).startingOffset);
     assertEquals(changelog1CheckpointedOffset, 
storeActions.storesToRestore.get(store1Name).endingOffset);

Reply via email to