This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cde99e33a4 [IOTDB-3253] Upgrade dependency: Ratis from 2.2.0 to 2.3.0
(#5991)
cde99e33a4 is described below
commit cde99e33a4242b0707e810c69e755b8d9724788e
Author: SzyWilliam <[email protected]>
AuthorDate: Wed May 25 13:25:30 2022 +0800
[IOTDB-3253] Upgrade dependency: Ratis from 2.2.0 to 2.3.0 (#5991)
* upgrade ratis 2.2.0 to 2.3.0
* spotless
* CI bug fix
* CI bug fix
* disable useCache option
* spotless
* fix reviews
---
consensus/pom.xml | 2 +-
.../ratis/ApplicationStateMachineProxy.java | 19 ++++-
.../iotdb/consensus/ratis/RatisConsensus.java | 4 +
.../iotdb/consensus/ratis/SnapshotStorage.java | 87 ++++++++++++++++++++--
.../org/apache/iotdb/consensus/ratis/Utils.java | 7 +-
.../apache/iotdb/consensus/ratis/SnapshotTest.java | 12 ++-
6 files changed, 116 insertions(+), 15 deletions(-)
diff --git a/consensus/pom.xml b/consensus/pom.xml
index 7284a5fee7..e4d873bb19 100644
--- a/consensus/pom.xml
+++ b/consensus/pom.xml
@@ -56,6 +56,6 @@
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
- <ratis.version>2.2.0</ratis.version>
+ <ratis.version>2.3.0</ratis.version>
</properties>
</project>
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index ab0f65fa38..e6467401a0 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -86,8 +86,10 @@ public class ApplicationStateMachineProxy extends
BaseStateMachine {
@Override
public void pause() {
- getLifeCycle().transition(LifeCycle.State.PAUSING);
- getLifeCycle().transition(LifeCycle.State.PAUSED);
+ if (getLifeCycleState() == LifeCycle.State.RUNNING) {
+ getLifeCycle().transition(LifeCycle.State.PAUSING);
+ getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
}
@Override
@@ -142,13 +144,22 @@ public class ApplicationStateMachineProxy extends
BaseStateMachine {
// require the application statemachine to take the latest snapshot
String metadata = Utils.getMetadataFromTermIndex(lastApplied);
File snapshotDir = snapshotStorage.getSnapshotDir(metadata);
+
+ // delete snapshotDir fully in case the last takeSnapshot() crashed midway
+ FileUtils.deleteFully(snapshotDir);
+
snapshotDir.mkdir();
if (!snapshotDir.isDirectory()) {
logger.error("Unable to create snapshotDir at {}", snapshotDir);
return RaftLog.INVALID_LOG_INDEX;
}
- boolean success = applicationStateMachine.takeSnapshot(snapshotDir);
- if (!success) {
+
+ boolean applicationTakeSnapshotSuccess =
applicationStateMachine.takeSnapshot(snapshotDir);
+ boolean addTermIndexMetafileSuccess =
+ snapshotStorage.addTermIndexMetaFile(snapshotDir, metadata);
+
+ if (!applicationTakeSnapshotSuccess || !addTermIndexMetafileSuccess) {
+ // this takeSnapshot failed, clean up files and directories
// statemachine is supposed to clear snapshotDir on failure
boolean isEmpty = snapshotDir.delete();
if (!isEmpty) {
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 74916fba41..03bca95863 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -116,11 +116,15 @@ class RatisConsensus implements IConsensus {
*/
public RatisConsensus(TEndPoint endpoint, File ratisStorageDir,
IStateMachine.Registry registry)
throws IOException {
+ System.setProperty(
+
"org.apache.ratis.thirdparty.io.netty.allocator.useCacheForAllThreads",
"false");
String address = Utils.IPAddress(endpoint);
myself = Utils.fromTEndPointAndPriorityToRaftPeer(endpoint,
DEFAULT_PRIORITY);
RaftServerConfigKeys.setStorageDir(properties,
Collections.singletonList(ratisStorageDir));
RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
+ // TODO make this configurable so that RatisConsensusTest can trigger
multiple snapshot process
+ // RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, 20);
RaftServerConfigKeys.Rpc.setSlownessTimeout(
properties, TimeDuration.valueOf(10, TimeUnit.MINUTES));
RaftServerConfigKeys.Rpc.setTimeoutMin(properties, TimeDuration.valueOf(2,
TimeUnit.SECONDS));
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
index 8d6d2224a3..14d3d64590 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/SnapshotStorage.java
@@ -43,17 +43,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-/**
- * TODO: Warning, currently in Ratis 2.2.0, there is a bug in installSnapshot.
In subsequent
- * installSnapshot, a follower may fail to install while the leader assume it
success. This bug will
- * be triggered when the snapshot threshold is low. This is fixed in current
Ratis Master, and
- * hopefully will be introduced in Ratis 2.3.0.
- */
public class SnapshotStorage implements StateMachineStorage {
+ private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
private final IStateMachine applicationStateMachine;
+ private final String META_FILE_PREFIX = ".ratis_meta.";
private File stateMachineDir;
- private final Logger logger = LoggerFactory.getLogger(SnapshotStorage.class);
public SnapshotStorage(IStateMachine applicationStateMachine) {
this.applicationStateMachine = applicationStateMachine;
@@ -87,6 +82,7 @@ public class SnapshotStorage implements StateMachineStorage {
}
public File findLatestSnapshotDir() {
+ moveSnapshotFileToSubDirectory();
Path[] snapshots = getSortedSnapshotDirPaths();
if (snapshots == null || snapshots.length == 0) {
return null;
@@ -141,4 +137,81 @@ public class SnapshotStorage implements
StateMachineStorage {
public File getSnapshotDir(String snapshotMetadata) {
return new File(stateMachineDir.getAbsolutePath() + File.separator +
snapshotMetadata);
}
+
+ /**
+ * Currently, we name the snapshotDir with Term_Index so that we can tell
which directory contains
+ * the latest snapshot files. Unfortunately, when the leader installs a snapshot to
a slow follower,
+ * current Ratis implementation will flatten the directory and place all the
snapshots directly
+ * under statemachine dir. Under this scenario, we cannot restore Term_Index
from directory name.
+ * We decided to add an empty metadata file whose name carries the Term_Index into
the snapshotDir. This
+ * metadata file will be installed along with application snapshot files, so
that Term_Index
+ * information is kept during InstallSnapshot.
+ */
+ public boolean addTermIndexMetaFile(File snapshotDir, String
termIndexMetadata) {
+ File snapshotMetaFile = new File(getMetafilePath(snapshotDir,
termIndexMetadata));
+ try {
+ return snapshotMetaFile.createNewFile();
+ } catch (IOException e) {
+ logger.warn("cannot create snapshot metafile: ", e);
+ return false;
+ }
+ }
+
+ private String getMetafilePath(File snapshotDir, String termIndexMetadata) {
+ // e.g. /_sm/3_39/.ratis_meta.3_39
+ return snapshotDir.getAbsolutePath() + File.separator + META_FILE_PREFIX +
termIndexMetadata;
+ }
+
+ private String getMetafileMatcherRegex() {
+ // meta file should always end with term_index
+ return META_FILE_PREFIX + "\\d+_\\d+$";
+ }
+
+ /**
+ * After leader InstallSnapshot to a slow follower, Ratis will put all
snapshot files directly
+ * under statemachineDir. We need to handle this special scenario and
rearrange these files to
+ * the appropriate sub-directory. This function will move all snapshot files
directly under /sm to
+ * /sm/term_index/
+ */
+ void moveSnapshotFileToSubDirectory() {
+ File[] potentialMetafile =
+ stateMachineDir.listFiles((dir, name) ->
name.matches(getMetafileMatcherRegex()));
+ if (potentialMetafile == null || potentialMetafile.length == 0) {
+ // the statemachine dir contains no direct metafile
+ return;
+ }
+ String metadata =
potentialMetafile[0].getName().substring(META_FILE_PREFIX.length());
+
+ File snapshotDir = getSnapshotDir(metadata);
+ snapshotDir.mkdir();
+
+ File[] snapshotFiles = stateMachineDir.listFiles();
+
+ // move files to snapshotDir, if an error occurred, delete snapshotDir
+ try {
+ if (snapshotFiles == null) {
+ logger.error(
+ "An unexpected condition triggered. please check implementation "
+ + this.getClass().getName());
+ FileUtils.deleteFully(snapshotDir);
+ return;
+ }
+
+ for (File file : snapshotFiles) {
+ boolean success = file.renameTo(new File(snapshotDir + File.separator
+ file.getName()));
+ if (!success) {
+ logger.warn(
+ "move snapshot file "
+ + file.getAbsolutePath()
+ + " to sub-directory "
+ + snapshotDir.getAbsolutePath()
+ + "failed");
+ FileUtils.deleteFully(snapshotDir);
+ break;
+ }
+ }
+ } catch (IOException e) {
+ logger.warn("delete directory failed: ", e);
+ }
+ }
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index e1773c7f80..48f8598230 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -141,8 +141,11 @@ public class Utils {
}
public static TermIndex getTermIndexFromDir(File snapshotDir) {
- String ordinal = snapshotDir.getName();
- String[] items = ordinal.split("_");
+ return getTermIndexFromMetadataString(snapshotDir.getName());
+ }
+
+ public static TermIndex getTermIndexFromMetadataString(String metadata) {
+ String[] items = metadata.split("_");
return TermIndex.valueOf(Long.parseLong(items[0]),
Long.parseLong(items[1]));
}
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index e42f981706..9a741a5332 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -97,6 +97,7 @@ public class SnapshotTest {
long index = proxy.takeSnapshot();
Assert.assertEquals(index, 616);
Assert.assertTrue(new File(snapshotFilename).exists());
+ Assert.assertTrue(new File(getSnapshotMetaFilename("421_616")).exists());
// take a snapshot at 616-4217
proxy.notifyTermIndexUpdated(616, 4217);
@@ -105,16 +106,25 @@ public class SnapshotTest {
long indexLatest = proxy.takeSnapshot();
Assert.assertEquals(indexLatest, 4217);
Assert.assertTrue(new File(snapshotFilenameLatest).exists());
+ Assert.assertTrue(new File(getSnapshotMetaFilename("616_4217")).exists());
// query the latest snapshot
SnapshotInfo info = proxy.getLatestSnapshot();
Assert.assertEquals(info.getTerm(), 616);
Assert.assertEquals(info.getIndex(), 4217);
-
Assert.assertTrue(info.getFiles().get(0).getPath().endsWith(snapshotFilenameLatest));
// clean up
proxy.getStateMachineStorage().cleanupOldSnapshots(null);
Assert.assertFalse(new File(snapshotFilename).exists());
Assert.assertTrue(new File(snapshotFilenameLatest).exists());
}
+
+ private String getSnapshotMetaFilename(String termIndexMeta) {
+ return testDir.getAbsolutePath()
+ + File.separator
+ + termIndexMeta
+ + File.separator
+ + ".ratis_meta."
+ + termIndexMeta;
+ }
}