This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f6f6425c7f [IOTDB-6197] Improvements on periodic snapshot guardian
(#11342)
6f6f6425c7f is described below
commit 6f6f6425c7f044b652f2b53cf834ad1bd84dc9a3
Author: William Song <[email protected]>
AuthorDate: Fri Oct 20 06:00:41 2023 -0500
[IOTDB-6197] Improvements on periodic snapshot guardian (#11342)
---
.../iotdb/confignode/conf/ConfigNodeConfig.java | 31 +++
.../confignode/conf/ConfigNodeDescriptor.java | 18 ++
.../manager/consensus/ConsensusManager.java | 4 +-
.../iotdb/confignode/manager/node/NodeManager.java | 5 +
.../apache/iotdb/consensus/config/RatisConfig.java | 53 +++--
.../apache/iotdb/consensus/ratis/DiskGuardian.java | 252 +++++++++++++++++++++
.../iotdb/consensus/ratis/RatisConsensus.java | 71 ++----
.../consensus/ratis/utils/RatisLogMonitor.java | 88 -------
.../iotdb/consensus/ratis/DiskGuardianTest.java | 120 ++++++++++
.../iotdb/consensus/ratis/RatisConsensusTest.java | 4 +-
.../apache/iotdb/consensus/ratis/TestUtils.java | 7 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +
.../db/consensus/DataRegionConsensusImpl.java | 4 +-
.../db/consensus/SchemaRegionConsensusImpl.java | 4 +-
.../resources/conf/iotdb-common.properties | 5 +
.../src/main/thrift/confignode.thrift | 3 +
17 files changed, 529 insertions(+), 163 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 0143a0314fe..db12e430679 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -293,6 +293,10 @@ public class ConfigNodeConfig {
private long schemaRegionRatisLogMax = 2L * 1024 * 1024 * 1024; // 2G
private long dataRegionRatisLogMax = 20L * 1024 * 1024 * 1024; // 20G
+ private long configNodeRatisPeriodicSnapshotInterval = 60 * 60 * 24L; // 24h
+ private long schemaRegionRatisPeriodicSnapshotInterval = 60 * 60 * 24L; //
24h
+ private long dataRegionRatisPeriodicSnapshotInterval = 60 * 60 * 24L; // 24h
+
/** The getOrCreatePartitionTable interface will log new created Partition
if set true. */
private boolean isEnablePrintingNewlyCreatedPartition = false;
@@ -1201,4 +1205,31 @@ public class ConfigNodeConfig {
}
return configMessage.toString();
}
+
+ public long getConfigNodeRatisPeriodicSnapshotInterval() {
+ return configNodeRatisPeriodicSnapshotInterval;
+ }
+
+ public void setConfigNodeRatisPeriodicSnapshotInterval(
+ long configNodeRatisPeriodicSnapshotInterval) {
+ this.configNodeRatisPeriodicSnapshotInterval =
configNodeRatisPeriodicSnapshotInterval;
+ }
+
+ public long getSchemaRegionRatisPeriodicSnapshotInterval() {
+ return schemaRegionRatisPeriodicSnapshotInterval;
+ }
+
+ public void setSchemaRegionRatisPeriodicSnapshotInterval(
+ long schemaRegionRatisPeriodicSnapshotInterval) {
+ this.schemaRegionRatisPeriodicSnapshotInterval =
schemaRegionRatisPeriodicSnapshotInterval;
+ }
+
+ public long getDataRegionRatisPeriodicSnapshotInterval() {
+ return dataRegionRatisPeriodicSnapshotInterval;
+ }
+
+ public void setDataRegionRatisPeriodicSnapshotInterval(
+ long dataRegionRatisPeriodicSnapshotInterval) {
+ this.dataRegionRatisPeriodicSnapshotInterval =
dataRegionRatisPeriodicSnapshotInterval;
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 3fc8a75da34..5faadebc128 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -802,6 +802,24 @@ public class ConfigNodeDescriptor {
String.valueOf(conf.getDataRegionRatisLogMax()))
.trim()));
+ // Periodic snapshot intervals for the ConfigNode, SchemaRegion and DataRegion Ratis groups.
+ // The resolved property value (not only the default) is trimmed, so stray whitespace in the
+ // properties file cannot make Long.parseLong throw NumberFormatException.
+ conf.setConfigNodeRatisPeriodicSnapshotInterval(
+     Long.parseLong(
+         properties
+             .getProperty(
+                 "config_node_ratis_periodic_snapshot_interval",
+                 String.valueOf(conf.getConfigNodeRatisPeriodicSnapshotInterval()))
+             .trim()));
+
+ conf.setSchemaRegionRatisPeriodicSnapshotInterval(
+     Long.parseLong(
+         properties
+             .getProperty(
+                 "schema_region_ratis_periodic_snapshot_interval",
+                 String.valueOf(conf.getSchemaRegionRatisPeriodicSnapshotInterval()))
+             .trim()));
+
+ conf.setDataRegionRatisPeriodicSnapshotInterval(
+     Long.parseLong(
+         properties
+             .getProperty(
+                 "data_region_ratis_periodic_snapshot_interval",
+                 String.valueOf(conf.getDataRegionRatisPeriodicSnapshotInterval()))
+             .trim()));
+
conf.setEnablePrintingNewlyCreatedPartition(
Boolean.parseBoolean(
properties
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index b0efb9324f4..0a260f2f91c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -184,7 +184,9 @@ public class ConsensusManager {
.build())
.setImpl(
RatisConfig.Impl.newBuilder()
-
.setTriggerSnapshotFileSize(CONF.getConfigNodeRatisLogMax())
+
.setRaftLogSizeMaxThreshold(CONF.getConfigNodeRatisLogMax())
+ .setForceSnapshotInterval(
+
CONF.getConfigNodeRatisPeriodicSnapshotInterval())
.build())
.setRead(
RatisConfig.Read.newBuilder()
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 14bb88a1edd..41ef5ca9ce5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -210,6 +210,11 @@ public class NodeManager {
ratisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMax());
ratisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMax());
+ ratisConfig.setSchemaRegionPeriodicSnapshotInterval(
+ conf.getSchemaRegionRatisPeriodicSnapshotInterval());
+ ratisConfig.setDataRegionPeriodicSnapshotInterval(
+ conf.getDataRegionRatisPeriodicSnapshotInterval());
+
dataSet.setRatisConfig(ratisConfig);
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index f55cbb75d22..e8bde5d2252 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -939,18 +939,22 @@ public class RatisConfig {
private final int retryTimesMax;
private final long retryWaitMillis;
- private final long triggerSnapshotTime;
- private final long triggerSnapshotFileSize;
+ private final long checkAndTakeSnapshotInterval;
+ private final long raftLogSizeMaxThreshold;
+
+ private final long forceSnapshotInterval;
public Impl(
int retryTimesMax,
long retryWaitMillis,
- long triggerSnapshotTime,
- long triggerSnapshotFileSize) {
+ long checkAndTakeSnapshotInterval,
+ long raftLogSizeMaxThreshold,
+ long forceSnapshotInterval) {
this.retryTimesMax = retryTimesMax;
this.retryWaitMillis = retryWaitMillis;
- this.triggerSnapshotTime = triggerSnapshotTime;
- this.triggerSnapshotFileSize = triggerSnapshotFileSize;
+ this.checkAndTakeSnapshotInterval = checkAndTakeSnapshotInterval;
+ this.raftLogSizeMaxThreshold = raftLogSizeMaxThreshold;
+ this.forceSnapshotInterval = forceSnapshotInterval;
}
public int getRetryTimesMax() {
@@ -961,12 +965,16 @@ public class RatisConfig {
return retryWaitMillis;
}
- public long getTriggerSnapshotTime() {
- return triggerSnapshotTime;
+ public long getCheckAndTakeSnapshotInterval() {
+ return checkAndTakeSnapshotInterval;
+ }
+
+ public long getRaftLogSizeMaxThreshold() {
+ return raftLogSizeMaxThreshold;
}
- public long getTriggerSnapshotFileSize() {
- return triggerSnapshotFileSize;
+ public long getForceSnapshotInterval() {
+ return forceSnapshotInterval;
}
public static Impl.Builder newBuilder() {
@@ -979,13 +987,19 @@ public class RatisConfig {
private long retryWaitMillis = 500;
// 120s
- private long triggerSnapshotTime = 120;
+ private long checkAndTakeSnapshotInterval = 120;
// 20GB
- private long triggerSnapshotFileSize = 20L << 30;
+ private long raftLogSizeMaxThreshold = 20L << 30;
+ // -1L means no force, measured in seconds
+ private long forceSnapshotInterval = -1;
public Impl build() {
return new Impl(
- retryTimesMax, retryWaitMillis, triggerSnapshotTime,
triggerSnapshotFileSize);
+ retryTimesMax,
+ retryWaitMillis,
+ checkAndTakeSnapshotInterval,
+ raftLogSizeMaxThreshold,
+ forceSnapshotInterval);
}
public Impl.Builder setRetryTimesMax(int retryTimesMax) {
@@ -998,13 +1012,18 @@ public class RatisConfig {
return this;
}
- public Impl.Builder setTriggerSnapshotTime(long triggerSnapshotTime) {
- this.triggerSnapshotTime = triggerSnapshotTime;
+ public Impl.Builder setCheckAndTakeSnapshotInterval(long
checkAndTakeSnapshotInterval) {
+ this.checkAndTakeSnapshotInterval = checkAndTakeSnapshotInterval;
+ return this;
+ }
+
+ public Impl.Builder setRaftLogSizeMaxThreshold(long
raftLogSizeMaxThreshold) {
+ this.raftLogSizeMaxThreshold = raftLogSizeMaxThreshold;
return this;
}
- public Impl.Builder setTriggerSnapshotFileSize(long
triggerSnapshotFileSize) {
- this.triggerSnapshotFileSize = triggerSnapshotFileSize;
+ public Impl.Builder setForceSnapshotInterval(long forceSnapshotInterval)
{
+ this.forceSnapshotInterval = forceSnapshotInterval;
return this;
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
new file mode 100644
index 00000000000..4834d2222b4
--- /dev/null
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
+
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * DiskGuardian manages to take snapshots periodically for each Raft Group in
order to control disk
+ * storage consumption.
+ */
+class DiskGuardian {
+ private static final Logger logger =
LoggerFactory.getLogger(DiskGuardian.class);
+
+ /**
+ * A summary of a Raft group's log files on disk. It contains the following statistics:
+ *
+ * <ul>
+ * <li>1. Disk space consumed by this group's Raft logs.
+ * <li>2. How many Raft log segments exist.
+ * </ul>
+ */
+ static final class RaftLogSummary {
+
+ /** whether the path denotes an open segment under active writing progress
*/
+ private static final Predicate<Path> isOpenSegment =
+ p -> p.toFile().getName().startsWith("log_inprogress");
+
+ private final RaftGroupId gid;
+ private final File raftLogStorageRoot;
+ private long totalSize;
+ private Set<Path> logFiles;
+
+ private RaftLogSummary(RaftGroupId gid, File raftLogStorageRoot) {
+ this.gid = gid;
+ this.raftLogStorageRoot = raftLogStorageRoot;
+ this.totalSize = 0;
+ this.logFiles = Collections.emptySet();
+ }
+
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ /**
+  * Re-lists the Raft log storage directory and folds the size difference between the
+  * previously remembered file set and the current one into {@code totalSize}. If listing the
+  * directory fails with an {@link IOException}, the remembered state is kept and a warning is
+  * logged.
+  */
+ void updateNow() {
+   try (final Stream<Path> files = Files.list(raftLogStorageRoot.toPath())) {
+     final Set<Path> latest = files.filter(isOpenSegment).collect(Collectors.toSet());
+     this.totalSize += diff(logFiles, latest);
+     this.logFiles = latest;
+   } catch (IOException e) {
+     // keep the files unchanged
+     // SLF4J consumes the trailing exception as the throwable, so each of the three
+     // placeholders needs its own argument; the directory fills the "at {}" slot.
+     logger.warn(
+         "{}: Error caught when listing files for {} at {}:", this, gid, raftLogStorageRoot, e);
+   }
+ }
+
+ private static long diff(Set<Path> old, Set<Path> latest) {
+ final long incremental = totalSize(latest.stream().filter(p ->
!old.contains(p)));
+ final long decremental = totalSize(old.stream().filter(p ->
!latest.contains(p)));
+ return incremental - decremental;
+ }
+
+ private static long totalSize(Stream<Path> files) {
+ return files.mapToLong(p -> p.toFile().length()).sum();
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "[Raft Log Summary]: group=%s, total size=%d, files=%s", gid,
totalSize, logFiles);
+ }
+ }
+
+ /** use a {@link MemoizedSupplier} here to avoid this reference leak */
+ private final MemoizedSupplier<RatisConsensus> serverRef;
+
+ /** whether a specific RaftGroup need to take snapshot during next check
period */
+ private final Map<RaftGroupId, AtomicBoolean> snapshotFlag = new
ConcurrentHashMap<>();
+
+ /** Raft Log disk usage summary statistics */
+ private final Map<RaftGroupId, RaftLogSummary> bookkeeper = new
ConcurrentHashMap<>();
+
+ /** Registered checkers */
+ private final Map<TimeDuration, List<Predicate<RaftLogSummary>>>
snapshotArbitrators =
+ new HashMap<>();
+
+ private final ScheduledExecutorService workerThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
+ private final AtomicBoolean isStopped = new AtomicBoolean(false);
+ private final long daemonIntervalMs;
+
+ /**
+  * Creates a guardian for the given consensus instance. Nothing is scheduled here; scheduling
+  * happens in {@link #start()}.
+  *
+  * @param server supplier of the owning {@link RatisConsensus}, wrapped in a {@link
+  *     MemoizedSupplier}
+  * @param config configuration providing the check-and-take-snapshot interval
+  */
+ DiskGuardian(Supplier<RatisConsensus> server, RatisConfig config) {
+ this.serverRef = MemoizedSupplier.valueOf(server);
+ // NOTE(review): despite the "Ms" suffix, start() schedules this value with TimeUnit.SECONDS
+ this.daemonIntervalMs = config.getImpl().getCheckAndTakeSnapshotInterval();
+ }
+
+ /**
+  * Schedules the background tasks on the worker thread: first the snapshot daemon, then one
+  * checker task per interval currently present in {@code snapshotArbitrators}. All tasks use a
+  * zero initial delay and a fixed delay expressed in seconds.
+  */
+ void start() {
+ // first schedule the snapshot daemon
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ workerThread, this::snapshotDaemon, 0L, daemonIntervalMs,
TimeUnit.SECONDS);
+
+ // then schedule all checker daemons
+ // each registered interval gets its own task that runs every checker sharing that interval
+ snapshotArbitrators.forEach(
+ (interval, checkers) ->
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ workerThread,
+ () -> checkerDaemon(checkers),
+ 0L,
+ interval.toLong(TimeUnit.SECONDS),
+ TimeUnit.SECONDS));
+ }
+
+ /**
+  * Stops the guardian: marks it as stopped, shuts down the worker thread and waits up to 5
+  * seconds for it to terminate. Only the first call performs the shutdown; later calls are
+  * no-ops.
+  *
+  * @throws InterruptedException if interrupted while awaiting termination
+  */
+ void stop() throws InterruptedException {
+   // isStopped starts as false, so the transition guarding the shutdown must be false -> true;
+   // the reversed CAS could never succeed and left the worker thread running forever
+   if (isStopped.compareAndSet(false, true)) {
+     workerThread.shutdown();
+     workerThread.awaitTermination(5, TimeUnit.SECONDS);
+   }
+ }
+
+ /** call this method to register checkers before {@link #start()} */
+ void registerChecker(Predicate<RaftLogSummary> checker, TimeDuration
interval) {
+ final List<Predicate<RaftLogSummary>> checkers =
+ snapshotArbitrators.computeIfAbsent(interval, i -> new
CopyOnWriteArrayList<>());
+ checkers.add(checker);
+ }
+
+ /**
+  * Periodically woken up. For every Raft group reported by the server whose snapshot flag is
+  * set, triggers a snapshot through the owning consensus instance and then clears the flag.
+  * Returns immediately if the guardian has been marked as stopped.
+  *
+  * <p>If taking the snapshot throws a {@link ConsensusException}, the flag is left set (the
+  * clearing step is skipped) and a warning including the latest disk summary is logged.
+  */
+ private void snapshotDaemon() {
+ if (isStopped.get()) {
+ return;
+ }
+ for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
+ if (getSnapshotFlag(groupId).get()) {
+ try {
+
serverRef.get().triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(groupId));
+ // the flag was observed as true above, so this CAS is expected to succeed
+ final boolean flagCleared =
snapshotFlag.get(groupId).compareAndSet(true, false);
+ if (!flagCleared) {
+ logger.warn(
+ "{}: clear snapshot flag failed for group {}, please check the
related implementation",
+ this,
+ groupId);
+ }
+ } catch (ConsensusException e) {
+ // the exception is passed as a regular argument here and fills the "due to {}" slot
+ logger.warn(
+ "{} take snapshot failed for group {} due to {}. Disk file
status {}",
+ this,
+ groupId,
+ e,
+ getLatestSummary(groupId).orElse(null));
+ }
+ }
+ }
+ }
+
+ /**
+  * Periodically woken up. For every Raft group reported by the server, refreshes the group's
+  * log summary and raises the group's snapshot flag as soon as one of the given checkers
+  * accepts that summary. Returns immediately if the guardian has been marked as stopped.
+  *
+  * @param checkerList the checkers sharing one check interval
+  */
+ private void checkerDaemon(List<Predicate<RaftLogSummary>> checkerList) {
+   if (isStopped.get()) {
+     return;
+   }
+   for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
+     final RaftLogSummary latest = getLatestSummary(groupId).orElse(null);
+     if (latest == null) {
+       // no summary available for this group, nothing to evaluate
+       continue;
+     }
+     for (Predicate<RaftLogSummary> checker : checkerList) {
+       if (checker.test(latest)) {
+         // one positive checker is enough; remaining checkers are not consulted
+         getSnapshotFlag(groupId).set(true);
+         break;
+       }
+     }
+   }
+ }
+
+ private AtomicBoolean getSnapshotFlag(RaftGroupId groupId) {
+ return snapshotFlag.computeIfAbsent(groupId, id -> new
AtomicBoolean(false));
+ }
+
+ /**
+  * Returns the disk summary of the given group, refreshed from disk on every call. The summary
+  * is created on first use from the group's current storage directory.
+  *
+  * @param groupId the Raft group to summarize
+  * @return the refreshed summary, or empty if the group's division could not be resolved
+  */
+ private Optional<RaftLogSummary> getLatestSummary(RaftGroupId groupId) {
+ // initialize the RaftLog Summary for the first time
+ final RaftLogSummary summary =
+ bookkeeper.computeIfAbsent(
+ groupId,
+ gid -> {
+ final File root;
+ try {
+ root =
+ serverRef
+ .get()
+ .getServer()
+ .getDivision(groupId)
+ .getRaftStorage()
+ .getStorageDir()
+ .getCurrentDir();
+ return new RaftLogSummary(gid, root);
+ } catch (IOException e) {
+ logger.warn("{}: group not exists for {} and caught exception
", this, groupId, e);
+ // returning null makes computeIfAbsent record no mapping, so the lookup is retried later
+ return null;
+ }
+ });
+
+ if (summary != null) {
+ summary.updateNow();
+ }
+ return Optional.ofNullable(summary);
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 0bc95247d69..a4bbf19cb8a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -28,9 +28,6 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.property.ClientPoolProperty;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.StatusUtils;
@@ -51,7 +48,6 @@ import
org.apache.iotdb.consensus.exception.RatisRequestFailedException;
import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
-import org.apache.iotdb.consensus.ratis.utils.RatisLogMonitor;
import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -82,6 +78,7 @@ import
org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,7 +91,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -115,6 +111,8 @@ class RatisConsensus implements IConsensus {
private final IClientManager<RaftGroup, RatisClient> clientManager;
+ private final DiskGuardian diskGuardian;
+
private final Map<RaftGroupId, RaftGroup> lastSeen = new
ConcurrentHashMap<>();
private final ClientId localFakeId = ClientId.randomId();
@@ -125,13 +123,8 @@ class RatisConsensus implements IConsensus {
private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int)
TimeUnit.SECONDS.toMillis(20);
- private final ScheduledExecutorService diskGuardian;
- private final long triggerSnapshotThreshold;
-
private final RatisConfig config;
- private final RatisLogMonitor monitor = new RatisLogMonitor();
-
private final RatisMetricSet ratisMetricSet;
private final TConsensusGroupType consensusGroupType;
@@ -153,10 +146,7 @@ class RatisConsensus implements IConsensus {
this.consensusGroupType = config.getConsensusGroupType();
this.ratisMetricSet = new RatisMetricSet();
- this.triggerSnapshotThreshold =
this.config.getImpl().getTriggerSnapshotFileSize();
- diskGuardian =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
+ this.diskGuardian = new DiskGuardian(() -> this, this.config);
clientManager =
new IClientManager.Factory<RaftGroup, RatisClient>()
@@ -181,14 +171,13 @@ class RatisConsensus implements IConsensus {
public synchronized void start() throws IOException {
MetricService.getInstance().addMetricSet(this.ratisMetricSet);
server.start();
- startSnapshotGuardian();
+ registerAndStartDiskGuardian();
}
@Override
public synchronized void stop() throws IOException {
- diskGuardian.shutdown();
try {
- diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
+ diskGuardian.stop();
} catch (InterruptedException e) {
logger.warn("{}: interrupted when shutting down add Executor with
exception {}", this, e);
Thread.currentThread().interrupt();
@@ -676,44 +665,22 @@ class RatisConsensus implements IConsensus {
}
}
- private void triggerSnapshotByCustomize() {
-
- for (RaftGroupId raftGroupId : server.getGroupIds()) {
- File currentDir;
-
- try {
- currentDir =
-
server.getDivision(raftGroupId).getRaftStorage().getStorageDir().getCurrentDir();
- } catch (IOException e) {
- logger.warn("{}: get division {} failed: ", this, raftGroupId, e);
- continue;
- }
+ private void registerAndStartDiskGuardian() {
+ final RatisConfig.Impl implConfig = config.getImpl();
+ // by default, we control disk space
+ diskGuardian.registerChecker(
+ summary -> summary.getTotalSize() >
implConfig.getRaftLogSizeMaxThreshold(),
+ TimeDuration.valueOf(implConfig.getCheckAndTakeSnapshotInterval(),
TimeUnit.SECONDS));
- final long currentDirLength =
monitor.updateAndGetDirectorySize(currentDir);
-
- if (currentDirLength >= triggerSnapshotThreshold) {
- final int filesCount = monitor.getFilesUnder(currentDir).size();
- logger.info(
- "{}: take snapshot for region {}, current dir size {}, {} files to
be purged",
- this,
- raftGroupId,
- currentDirLength,
- filesCount);
-
- try {
-
triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
- logger.info("Raft group {} took snapshot successfully", raftGroupId);
- } catch (ConsensusException e) {
- logger.warn("Raft group {} failed to take snapshot due to",
raftGroupId, e);
- }
- }
+ // if we force periodic snapshot
+ final long forceSnapshotInterval = implConfig.getForceSnapshotInterval();
+ if (forceSnapshotInterval > 0) {
+ diskGuardian.registerChecker(
+ summary -> true, // just take it!
+ TimeDuration.valueOf(forceSnapshotInterval, TimeUnit.SECONDS));
}
- }
- private void startSnapshotGuardian() {
- final long delay = config.getImpl().getTriggerSnapshotTime();
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- diskGuardian, this::triggerSnapshotByCustomize, 0, delay,
TimeUnit.SECONDS);
+ diskGuardian.start();
}
private RaftClientRequest buildRawRequest(
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
deleted file mode 100644
index 4f1b2e1cf72..00000000000
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.ratis.utils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Monitoring Ratis RaftLog total size. It will memorize the state of all
files in the last update
- * and calculates the diff incrementally each run.
- */
-public class RatisLogMonitor {
- private static final Logger logger =
LoggerFactory.getLogger(RatisLogMonitor.class);
-
- /* whether the path denotes an open segment under active writing progress */
- private static final Predicate<Path> isOpenSegment =
- p -> p.toFile().getName().startsWith("log_inprogress");
-
- private static final class DirectoryState {
- private long size = 0;
- private Set<Path> memorizedFiles = Collections.emptySet();
-
- private void update(long size, Set<Path> latest) {
- this.size = size;
- this.memorizedFiles = latest;
- }
- }
-
- private final HashMap<File, DirectoryState> directoryMap = new HashMap<>();
-
- public long updateAndGetDirectorySize(File dir) {
- final DirectoryState state = directoryMap.computeIfAbsent(dir, d -> new
DirectoryState());
- Set<Path> latest;
- try (final Stream<Path> files = Files.list(dir.toPath())) {
- latest = files.filter(isOpenSegment).collect(Collectors.toSet());
- } catch (IOException e) {
- logger.warn("{}: Error caught when listing files under {}:", this, dir,
e);
- // keep the files unchanged and return the size calculated last time
- return state.size;
- }
- final long sizeDiff = diff(state.memorizedFiles, latest);
- final long newSize = state.size + sizeDiff;
- state.update(newSize, latest);
- return newSize;
- }
-
- public Set<Path> getFilesUnder(File dir) {
- return Collections.unmodifiableSet(directoryMap.get(dir).memorizedFiles);
- }
-
- private static long diff(Set<Path> old, Set<Path> latest) {
- final long incremental = totalSize(latest.stream().filter(p ->
!old.contains(p)));
- final long decremental = totalSize(old.stream().filter(p ->
!latest.contains(p)));
- return incremental - decremental;
- }
-
- private static long totalSize(Stream<Path> files) {
- return files.mapToLong(p -> p.toFile().length()).sum();
- }
-}
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
new file mode 100644
index 00000000000..e4ca6b503c7
--- /dev/null
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
+
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+public class DiskGuardianTest {
+
+ // log under the test class itself rather than under the class being tested
+ private static final Logger logger = LoggerFactory.getLogger(DiskGuardianTest.class);
+
+ private TestUtils.MiniCluster miniCluster;
+
+ private void buildMiniCluster(boolean periodicSnapshot, boolean
thresholdSnapshot) {
+ final TestUtils.MiniClusterFactory factory = new
TestUtils.MiniClusterFactory();
+
+ final RatisConfig config =
+ RatisConfig.newBuilder()
+ .setSnapshot(
+ RatisConfig.Snapshot.newBuilder()
+ .setCreationGap(1L)
+ .setAutoTriggerEnabled(false)
+ .build())
+ .setImpl(
+ RatisConfig.Impl.newBuilder()
+ .setForceSnapshotInterval(periodicSnapshot ? 5 : -1)
+ .setCheckAndTakeSnapshotInterval(5)
+ .setRaftLogSizeMaxThreshold(thresholdSnapshot ? 1 : 1024 *
1024 * 1024)
+ .build())
+ .build();
+
+ miniCluster = factory.setRatisConfig(config).setGid(new
SchemaRegionId(1)).create();
+ }
+
+ @After
+ public void tearUp() throws Exception {
+ miniCluster.cleanUp();
+ }
+
+ @Test
+ public void testForceSnapshot() throws Exception {
+ testSnapshotImpl(true, false);
+ }
+
+ @Test
+ public void testThresholdSnapshot() throws Exception {
+ testSnapshotImpl(false, true);
+ }
+
+ private void testSnapshotImpl(boolean periodic, boolean threshold) throws
Exception {
+ buildMiniCluster(periodic, threshold);
+ miniCluster.start();
+ final ConsensusGroupId gid = miniCluster.getGid();
+ final List<Peer> members = miniCluster.getPeers();
+ for (RatisConsensus s : miniCluster.getServers()) {
+ s.createLocalPeer(gid, members);
+ }
+
+ miniCluster.writeManySerial(0, 10);
+ Assert.assertFalse(hasSnapshot(gid));
+ JavaUtils.attemptUntilTrue(
+ () -> hasSnapshot(gid),
+ 3,
+ TimeDuration.valueOf(5, TimeUnit.SECONDS),
+ "should take snapshot",
+ logger);
+ Assert.assertTrue(hasSnapshot(gid));
+ }
+
+ private boolean hasSnapshot(ConsensusGroupId gid) {
+ try {
+ return Objects.requireNonNull(
+ miniCluster
+ .getServer(0)
+ .getServer()
+
.getDivision(Utils.fromConsensusGroupIdToRaftGroupId(gid))
+ .getRaftStorage()
+ .getStorageDir()
+ .getStateMachineDir()
+ .listFiles())
+ .length
+ != 0;
+ } catch (IOException ioe) {
+ Assert.fail("caught IOException:" + ioe);
+ return false; // required by the compiler
+ }
+ }
+}
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 5679890a654..d04196523f5 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -76,8 +76,8 @@ public class RatisConsensusTest {
.build())
.setImpl(
RatisConfig.Impl.newBuilder()
- .setTriggerSnapshotFileSize(1)
- .setTriggerSnapshotTime(4)
+ .setRaftLogSizeMaxThreshold(1)
+ .setCheckAndTakeSnapshotInterval(4)
.build())
.build();
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index dffde2cf994..64f00266d71 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -431,7 +431,7 @@ public class TestUtils {
static class MiniClusterFactory {
private final int replicas = 3;
- private final ConsensusGroupId gid = new DataRegionId(1);
+ private ConsensusGroupId gid = new DataRegionId(1);
private final Function<Integer, File> peerStorageProvider =
peerId -> new File("target" + java.io.File.separator + peerId);
@@ -448,6 +448,11 @@ public class TestUtils {
return this;
}
+ MiniClusterFactory setGid(ConsensusGroupId gid) { // fluent setter: replaces the default group id (DataRegionId(1))
+ this.gid = gid;
+ return this; // return the factory so calls can be chained before create()
+ }
+
MiniCluster create() { // builds a MiniCluster from the factory's current settings
return new MiniCluster(gid, replicas, peerStorageProvider, smProvider,
ratisConfig);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6d618752442..0cf68f97426 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1046,6 +1046,9 @@ public class IoTDBConfig {
private long dataRatisLogMax = 20L * 1024 * 1024 * 1024; // 20G
private long schemaRatisLogMax = 2L * 1024 * 1024 * 1024; // 2G
+ private long dataRatisPeriodicSnapshotInterval = 24L * 60 * 60; // 24hr
+ private long schemaRatisPeriodicSnapshotInterval = 24L * 60 * 60; // 24hr
+
/** whether to enable the audit log * */
private boolean enableAuditLog = false;
@@ -3758,4 +3761,20 @@ public class IoTDBConfig {
public String getObjectStorageBucket() { // placeholder accessor: never returns, always throws
throw new UnsupportedOperationException("object storage is not supported
yet");
}
+
+ public long getDataRatisPeriodicSnapshotInterval() { // data-region periodic snapshot interval; field default is 24L * 60 * 60 (24hr)
+ return dataRatisPeriodicSnapshotInterval;
+ }
+
+ public void setDataRatisPeriodicSnapshotInterval(long
dataRatisPeriodicSnapshotInterval) { // plain setter, no validation; presumably seconds, matching the 24hr default
+ this.dataRatisPeriodicSnapshotInterval = dataRatisPeriodicSnapshotInterval;
+ }
+
+ public long getSchemaRatisPeriodicSnapshotInterval() { // schema-region periodic snapshot interval; field default is 24L * 60 * 60 (24hr)
+ return schemaRatisPeriodicSnapshotInterval;
+ }
+
+ public void setSchemaRatisPeriodicSnapshotInterval(long
schemaRatisPeriodicSnapshotInterval) { // plain setter, no validation; presumably seconds, matching the 24hr default
+ this.schemaRatisPeriodicSnapshotInterval =
schemaRatisPeriodicSnapshotInterval;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c7cb82dcef3..cb20779d0b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2176,6 +2176,10 @@ public class IoTDBDescriptor {
conf.setSchemaRatisLogMax(ratisConfig.getSchemaRegionRatisLogMax());
conf.setDataRatisLogMax(ratisConfig.getDataRegionRatisLogMax());
+
+ conf.setSchemaRatisPeriodicSnapshotInterval(
+ ratisConfig.getSchemaRegionPeriodicSnapshotInterval());
+
conf.setDataRatisPeriodicSnapshotInterval(ratisConfig.getDataRegionPeriodicSnapshotInterval());
}
public void loadCQConfig(TCQConfig cqConfig) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index b3b50a2216e..ca8a3e7f524 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -168,7 +168,9 @@ public class DataRegionConsensusImpl {
.build())
.setImpl(
RatisConfig.Impl.newBuilder()
-
.setTriggerSnapshotFileSize(CONF.getDataRatisLogMax())
+
.setRaftLogSizeMaxThreshold(CONF.getDataRatisLogMax())
+ .setForceSnapshotInterval(
+
CONF.getDataRatisPeriodicSnapshotInterval())
.build())
.setLeaderLogAppender(
RatisConfig.LeaderLogAppender.newBuilder()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 3fa0ed62ef1..f0c2679f585 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -129,7 +129,9 @@ public class SchemaRegionConsensusImpl {
.build())
.setImpl(
RatisConfig.Impl.newBuilder()
-
.setTriggerSnapshotFileSize(CONF.getSchemaRatisLogMax())
+
.setRaftLogSizeMaxThreshold(CONF.getSchemaRatisLogMax())
+ .setForceSnapshotInterval(
+
CONF.getSchemaRatisPeriodicSnapshotInterval())
.build())
.setLeaderLogAppender(
RatisConfig.LeaderLogAppender.newBuilder()
diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index a01e9b90da4..1291424562b 100644
--- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1035,6 +1035,11 @@ timestamp_precision=ms
# schema_region_ratis_log_max_size = 2147483648
# data_region_ratis_log_max_size = 21474836480
+# Raft periodic snapshot interval, time unit is seconds
+# config_node_ratis_periodic_snapshot_interval=86400
+# schema_region_ratis_periodic_snapshot_interval=86400
+# data_region_ratis_periodic_snapshot_interval=86400
+
####################
### Procedure Configuration
####################
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index cfdb46967a7..9ec58497e6e 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -88,6 +88,9 @@ struct TRatisConfig {
31: required i32 schemaRegionGrpcLeaderOutstandingAppendsMax
32: required i32 schemaRegionLogForceSyncNum
+
+ 33: required i64 schemaRegionPeriodicSnapshotInterval
+ 34: required i64 dataRegionPeriodicSnapshotInterval
}
struct TCQConfig {