This is an automated email from the ASF dual-hosted git repository.
ajothomas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new ef8bc7629 Clear local logged stores if input checkpoints are empty
(#1713)
ef8bc7629 is described below
commit ef8bc7629e8b72e1855adbae9e1f7fbba2c12bb2
Author: ajo thomas <[email protected]>
AuthorDate: Wed Dec 18 16:34:43 2024 -0800
Clear local logged stores if input checkpoints are empty (#1713)
---
.../samza/storage/ContainerStorageManager.java | 29 ++++++++++++++-
.../samza/storage/TestContainerStorageManager.java | 41 ++++++++++++++++++++--
2 files changed, 66 insertions(+), 4 deletions(-)
diff --git
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index a2b3b51fb..4b995117a 100644
---
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -54,6 +54,7 @@ import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.Clock;
+import org.apache.samza.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -274,7 +275,11 @@ public class ContainerStorageManager {
taskCheckpoint = checkpointManager.readLastCheckpoint(taskName);
LOG.info("Obtained checkpoint: {} for state restore for taskName: {}",
taskCheckpoint, taskName);
}
- taskCheckpoints.put(taskName, taskCheckpoint);
+
+ // Only insert non-null checkpoints
+ if (taskCheckpoint != null) {
+ taskCheckpoints.put(taskName, taskCheckpoint);
+ }
Map<String, Set<String>> backendFactoryToStoreNames =
ContainerStorageManagerUtil.getBackendFactoryStoreNames(
@@ -308,6 +313,15 @@ public class ContainerStorageManager {
taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
});
+ // if we have received no input checkpoints, it can only be due to two
reasons:
+ // a) Samza job is new, so it has no previous checkpoints.
+ // b) The checkpoints were cleared.
+ // We should be able to safely clear local logged stores in either case
+ if (taskCheckpoints.isEmpty()) {
+ LOG.info("No checkpoints read. Attempting to clear logged stores.");
+ clearLoggedStores(loggedStoreBaseDirectory);
+ }
+
// Init all taskRestores and if successful, restores all the task stores
concurrently
LOG.debug("Pre init and restore checkpoints is: {}", taskCheckpoints);
CompletableFuture<Map<TaskName, Checkpoint>>
initRestoreAndNewCheckpointFuture =
@@ -357,6 +371,19 @@ public class ContainerStorageManager {
return taskCheckpoints;
}
+ /**
+ * Removes every entry found directly under the given logged store base directory.
+ *
+ * <p>Each entry is handed to {@code FileUtil.rm}. If {@link File#listFiles()} returns
+ * {@code null} (e.g. the path is not a directory) or an empty array, only a log line is emitted.
+ *
+ * <p>NOTE(review): the caller invokes this whenever no task checkpoints were read; confirm that
+ * wiping local state is also intended when no checkpoint manager is configured at all.
+ *
+ * @param loggedStoreBaseDir base directory holding the logged stores; must not be null
+ */
+ private static void clearLoggedStores(File loggedStoreBaseDir) {
+ final FileUtil storeFileUtil = new FileUtil();
+ final File[] storeEntries = loggedStoreBaseDir.listFiles();
+ final boolean hasEntries = storeEntries != null && storeEntries.length > 0;
+ if (!hasEntries) {
+ LOG.info("No stores to delete");
+ return;
+ }
+ for (int i = 0; i < storeEntries.length; i++) {
+ final File storeEntry = storeEntries[i];
+ LOG.info("Clearing store dir {} from logged stores.", storeEntry);
+ storeFileUtil.rm(storeEntry);
+ }
+ }
+
/**
* Get the {@link StorageEngine} instance with a given name for a given task.
* @param taskName the task name for which the storage engine is desired.
diff --git
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index b8996d456..a79a3bfa0 100644
---
a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++
b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
@@ -102,7 +103,6 @@ import static org.mockito.Mockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ReflectionUtil.class,
ContainerStorageManagerRestoreUtil.class})
public class TestContainerStorageManager {
-
private static final String STORE_NAME = "store";
private static final String SYSTEM_NAME = "kafka";
private static final String STREAM_NAME = "store-stream";
@@ -116,6 +116,7 @@ public class TestContainerStorageManager {
private SamzaContainerMetrics samzaContainerMetrics;
private Map<TaskName, TaskModel> tasks;
private StandbyTestContext testContext;
+ private CheckpointManager checkpointManager;
private volatile int systemConsumerCreationCount;
private volatile int systemConsumerStartCount;
@@ -143,7 +144,7 @@ public class TestContainerStorageManager {
* Method to create a containerStorageManager with mocked dependencies
*/
@Before
- public void setUp() throws InterruptedException {
+ public void setUp() throws InterruptedException, IOException {
taskRestoreMetricGauges = new HashMap<>();
this.tasks = new HashMap<>();
this.taskInstanceMetrics = new HashMap<>();
@@ -248,7 +249,7 @@ public class TestContainerStorageManager {
.thenReturn(
new scala.collection.immutable.Map.Map1(new
SystemStream(SYSTEM_NAME, STREAM_NAME), systemStreamMetadata));
- CheckpointManager checkpointManager = mock(CheckpointManager.class);
+ this.checkpointManager = mock(CheckpointManager.class);
when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new
CheckpointV1(new HashMap<>()));
SSPMetadataCache mockSSPMetadataCache = mock(SSPMetadataCache.class);
@@ -320,6 +321,40 @@ public class TestContainerStorageManager {
Assert.assertEquals("systemConsumerStartCount count should be 1", 1,
this.systemConsumerStartCount);
}
+ /**
+ * Verifies that logged stores are deleted when the input checkpoints are empty.
+ *
+ * <p>The checkpoint manager mock is re-stubbed to return {@code null} for every task, and a store
+ * directory containing one file is created under the logged store base directory before the
+ * container storage manager is started. Any exception from file creation or from
+ * start/shutdown propagates with its full stack trace instead of being reported as a generic
+ * file-creation failure.
+ */
+ @Test
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ public void testDeleteLoggedStoreOnNoCheckpoints() throws InterruptedException, IOException {
+ // reset the mock to drop the stubs installed in the setUp method
+ reset(this.checkpointManager);
+ // redo stubbing so that every task reads a null checkpoint
+ when(this.checkpointManager.readLastCheckpoint(any())).thenReturn(null);
+ // create a store under the logged stores base dir to demonstrate deletion
+ final File storeFile = new File(DEFAULT_LOGGED_STORE_BASE_DIR, STORE_NAME);
+ // add contents to the store
+ final File storeFilePartition = new File(storeFile, "Partition_0");
+ // deleteOnExit runs in reverse registration order: register the directory first so its
+ // contents are removed before it. The finally block below does the same cleanup eagerly.
+ storeFile.deleteOnExit();
+ storeFilePartition.deleteOnExit();
+ try {
+ storeFile.mkdirs();
+ storeFilePartition.createNewFile();
+ Assert.assertTrue("Assert that stores are present prior to the test.", storeFile.exists());
+ Assert.assertTrue("Assert that store files are present prior to the test.", storeFilePartition.exists());
+ this.containerStorageManager.start();
+ this.containerStorageManager.shutdown();
+ Assert.assertFalse("Assert that stores are deleted after the test.", storeFile.exists());
+ Assert.assertFalse("Assert that store files are deleted after the test.", storeFilePartition.exists());
+ } finally {
+ // delete the child before its parent directory
+ storeFilePartition.delete();
+ storeFile.delete();
+ }
+ }
+
@Test
public void testNoConfiguredDurableStores() throws InterruptedException {
taskRestoreMetricGauges = new HashMap<>();