This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 81644cc RATIS-643. Allow Ratis to take a configurable snapshot
retention policy. Contributed by Aravindan Vijayan.
81644cc is described below
commit 81644ccf3b29524a6139d47a6f0ae035c15efd9f
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Wed Jul 31 14:41:05 2019 +0530
RATIS-643. Allow Ratis to take a configurable snapshot retention policy.
Contributed by Aravindan Vijayan.
---
.../apache/ratis/server/RaftServerConfigKeys.java | 10 +++
.../ratis/server/impl/StateMachineUpdater.java | 10 +++
.../ratis/statemachine/StateMachineStorage.java | 2 +
.../ratis/statemachine/impl/BaseStateMachine.java | 4 ++
.../impl/SimpleStateMachineStorage.java | 49 +++++++++++++-
.../SnapshotRetentionPolicy.java} | 30 ++++-----
.../ratis/server/storage/TestRaftStorage.java | 75 ++++++++++++++++++++++
7 files changed, 162 insertions(+), 18 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 90a3134..eefadde 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -335,6 +335,16 @@ public interface RaftServerConfigKeys {
static void setAutoTriggerThreshold(RaftProperties properties, long
autoTriggerThreshold) {
setLong(properties::setLong, AUTO_TRIGGER_THRESHOLD_KEY,
autoTriggerThreshold);
}
+
+ String RETENTION_POLICY_KEY = PREFIX + ".retention.num.files"; // property key: number of snapshot files to retain
+ int DEFAULT_ALL_SNAPSHOTS_RETAINED = -1; // default handed to getInt below; SnapshotRetentionPolicy documents -1 as "retain ALL old snapshots"
+ static int snapshotRetentionPolicy(RaftProperties raftProperties) { // reads RETENTION_POLICY_KEY from the given properties
+ return getInt(raftProperties::getInt,
+ RETENTION_POLICY_KEY, DEFAULT_ALL_SNAPSHOTS_RETAINED,
getDefaultLog());
+ }
+ static void setSnapshotRetentionPolicy(RaftProperties properties, int
numSnapshotFilesRetained) { // stores the given count under RETENTION_POLICY_KEY
+ setInt(properties::setInt, RETENTION_POLICY_KEY,
numSnapshotFilesRetained);
+ }
}
/** server rpc timeout related */
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index f0a4c58..d17f10b 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -28,6 +28,7 @@ import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,7 @@ class StateMachineUpdater implements Runnable {
private final RaftLogIndex snapshotIndex;
private final AtomicReference<Long> stopIndex = new AtomicReference<>();
private volatile State state = State.RUNNING;
+ private SnapshotRetentionPolicy snapshotRetentionPolicy;
StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
ServerState serverState, long lastAppliedIndex, RaftProperties
properties) {
@@ -89,6 +91,13 @@ class StateMachineUpdater implements Runnable {
final boolean autoSnapshot =
RaftServerConfigKeys.Snapshot.autoTriggerEnabled(properties);
this.autoSnapshotThreshold = autoSnapshot?
RaftServerConfigKeys.Snapshot.autoTriggerThreshold(properties): null;
+ int numSnapshotFilesRetained =
RaftServerConfigKeys.Snapshot.snapshotRetentionPolicy(properties);
+ this.snapshotRetentionPolicy = new SnapshotRetentionPolicy() {
+ @Override
+ public int getNumSnapshotsRetained() {
+ return numSnapshotFilesRetained;
+ }
+ };
updater = new Daemon(this);
}
@@ -233,6 +242,7 @@ class StateMachineUpdater implements Runnable {
"Bug in StateMachine: snapshot index = " + i + " > appliedIndex =
" + appliedIndex
+ "; StateMachine class=" + stateMachine.getClass().getName() +
", stateMachine=" + stateMachine);
}
+
stateMachine.getStateMachineStorage().cleanupOldSnapshots(snapshotRetentionPolicy);
} catch (IOException e) {
LOG.error(name + ": Failed to take snapshot", e);
return;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
index 4f7951a..b96b654 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
@@ -20,6 +20,7 @@ package org.apache.ratis.statemachine;
import java.io.IOException;
import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
public interface StateMachineStorage {
@@ -37,4 +38,5 @@ public interface StateMachineStorage {
void format() throws IOException;
+ void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
throws IOException; // cleans up old snapshots according to the given policy; what is removed is up to the implementation
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index bc8961e..fad9612 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -149,6 +149,10 @@ public class BaseStateMachine implements StateMachine {
@Override
public void format() throws IOException {
}
+
+ @Override
+ public void cleanupOldSnapshots(SnapshotRetentionPolicy
snapshotRetentionPolicy) { // no-op: this storage implementation deletes nothing
+ }
};
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
index 18501c7..af5b0b5 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.statemachine.impl;
+import org.apache.commons.io.FileUtils;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
@@ -33,8 +34,12 @@ import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/**
* A StateMachineStorage that stores the snapshot in a single file.
@@ -46,7 +51,7 @@ public class SimpleStateMachineStorage implements
StateMachineStorage {
static final String SNAPSHOT_FILE_PREFIX = "snapshot";
static final String CORRUPT_SNAPSHOT_FILE_SUFFIX = ".corrupt";
/** snapshot.term_index */
- static final Pattern SNAPSHOT_REGEX =
+ public static final Pattern SNAPSHOT_REGEX =
Pattern.compile(SNAPSHOT_FILE_PREFIX + "\\.(\\d+)_(\\d+)");
private RaftStorage raftStorage;
@@ -66,6 +71,38 @@ public class SimpleStateMachineStorage implements
StateMachineStorage {
// TODO
}
+  /**
+   * Deletes the oldest snapshot files in the state machine directory so that
+   * at most {@code snapshotRetentionPolicy.getNumSnapshotsRetained()} snapshot
+   * files (the ones with the highest indexes) remain.
+   *
+   * Nothing is deleted when the policy is null or when the number of snapshots
+   * to retain is not positive. Deletion is best effort: a file that cannot be
+   * removed is logged and left in place.
+   *
+   * @param snapshotRetentionPolicy gives the number of most recent snapshots to keep; may be null.
+   * @throws IOException if the state machine directory cannot be listed.
+   */
+  @Override
+  public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) throws IOException {
+    if (snapshotRetentionPolicy == null) {
+      return;
+    }
+    // Query the policy once so that the size check and the sub-list bound
+    // below always use the same value.
+    final int numSnapshotsRetained = snapshotRetentionPolicy.getNumSnapshotsRetained();
+    if (numSnapshotsRetained <= 0) {
+      return;
+    }
+
+    final List<SingleFileSnapshotInfo> allSnapshotFiles = new ArrayList<>();
+    try (DirectoryStream<Path> stream = Files.newDirectoryStream(smDir.toPath())) {
+      for (Path path : stream) {
+        final Matcher matcher = SNAPSHOT_REGEX.matcher(path.getFileName().toString());
+        if (matcher.matches()) {
+          // File names follow snapshot.term_index: group 1 is the term, group 2 the index.
+          final long term = Long.parseLong(matcher.group(1));
+          final long endIndex = Long.parseLong(matcher.group(2));
+          final FileInfo fileInfo = new FileInfo(path, null); //We don't need FileDigest here.
+          allSnapshotFiles.add(new SingleFileSnapshotInfo(fileInfo, term, endIndex));
+        }
+      }
+    }
+
+    if (allSnapshotFiles.size() <= numSnapshotsRetained) {
+      return;
+    }
+
+    // Highest index first; everything after the retained prefix is old.
+    allSnapshotFiles.sort(new SnapshotFileComparator());
+    for (SingleFileSnapshotInfo oldSnapshot
+        : allSnapshotFiles.subList(numSnapshotsRetained, allSnapshotFiles.size())) {
+      final File snapshotFile = oldSnapshot.getFile().getPath().toFile();
+      LOG.info("Deleting old snapshot at {}", snapshotFile.getAbsolutePath());
+      // deleteQuietly never throws; surface a failed delete instead of hiding it.
+      if (!FileUtils.deleteQuietly(snapshotFile)) {
+        LOG.warn("Failed to delete old snapshot at {}", snapshotFile.getAbsolutePath());
+      }
+    }
+  }
+
public static TermIndex getTermIndexFromSnapshotFile(File file) {
final String name = file.getName();
final Matcher m = SNAPSHOT_REGEX.matcher(name);
@@ -137,3 +174,13 @@ public class SimpleStateMachineStorage implements
StateMachineStorage {
return smDir;
}
}
+
+/**
+ * Orders snapshot files by transaction index in descending order, so that the
+ * snapshot with the highest index sorts first.
+ */
+class SnapshotFileComparator implements Comparator<SingleFileSnapshotInfo> {
+  /**
+   * @return a negative value if {@code file1} has the higher index, zero if
+   *         both indexes are equal, and a positive value otherwise.
+   */
+  @Override
+  public int compare(SingleFileSnapshotInfo file1, SingleFileSnapshotInfo file2) {
+    // Long.compare rather than casting the long difference to int: the cast
+    // keeps only the low 32 bits, so it can yield the wrong sign (or zero)
+    // when the two indexes differ by more than Integer.MAX_VALUE.
+    return Long.compare(file2.getIndex(), file1.getIndex());
+  }
+}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java
similarity index 54%
copy from
ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
copy to
ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java
index 4f7951a..30e81b3 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,26 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.statemachine;
-
-import java.io.IOException;
-import org.apache.ratis.server.storage.RaftStorage;
+package org.apache.ratis.statemachine.impl;
-public interface StateMachineStorage {
+/**
+ * Policy for retention of Ratis snapshot files.
+ */
+public interface SnapshotRetentionPolicy {
- void init(RaftStorage raftStorage) throws IOException;
+ int DEFAULT_ALL_SNAPSHOTS_RETAINED = -1;
/**
- * Returns the information for the latest durable snapshot.
+ * If a retention policy is configured, get the number of recent snapshots to
+ * retain. Default is -1, which means Ratis will retain ALL old snapshots.
+ * @return number of recent snapshots to retain.
*/
- SnapshotInfo getLatestSnapshot();
-
- // TODO: StateMachine can decide to compact the files independently of
concurrent install snapshot
- // etc requests. We should have ref counting for the SnapshotInfo with a
release mechanism
- // so that raft server will release the files after the snapshot file copy
in case a compaction
- // is waiting for deleting these files.
-
- void format() throws IOException;
-
+ default int getNumSnapshotsRetained() {
+ return DEFAULT_ALL_SNAPSHOTS_RETAINED;
+ }
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 4a26f8c..da17dd2 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -17,11 +17,16 @@
*/
package org.apache.ratis.server.storage;
+import static
org.apache.ratis.statemachine.impl.SimpleStateMachineStorage.SNAPSHOT_REGEX;
+
import org.apache.ratis.BaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
import org.junit.Assert;
@@ -31,7 +36,12 @@ import org.mockito.internal.util.reflection.Whitebox;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
/**
* Test RaftStorage and RaftStorageDirectory
@@ -203,4 +213,69 @@ public class TestRaftStorage extends BaseTest {
System.out.println("Good " + iae);
}
}
+
+  /**
+   * Verifies that cleanupOldSnapshots keeps only the configured number of
+   * snapshot files with the highest indexes, deletes nothing more when run a
+   * second time, and deletes nothing under the default retention policy.
+   */
+  @Test
+  public void testSnapshotCleanup() throws IOException {
+    SnapshotRetentionPolicy snapshotRetentionPolicy = new SnapshotRetentionPolicy() {
+      @Override
+      public int getNumSnapshotsRetained() {
+        return 3;
+      }
+    };
+
+    SimpleStateMachineStorage simpleStateMachineStorage = new SimpleStateMachineStorage();
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    simpleStateMachineStorage.init(storage);
+
+    List<Long> indices = new ArrayList<>();
+
+    //Create 5 snapshot files in storage dir.
+    for (int i = 0; i < 5; i++) {
+      final long term = ThreadLocalRandom.current().nextLong(10L);
+      // Each file draws its index from its own band of 1000 values, so two
+      // random (term, index) pairs can never map to the same file name.
+      final long index = i * 1000L + ThreadLocalRandom.current().nextLong(1000L);
+      indices.add(index);
+      File file = simpleStateMachineStorage.getSnapshotFile(term, index);
+      Assert.assertTrue("Failed to create " + file, file.createNewFile());
+    }
+
+    File stateMachineDir = storage.getStorageDir().getStateMachineDir();
+    Assert.assertEquals(5, stateMachineDir.listFiles().length);
+    simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
+    File[] remainingFiles = stateMachineDir.listFiles();
+    Assert.assertEquals(3, remainingFiles.length);
+
+    // The three highest indexes are the ones expected to survive.
+    Collections.sort(indices);
+    Collections.reverse(indices);
+    List<Long> remainingIndices = indices.subList(0, 3);
+    for (File file : remainingFiles) {
+      Matcher matcher = SNAPSHOT_REGEX.matcher(file.getName());
+      Assert.assertTrue("Unexpected file " + file.getName(), matcher.matches());
+      Assert.assertTrue("Unexpected snapshot retained: " + file.getName(),
+          remainingIndices.contains(Long.parseLong(matcher.group(2))));
+    }
+
+    // Attempt to clean up again should not delete any more files.
+    simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
+    remainingFiles = stateMachineDir.listFiles();
+    Assert.assertEquals(3, remainingFiles.length);
+
+    //Test with Retention disabled.
+    //Create 2 snapshot files in storage dir.
+    for (int i = 0; i < 2; i++) {
+      final long term = ThreadLocalRandom.current().nextLong(10L);
+      // Bands 5 and 6 cannot overlap with the five bands used above.
+      final long index = (5 + i) * 1000L + ThreadLocalRandom.current().nextLong(1000L);
+      File file = simpleStateMachineStorage.getSnapshotFile(term, index);
+      Assert.assertTrue("Failed to create " + file, file.createNewFile());
+    }
+
+    // The default policy retains all snapshots: 3 survivors + 2 new files.
+    simpleStateMachineStorage.cleanupOldSnapshots(new SnapshotRetentionPolicy() {
+    });
+    Assert.assertEquals(5, stateMachineDir.listFiles().length);
+  }
}