This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 81644cc  RATIS-643. Allow Ratis to take a configurable snapshot 
retention policy. Contributed by Aravindan Vijayan.
81644cc is described below

commit 81644ccf3b29524a6139d47a6f0ae035c15efd9f
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Wed Jul 31 14:41:05 2019 +0530

    RATIS-643. Allow Ratis to take a configurable snapshot retention policy. 
Contributed by Aravindan Vijayan.
---
 .../apache/ratis/server/RaftServerConfigKeys.java  | 10 +++
 .../ratis/server/impl/StateMachineUpdater.java     | 10 +++
 .../ratis/statemachine/StateMachineStorage.java    |  2 +
 .../ratis/statemachine/impl/BaseStateMachine.java  |  4 ++
 .../impl/SimpleStateMachineStorage.java            | 49 +++++++++++++-
 .../SnapshotRetentionPolicy.java}                  | 30 ++++-----
 .../ratis/server/storage/TestRaftStorage.java      | 75 ++++++++++++++++++++++
 7 files changed, 162 insertions(+), 18 deletions(-)

diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 90a3134..eefadde 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -335,6 +335,16 @@ public interface RaftServerConfigKeys {
     static void setAutoTriggerThreshold(RaftProperties properties, long 
autoTriggerThreshold) {
       setLong(properties::setLong, AUTO_TRIGGER_THRESHOLD_KEY, 
autoTriggerThreshold);
     }
+
+    String RETENTION_POLICY_KEY = PREFIX + ".retention.num.files";
+    int DEFAULT_ALL_SNAPSHOTS_RETAINED = -1;
+    static int snapshotRetentionPolicy(RaftProperties raftProperties) {
+      return getInt(raftProperties::getInt,
+          RETENTION_POLICY_KEY, DEFAULT_ALL_SNAPSHOTS_RETAINED, 
getDefaultLog());
+    }
+    static void setSnapshotRetentionPolicy(RaftProperties properties, int 
numSnapshotFilesRetained) {
+      setInt(properties::setInt, RETENTION_POLICY_KEY, 
numSnapshotFilesRetained);
+    }
   }
 
   /** server rpc timeout related */
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index f0a4c58..d17f10b 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.raftlog.RaftLogIndex;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +74,7 @@ class StateMachineUpdater implements Runnable {
   private final RaftLogIndex snapshotIndex;
   private final AtomicReference<Long> stopIndex = new AtomicReference<>();
   private volatile State state = State.RUNNING;
+  private SnapshotRetentionPolicy snapshotRetentionPolicy;
 
   StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
       ServerState serverState, long lastAppliedIndex, RaftProperties 
properties) {
@@ -89,6 +91,13 @@ class StateMachineUpdater implements Runnable {
 
     final boolean autoSnapshot = 
RaftServerConfigKeys.Snapshot.autoTriggerEnabled(properties);
     this.autoSnapshotThreshold = autoSnapshot? 
RaftServerConfigKeys.Snapshot.autoTriggerThreshold(properties): null;
+    int numSnapshotFilesRetained = 
RaftServerConfigKeys.Snapshot.snapshotRetentionPolicy(properties);
+    this.snapshotRetentionPolicy = new SnapshotRetentionPolicy() {
+      @Override
+      public int getNumSnapshotsRetained() {
+        return numSnapshotFilesRetained;
+      }
+    };
     updater = new Daemon(this);
   }
 
@@ -233,6 +242,7 @@ class StateMachineUpdater implements Runnable {
             "Bug in StateMachine: snapshot index = " + i + " > appliedIndex = 
" + appliedIndex
             + "; StateMachine class=" +  stateMachine.getClass().getName() + 
", stateMachine=" + stateMachine);
       }
