This is an automated email from the ASF dual-hosted git repository.

ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new ef8bc7629 Clear local logged stores if input checkpoints are empty (#1713)
ef8bc7629 is described below

commit ef8bc7629e8b72e1855adbae9e1f7fbba2c12bb2
Author: ajo thomas <[email protected]>
AuthorDate: Wed Dec 18 16:34:43 2024 -0800

    Clear local logged stores if input checkpoints are empty (#1713)
---
 .../samza/storage/ContainerStorageManager.java     | 29 ++++++++++++++-
 .../samza/storage/TestContainerStorageManager.java | 41 ++++++++++++++++++++--
 2 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index a2b3b51fb..4b995117a 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -54,6 +54,7 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskInstanceCollector;
 import org.apache.samza.util.Clock;
+import org.apache.samza.util.FileUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -274,7 +275,11 @@ public class ContainerStorageManager {
         taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
         LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", taskCheckpoint, taskName);
       }
-      taskCheckpoints.put(taskName, taskCheckpoint);
+
+      // Only insert non-null checkpoints
+      if (taskCheckpoint != null) {
+        taskCheckpoints.put(taskName, taskCheckpoint);
+      }
 
       Map<String, Set<String>> backendFactoryToStoreNames =
           ContainerStorageManagerUtil.getBackendFactoryStoreNames(
@@ -308,6 +313,15 @@ public class ContainerStorageManager {
       taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
     });
 
+    // if we have received no input checkpoints, it can only be due to two reasons:
+    // a) Samza job is new, so it has no previous checkpoints.
+    // b) The checkpoints were cleared.
+    // We should be able to safely clear local logged stores in either case
+    if (taskCheckpoints.isEmpty()) {
+      LOG.info("No checkpoints read. Attempting to clear logged stores.");
+      clearLoggedStores(loggedStoreBaseDirectory);
+    }
+
     // Init all taskRestores and if successful, restores all the task stores concurrently
     LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
     CompletableFuture<Map<TaskName, Checkpoint>> initRestoreAndNewCheckpointFuture =
@@ -357,6 +371,19 @@ public class ContainerStorageManager {
     return taskCheckpoints;
   }
 
+  private static void clearLoggedStores(File loggedStoreBaseDir) {
+    final FileUtil fileUtil = new FileUtil();
+    final File[] storeDirs = loggedStoreBaseDir.listFiles();
+    if (storeDirs == null || storeDirs.length == 0) {
+      LOG.info("No stores to delete");
+      return;
+    }
+    for (File storeDir: storeDirs) {
+      LOG.info("Clearing store dir {} from logged stores.", storeDir);
+      fileUtil.rm(storeDir);
+    }
+  }
+
   /**
    * Get the {@link StorageEngine} instance with a given name for a given task.
    * @param taskName the task name for which the storage engine is desired.
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index b8996d456..a79a3bfa0 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -102,7 +103,6 @@ import static org.mockito.Mockito.*;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ReflectionUtil.class, ContainerStorageManagerRestoreUtil.class})
 public class TestContainerStorageManager {
-
   private static final String STORE_NAME = "store";
   private static final String SYSTEM_NAME = "kafka";
   private static final String STREAM_NAME = "store-stream";
@@ -116,6 +116,7 @@ public class TestContainerStorageManager {
   private SamzaContainerMetrics samzaContainerMetrics;
   private Map<TaskName, TaskModel> tasks;
   private StandbyTestContext testContext;
+  private CheckpointManager checkpointManager;
 
   private volatile int systemConsumerCreationCount;
   private volatile int systemConsumerStartCount;
@@ -143,7 +144,7 @@ public class TestContainerStorageManager {
    * Method to create a containerStorageManager with mocked dependencies
    */
   @Before
-  public void setUp() throws InterruptedException {
+  public void setUp() throws InterruptedException, IOException {
     taskRestoreMetricGauges = new HashMap<>();
     this.tasks = new HashMap<>();
     this.taskInstanceMetrics = new HashMap<>();
@@ -248,7 +249,7 @@ public class TestContainerStorageManager {
         .thenReturn(
             new scala.collection.immutable.Map.Map1(new SystemStream(SYSTEM_NAME, STREAM_NAME), systemStreamMetadata));
 
-    CheckpointManager checkpointManager = mock(CheckpointManager.class);
+    this.checkpointManager = mock(CheckpointManager.class);
     when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>()));
 
     SSPMetadataCache mockSSPMetadataCache = mock(SSPMetadataCache.class);
@@ -320,6 +321,40 @@ public class TestContainerStorageManager {
     Assert.assertEquals("systemConsumerStartCount count should be 1", 1, this.systemConsumerStartCount);
   }
 
+  /**
+   * This test will attempt to verify if logged stores are deleted if the input checkpoints are empty.
+   * */
+  @Test
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  public void testDeleteLoggedStoreOnNoCheckpoints() {
+    // reset the mock to reset the stubs in setup method
+    reset(this.checkpointManager);
+    // redo stubbing to return null checkpoints
+    when(this.checkpointManager.readLastCheckpoint(any())).thenReturn(null);
+    // create store under logged stores to demonstrate deletion
+    final File storeFile = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME);
+    // add contents to store
+    final File storeFilePartition = new File(DEFAULT_LOGGED_STORE_BASE_DIR.getPath() + File.separator + STORE_NAME + File.separator + "Partition_0");
+    storeFilePartition.deleteOnExit();
+    storeFile.deleteOnExit();
+    try {
+      storeFile.mkdirs();
+      storeFilePartition.createNewFile();
+      Assert.assertTrue("Assert that stores are present prior to the test.", storeFile.exists());
+      Assert.assertTrue("Assert that store files are present prior to the test.", storeFilePartition.exists());
+      this.containerStorageManager.start();
+      this.containerStorageManager.shutdown();
+      Assert.assertFalse("Assert that stores are deleted after the test.", storeFile.exists());
+      Assert.assertFalse("Assert that store files are deleted after the test.", storeFilePartition.exists());
+    } catch (Exception e) {
+      System.out.printf("File %s could not be created.", storeFile);
+      Assert.fail();
+    } finally {
+      storeFilePartition.delete();
+      storeFile.delete();
+    }
+  }
+
   @Test
   public void testNoConfiguredDurableStores() throws InterruptedException {
     taskRestoreMetricGauges = new HashMap<>();

Reply via email to