+      
stateMachine.getStateMachineStorage().cleanupOldSnapshots(snapshotRetentionPolicy);
     } catch (IOException e) {
       LOG.error(name + ": Failed to take snapshot", e);
       return;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
index 4f7951a..b96b654 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
@@ -20,6 +20,7 @@ package org.apache.ratis.statemachine;
 import java.io.IOException;
 
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
 
 public interface StateMachineStorage {
 
@@ -37,4 +38,5 @@ public interface StateMachineStorage {
 
   void format() throws IOException;
 
+  void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) 
throws IOException;
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index bc8961e..fad9612 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -149,6 +149,10 @@ public class BaseStateMachine implements StateMachine {
       @Override
       public void format() throws IOException {
       }
+
+      @Override
+      public void cleanupOldSnapshots(SnapshotRetentionPolicy 
snapshotRetentionPolicy) {
+      }
     };
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
index 18501c7..af5b0b5 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.statemachine.impl;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
@@ -33,8 +34,12 @@ import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * A StateMachineStorage that stores the snapshot in a single file.
@@ -46,7 +51,7 @@ public class SimpleStateMachineStorage implements 
StateMachineStorage {
   static final String SNAPSHOT_FILE_PREFIX = "snapshot";
   static final String CORRUPT_SNAPSHOT_FILE_SUFFIX = ".corrupt";
   /** snapshot.term_index */
-  static final Pattern SNAPSHOT_REGEX =
+  public static final Pattern SNAPSHOT_REGEX =
       Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
 
   private RaftStorage raftStorage;
@@ -66,6 +71,38 @@ public class SimpleStateMachineStorage implements 
StateMachineStorage {
     // TODO
   }
 
+  @Override
+  public void cleanupOldSnapshots(SnapshotRetentionPolicy 
snapshotRetentionPolicy) throws IOException {
+    if (snapshotRetentionPolicy != null && 
snapshotRetentionPolicy.getNumSnapshotsRetained() > 0) {
+
+      List<SingleFileSnapshotInfo> allSnapshotFiles = new ArrayList<>();
+      try (DirectoryStream<Path> stream =
+               Files.newDirectoryStream(smDir.toPath())) {
+        for (Path path : stream) {
+          Matcher matcher = 
SNAPSHOT_REGEX.matcher(path.getFileName().toString());
+          if (matcher.matches()) {
+            final long endIndex = Long.parseLong(matcher.group(2));
+            final long term = Long.parseLong(matcher.group(1));
+            final FileInfo fileInfo = new FileInfo(path, null); //We don't 
need FileDigest here.
+            allSnapshotFiles.add(new SingleFileSnapshotInfo(fileInfo, term, 
endIndex));
+          }
+        }
+      }
+
+      if (allSnapshotFiles.size() > 
snapshotRetentionPolicy.getNumSnapshotsRetained()) {
+        allSnapshotFiles.sort(new SnapshotFileComparator());
+        List<File> snapshotFilesToBeCleaned = allSnapshotFiles.subList(
+            snapshotRetentionPolicy.getNumSnapshotsRetained(), 
allSnapshotFiles.size()).stream()
+            .map(singleFileSnapshotInfo -> 
singleFileSnapshotInfo.getFile().getPath().toFile())
+            .collect(Collectors.toList());
+        for (File snapshotFile : snapshotFilesToBeCleaned) {
+          LOG.info("Deleting old snapshot at {}", 
snapshotFile.getAbsolutePath());
+          FileUtils.deleteQuietly(snapshotFile);
+        }
+      }
+    }
+  }
+
   public static TermIndex getTermIndexFromSnapshotFile(File file) {
     final String name = file.getName();
     final Matcher m = SNAPSHOT_REGEX.matcher(name);
@@ -137,3 +174,13 @@ public class SimpleStateMachineStorage implements 
StateMachineStorage {
     return smDir;
   }
 }
+
+/**
+ * Orders snapshots by descending index; overflow-safe via Long.compare.
+ */
+class SnapshotFileComparator implements Comparator<SingleFileSnapshotInfo> {
+  @Override
+  public int compare(SingleFileSnapshotInfo file1, SingleFileSnapshotInfo file2) {
+    return Long.compare(file2.getIndex(), file1.getIndex());
+  }
+}
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java
similarity index 54%
copy from 
ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
copy to 
ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java
index 4f7951a..30e81b3 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,26 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.statemachine;
-
-import java.io.IOException;
 
-import org.apache.ratis.server.storage.RaftStorage;
+package org.apache.ratis.statemachine.impl;
 
-public interface StateMachineStorage {
+/**
+ * Policy for retention of Ratis snapshot files.
+ */
+public interface SnapshotRetentionPolicy {
 
-  void init(RaftStorage raftStorage) throws IOException;
+  int DEFAULT_ALL_SNAPSHOTS_RETAINED = -1;
 
   /**
-   * Returns the information for the latest durable snapshot.
+   * If a retention policy is configured, get the number of recent snapshots to
+   * retain. Default is -1, which means Ratis will retain ALL old snapshots.
+   * @return number of recent snapshots to retain.
    */
-  SnapshotInfo getLatestSnapshot();
-
-  // TODO: StateMachine can decide to compact the files independently of 
concurrent install snapshot
-  // etc requests. We should have ref counting for the SnapshotInfo with a 
release mechanism
-  // so that raft server will release the files after the snapshot file copy 
in case a compaction
-  // is waiting for deleting these files.
-
-  void format() throws IOException;
-
+  default int getNumSnapshotsRetained() {
+    return DEFAULT_ALL_SNAPSHOTS_RETAINED;
+  }
 }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 4a26f8c..da17dd2 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -17,11 +17,16 @@
  */
 package org.apache.ratis.server.storage;
 
+import static 
org.apache.ratis.statemachine.impl.SimpleStateMachineStorage.SNAPSHOT_REGEX;
+
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -31,7 +36,12 @@ import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
 
 /**
  * Test RaftStorage and RaftStorageDirectory
@@ -203,4 +213,69 @@ public class TestRaftStorage extends BaseTest {
       System.out.println("Good " + iae);
     }
   }
+
+  @Test
+  public void testSnapshotCleanup() throws IOException {
+
+
+    SnapshotRetentionPolicy snapshotRetentionPolicy = new 
SnapshotRetentionPolicy() {
+      @Override
+      public int getNumSnapshotsRetained() {
+        return 3;
+      }
+    };
+
+
+    SimpleStateMachineStorage simpleStateMachineStorage = new 
SimpleStateMachineStorage();
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    simpleStateMachineStorage.init(storage);
+
+    List<Long> indices = new ArrayList<>();
+
+    //Create 5 snapshot files in storage dir.
+    for (int i = 0; i < 5; i++) {
+      final long term = ThreadLocalRandom.current().nextLong(10L);
+      final long index = ThreadLocalRandom.current().nextLong(1000L);
+      indices.add(index);
+      File file = simpleStateMachineStorage.getSnapshotFile(term, index);
+      file.createNewFile();
+    }
+
+    File stateMachineDir = storage.getStorageDir().getStateMachineDir();
+    Assert.assertTrue(stateMachineDir.listFiles().length == 5);
+    simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
+    File[] remainingFiles = stateMachineDir.listFiles();
+    Assert.assertTrue(remainingFiles.length == 3);
+
+    Collections.sort(indices);
+    Collections.reverse(indices);
+    List<Long> remainingIndices = indices.subList(0, 3);
+    for (File file : remainingFiles) {
+      System.out.println(file.getName());
+      Matcher matcher = SNAPSHOT_REGEX.matcher(file.getName());
+      if (matcher.matches()) {
+        
Assert.assertTrue(remainingIndices.contains(Long.parseLong(matcher.group(2))));
+      }
+    }
+
+    // Attempt to clean up again should not delete any more files.
+    simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
+    remainingFiles = stateMachineDir.listFiles();
+    Assert.assertTrue(remainingFiles.length == 3);
+
+    //Test with Retention disabled.
+    //Create 2 snapshot files in storage dir.
+    for (int i = 0; i < 2; i++) {
+      final long term = ThreadLocalRandom.current().nextLong(10L);
+      final long index = ThreadLocalRandom.current().nextLong(1000L);
+      indices.add(index);
+      File file = simpleStateMachineStorage.getSnapshotFile(term, index);
+      file.createNewFile();
+    }
+
+    simpleStateMachineStorage.cleanupOldSnapshots(new 
SnapshotRetentionPolicy() {
+    });
+    Assert.assertTrue(stateMachineDir.listFiles().length == 5);
+
+  }
 }

Reply via email to