This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f6f6425c7f [IOTDB-6197] Improvements on periodic snapshot guardian 
(#11342)
6f6f6425c7f is described below

commit 6f6f6425c7f044b652f2b53cf834ad1bd84dc9a3
Author: William Song <[email protected]>
AuthorDate: Fri Oct 20 06:00:41 2023 -0500

    [IOTDB-6197] Improvements on periodic snapshot guardian (#11342)
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  31 +++
 .../confignode/conf/ConfigNodeDescriptor.java      |  18 ++
 .../manager/consensus/ConsensusManager.java        |   4 +-
 .../iotdb/confignode/manager/node/NodeManager.java |   5 +
 .../apache/iotdb/consensus/config/RatisConfig.java |  53 +++--
 .../apache/iotdb/consensus/ratis/DiskGuardian.java | 252 +++++++++++++++++++++
 .../iotdb/consensus/ratis/RatisConsensus.java      |  71 ++----
 .../consensus/ratis/utils/RatisLogMonitor.java     |  88 -------
 .../iotdb/consensus/ratis/DiskGuardianTest.java    | 120 ++++++++++
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |   4 +-
 .../apache/iotdb/consensus/ratis/TestUtils.java    |   7 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  19 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +
 .../db/consensus/DataRegionConsensusImpl.java      |   4 +-
 .../db/consensus/SchemaRegionConsensusImpl.java    |   4 +-
 .../resources/conf/iotdb-common.properties         |   5 +
 .../src/main/thrift/confignode.thrift              |   3 +
 17 files changed, 529 insertions(+), 163 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 0143a0314fe..db12e430679 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -293,6 +293,10 @@ public class ConfigNodeConfig {
   private long schemaRegionRatisLogMax = 2L * 1024 * 1024 * 1024; // 2G
   private long dataRegionRatisLogMax = 20L * 1024 * 1024 * 1024; // 20G
 
+  private long configNodeRatisPeriodicSnapshotInterval = 60 * 60 * 24L; // 24h
+  private long schemaRegionRatisPeriodicSnapshotInterval = 60 * 60 * 24L; // 
24h
+  private long dataRegionRatisPeriodicSnapshotInterval = 60 * 60 * 24L; // 24h
+
   /** The getOrCreatePartitionTable interface will log new created Partition 
if set true. */
   private boolean isEnablePrintingNewlyCreatedPartition = false;
 
@@ -1201,4 +1205,31 @@ public class ConfigNodeConfig {
     }
     return configMessage.toString();
   }
+
+  public long getConfigNodeRatisPeriodicSnapshotInterval() {
+    return configNodeRatisPeriodicSnapshotInterval;
+  }
+
+  public void setConfigNodeRatisPeriodicSnapshotInterval(
+      long configNodeRatisPeriodicSnapshotInterval) {
+    this.configNodeRatisPeriodicSnapshotInterval = 
configNodeRatisPeriodicSnapshotInterval;
+  }
+
+  public long getSchemaRegionRatisPeriodicSnapshotInterval() {
+    return schemaRegionRatisPeriodicSnapshotInterval;
+  }
+
+  public void setSchemaRegionRatisPeriodicSnapshotInterval(
+      long schemaRegionRatisPeriodicSnapshotInterval) {
+    this.schemaRegionRatisPeriodicSnapshotInterval = 
schemaRegionRatisPeriodicSnapshotInterval;
+  }
+
+  public long getDataRegionRatisPeriodicSnapshotInterval() {
+    return dataRegionRatisPeriodicSnapshotInterval;
+  }
+
+  public void setDataRegionRatisPeriodicSnapshotInterval(
+      long dataRegionRatisPeriodicSnapshotInterval) {
+    this.dataRegionRatisPeriodicSnapshotInterval = 
dataRegionRatisPeriodicSnapshotInterval;
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index 3fc8a75da34..5faadebc128 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -802,6 +802,24 @@ public class ConfigNodeDescriptor {
                     String.valueOf(conf.getDataRegionRatisLogMax()))
                 .trim()));
 
+    // Trim the value actually read from the properties (not only the default), so that
+    // stray whitespace in the configuration file cannot make Long.parseLong throw.
+    conf.setConfigNodeRatisPeriodicSnapshotInterval(
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "config_node_ratis_periodic_snapshot_interval",
+                    String.valueOf(conf.getConfigNodeRatisPeriodicSnapshotInterval()))
+                .trim()));
+
+    conf.setSchemaRegionRatisPeriodicSnapshotInterval(
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "schema_region_ratis_periodic_snapshot_interval",
+                    String.valueOf(conf.getSchemaRegionRatisPeriodicSnapshotInterval()))
+                .trim()));
+
+    conf.setDataRegionRatisPeriodicSnapshotInterval(
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "data_region_ratis_periodic_snapshot_interval",
+                    String.valueOf(conf.getDataRegionRatisPeriodicSnapshotInterval()))
+                .trim()));
+
     conf.setEnablePrintingNewlyCreatedPartition(
         Boolean.parseBoolean(
             properties
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index b0efb9324f4..0a260f2f91c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -184,7 +184,9 @@ public class ConsensusManager {
                                       .build())
                               .setImpl(
                                   RatisConfig.Impl.newBuilder()
-                                      
.setTriggerSnapshotFileSize(CONF.getConfigNodeRatisLogMax())
+                                      
.setRaftLogSizeMaxThreshold(CONF.getConfigNodeRatisLogMax())
+                                      .setForceSnapshotInterval(
+                                          
CONF.getConfigNodeRatisPeriodicSnapshotInterval())
                                       .build())
                               .setRead(
                                   RatisConfig.Read.newBuilder()
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 14bb88a1edd..41ef5ca9ce5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -210,6 +210,11 @@ public class NodeManager {
     ratisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMax());
     ratisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMax());
 
+    ratisConfig.setSchemaRegionPeriodicSnapshotInterval(
+        conf.getSchemaRegionRatisPeriodicSnapshotInterval());
+    ratisConfig.setDataRegionPeriodicSnapshotInterval(
+        conf.getDataRegionRatisPeriodicSnapshotInterval());
+
     dataSet.setRatisConfig(ratisConfig);
   }
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index f55cbb75d22..e8bde5d2252 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -939,18 +939,22 @@ public class RatisConfig {
     private final int retryTimesMax;
     private final long retryWaitMillis;
 
-    private final long triggerSnapshotTime;
-    private final long triggerSnapshotFileSize;
+    private final long checkAndTakeSnapshotInterval;
+    private final long raftLogSizeMaxThreshold;
+
+    private final long forceSnapshotInterval;
 
     public Impl(
         int retryTimesMax,
         long retryWaitMillis,
-        long triggerSnapshotTime,
-        long triggerSnapshotFileSize) {
+        long checkAndTakeSnapshotInterval,
+        long raftLogSizeMaxThreshold,
+        long forceSnapshotInterval) {
       this.retryTimesMax = retryTimesMax;
       this.retryWaitMillis = retryWaitMillis;
-      this.triggerSnapshotTime = triggerSnapshotTime;
-      this.triggerSnapshotFileSize = triggerSnapshotFileSize;
+      this.checkAndTakeSnapshotInterval = checkAndTakeSnapshotInterval;
+      this.raftLogSizeMaxThreshold = raftLogSizeMaxThreshold;
+      this.forceSnapshotInterval = forceSnapshotInterval;
     }
 
     public int getRetryTimesMax() {
@@ -961,12 +965,16 @@ public class RatisConfig {
       return retryWaitMillis;
     }
 
-    public long getTriggerSnapshotTime() {
-      return triggerSnapshotTime;
+    public long getCheckAndTakeSnapshotInterval() {
+      return checkAndTakeSnapshotInterval;
+    }
+
+    public long getRaftLogSizeMaxThreshold() {
+      return raftLogSizeMaxThreshold;
     }
 
-    public long getTriggerSnapshotFileSize() {
-      return triggerSnapshotFileSize;
+    public long getForceSnapshotInterval() {
+      return forceSnapshotInterval;
     }
 
     public static Impl.Builder newBuilder() {
@@ -979,13 +987,19 @@ public class RatisConfig {
       private long retryWaitMillis = 500;
 
       // 120s
-      private long triggerSnapshotTime = 120;
+      private long checkAndTakeSnapshotInterval = 120;
       // 20GB
-      private long triggerSnapshotFileSize = 20L << 30;
+      private long raftLogSizeMaxThreshold = 20L << 30;
+      // -1L means no force, measured in seconds
+      private long forceSnapshotInterval = -1;
 
       public Impl build() {
         return new Impl(
-            retryTimesMax, retryWaitMillis, triggerSnapshotTime, 
triggerSnapshotFileSize);
+            retryTimesMax,
+            retryWaitMillis,
+            checkAndTakeSnapshotInterval,
+            raftLogSizeMaxThreshold,
+            forceSnapshotInterval);
       }
 
       public Impl.Builder setRetryTimesMax(int retryTimesMax) {
@@ -998,13 +1012,18 @@ public class RatisConfig {
         return this;
       }
 
-      public Impl.Builder setTriggerSnapshotTime(long triggerSnapshotTime) {
-        this.triggerSnapshotTime = triggerSnapshotTime;
+      public Impl.Builder setCheckAndTakeSnapshotInterval(long 
checkAndTakeSnapshotInterval) {
+        this.checkAndTakeSnapshotInterval = checkAndTakeSnapshotInterval;
+        return this;
+      }
+
+      public Impl.Builder setRaftLogSizeMaxThreshold(long 
raftLogSizeMaxThreshold) {
+        this.raftLogSizeMaxThreshold = raftLogSizeMaxThreshold;
         return this;
       }
 
-      public Impl.Builder setTriggerSnapshotFileSize(long 
triggerSnapshotFileSize) {
-        this.triggerSnapshotFileSize = triggerSnapshotFileSize;
+      public Impl.Builder setForceSnapshotInterval(long forceSnapshotInterval) 
{
+        this.forceSnapshotInterval = forceSnapshotInterval;
         return this;
       }
     }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
new file mode 100644
index 00000000000..4834d2222b4
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/DiskGuardian.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
+
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * DiskGuardian manages to take snapshots periodically for each Raft Group in 
order to control disk
+ * storage consumption.
+ */
+class DiskGuardian {
+  private static final Logger logger = 
LoggerFactory.getLogger(DiskGuardian.class);
+
+  /**
+   * A summary of the Raft Log files on disk. It contains the following summary statistics:
+   *
+   * <ul>
+   *   <li>1. Disk space consumed by this group's Raft Logs.
+   *   <li>2. How many raft log segments exist.
+   * </ul>
+   */
+  static final class RaftLogSummary {
+
+    /** whether the path denotes an open segment under active writing progress 
*/
+    private static final Predicate<Path> isOpenSegment =
+        p -> p.toFile().getName().startsWith("log_inprogress");
+
+    private final RaftGroupId gid;
+    private final File raftLogStorageRoot;
+    private long totalSize;
+    private Set<Path> logFiles;
+
+    private RaftLogSummary(RaftGroupId gid, File raftLogStorageRoot) {
+      this.gid = gid;
+      this.raftLogStorageRoot = raftLogStorageRoot;
+      this.totalSize = 0;
+      this.logFiles = Collections.emptySet();
+    }
+
+    public long getTotalSize() {
+      return totalSize;
+    }
+
+    /**
+     * Re-lists the storage root and adjusts {@code totalSize} by the size of files that
+     * appeared or disappeared since the previous call. If listing fails with an {@link
+     * IOException}, the previous state is kept and a warning is logged.
+     */
+    void updateNow() {
+      try (final Stream<Path> files = Files.list(raftLogStorageRoot.toPath())) {
+        // NOTE(review): only paths matching isOpenSegment are kept here -- confirm that
+        // leaving closed segments out of the summary is intended.
+        final Set<Path> latest = files.filter(isOpenSegment).collect(Collectors.toSet());
+        this.totalSize += diff(logFiles, latest);
+        this.logFiles = latest;
+      } catch (IOException e) {
+        // keep the files unchanged; the trailing throwable is logged as the exception, so
+        // the third placeholder needs its own argument (the directory being listed)
+        logger.warn(
+            "{}: Error caught when listing files for {} at {}:",
+            this,
+            gid,
+            raftLogStorageRoot,
+            e);
+      }
+    }
+
+    private static long diff(Set<Path> old, Set<Path> latest) {
+      final long incremental = totalSize(latest.stream().filter(p -> 
!old.contains(p)));
+      final long decremental = totalSize(old.stream().filter(p -> 
!latest.contains(p)));
+      return incremental - decremental;
+    }
+
+    private static long totalSize(Stream<Path> files) {
+      return files.mapToLong(p -> p.toFile().length()).sum();
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "[Raft Log Summary]: group=%s, total size=%d, files=%s", gid, 
totalSize, logFiles);
+    }
+  }
+
+  /** use a {@link MemoizedSupplier} here to avoid this reference leak */
+  private final MemoizedSupplier<RatisConsensus> serverRef;
+
+  /** whether a specific RaftGroup need to take snapshot during next check 
period */
+  private final Map<RaftGroupId, AtomicBoolean> snapshotFlag = new 
ConcurrentHashMap<>();
+
+  /** Raft Log disk usage summary statistics */
+  private final Map<RaftGroupId, RaftLogSummary> bookkeeper = new 
ConcurrentHashMap<>();
+
+  /** Registered checkers */
+  private final Map<TimeDuration, List<Predicate<RaftLogSummary>>> 
snapshotArbitrators =
+      new HashMap<>();
+
+  private final ScheduledExecutorService workerThread =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
+  private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  /** Delay between two runs of the snapshot daemon, in seconds (see {@link #start()}). */
+  private final long daemonIntervalSeconds;
+
+  /**
+   * @param server supplier of the owning {@link RatisConsensus}; wrapped in a {@link
+   *     MemoizedSupplier} and only resolved when a daemon first runs
+   * @param config source of the check-and-take-snapshot interval
+   */
+  DiskGuardian(Supplier<RatisConsensus> server, RatisConfig config) {
+    this.serverRef = MemoizedSupplier.valueOf(server);
+    this.daemonIntervalSeconds = config.getImpl().getCheckAndTakeSnapshotInterval();
+  }
+
+  /**
+   * Schedules the snapshot daemon and one checker daemon per registered interval on the
+   * worker thread. Checkers must be registered before this method is called.
+   */
+  void start() {
+    // first schedule the snapshot daemon
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        workerThread, this::snapshotDaemon, 0L, daemonIntervalSeconds, TimeUnit.SECONDS);
+
+    // then schedule all checker daemons
+    snapshotArbitrators.forEach(
+        (interval, checkers) ->
+            ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+                workerThread,
+                () -> checkerDaemon(checkers),
+                0L,
+                interval.toLong(TimeUnit.SECONDS),
+                TimeUnit.SECONDS));
+  }
+
+  /**
+   * Shuts down the worker thread and waits up to 5 seconds for it to terminate. Only the
+   * first call has an effect; later calls are no-ops.
+   *
+   * @throws InterruptedException if interrupted while waiting for termination
+   */
+  void stop() throws InterruptedException {
+    // flip the flag from "running" (false) to "stopped" (true) exactly once; the daemons
+    // read the same flag to bail out early
+    if (isStopped.compareAndSet(false, true)) {
+      workerThread.shutdown();
+      workerThread.awaitTermination(5, TimeUnit.SECONDS);
+    }
+  }
+
+  /** call this method to register checkers before {@link #start()} */
+  void registerChecker(Predicate<RaftLogSummary> checker, TimeDuration 
interval) {
+    final List<Predicate<RaftLogSummary>> checkers =
+        snapshotArbitrators.computeIfAbsent(interval, i -> new 
CopyOnWriteArrayList<>());
+    checkers.add(checker);
+  }
+
+  /** periodically woken up. Will take a snapshot if a flag is set. */
+  private void snapshotDaemon() {
+    if (isStopped.get()) {
+      return;
+    }
+    for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
+      if (getSnapshotFlag(groupId).get()) {
+        try {
+          
serverRef.get().triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(groupId));
+          final boolean flagCleared = 
snapshotFlag.get(groupId).compareAndSet(true, false);
+          if (!flagCleared) {
+            logger.warn(
+                "{}: clear snapshot flag failed for group {}, please check the 
related implementation",
+                this,
+                groupId);
+          }
+        } catch (ConsensusException e) {
+          logger.warn(
+              "{} take snapshot failed for group {} due to {}. Disk file 
status {}",
+              this,
+              groupId,
+              e,
+              getLatestSummary(groupId).orElse(null));
+        }
+      }
+    }
+  }
+
+  /** periodically woken up. Will check the snapshot condition and set the 
snapshot flag. */
+  private void checkerDaemon(List<Predicate<RaftLogSummary>> checkerList) {
+    if (isStopped.get()) {
+      return;
+    }
+    for (RaftGroupId groupId : serverRef.get().getServer().getGroupIds()) {
+      final Optional<RaftLogSummary> summary = getLatestSummary(groupId);
+      if (summary.isPresent()) {
+        final Optional<Boolean> anyCheckerPositive =
+            checkerList.stream()
+                .map(checker -> checker.test(summary.get()))
+                .filter(Boolean::booleanValue)
+                .findAny();
+        if (anyCheckerPositive.isPresent()) {
+          getSnapshotFlag(groupId).set(true);
+        }
+      }
+    }
+  }
+
+  private AtomicBoolean getSnapshotFlag(RaftGroupId groupId) {
+    return snapshotFlag.computeIfAbsent(groupId, id -> new 
AtomicBoolean(false));
+  }
+
+  private Optional<RaftLogSummary> getLatestSummary(RaftGroupId groupId) {
+    // initialize the RaftLog Summary for the first time
+    final RaftLogSummary summary =
+        bookkeeper.computeIfAbsent(
+            groupId,
+            gid -> {
+              final File root;
+              try {
+                root =
+                    serverRef
+                        .get()
+                        .getServer()
+                        .getDivision(groupId)
+                        .getRaftStorage()
+                        .getStorageDir()
+                        .getCurrentDir();
+                return new RaftLogSummary(gid, root);
+              } catch (IOException e) {
+                logger.warn("{}: group not exists for {} and caught exception 
", this, groupId, e);
+                return null;
+              }
+            });
+
+    if (summary != null) {
+      summary.updateNow();
+    }
+    return Optional.ofNullable(summary);
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 0bc95247d69..a4bbf19cb8a 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -28,9 +28,6 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.IClientPoolFactory;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.property.ClientPoolProperty;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.utils.StatusUtils;
@@ -51,7 +48,6 @@ import 
org.apache.iotdb.consensus.exception.RatisRequestFailedException;
 import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
-import org.apache.iotdb.consensus.ratis.utils.RatisLogMonitor;
 import org.apache.iotdb.consensus.ratis.utils.Utils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -82,6 +78,7 @@ import 
org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,7 +91,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -115,6 +111,8 @@ class RatisConsensus implements IConsensus {
 
   private final IClientManager<RaftGroup, RatisClient> clientManager;
 
+  private final DiskGuardian diskGuardian;
+
   private final Map<RaftGroupId, RaftGroup> lastSeen = new 
ConcurrentHashMap<>();
 
   private final ClientId localFakeId = ClientId.randomId();
@@ -125,13 +123,8 @@ class RatisConsensus implements IConsensus {
 
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
 
-  private final ScheduledExecutorService diskGuardian;
-  private final long triggerSnapshotThreshold;
-
   private final RatisConfig config;
 
-  private final RatisLogMonitor monitor = new RatisLogMonitor();
-
   private final RatisMetricSet ratisMetricSet;
   private final TConsensusGroupType consensusGroupType;
 
@@ -153,10 +146,7 @@ class RatisConsensus implements IConsensus {
     this.consensusGroupType = config.getConsensusGroupType();
     this.ratisMetricSet = new RatisMetricSet();
 
-    this.triggerSnapshotThreshold = 
this.config.getImpl().getTriggerSnapshotFileSize();
-    diskGuardian =
-        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-            ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
+    this.diskGuardian = new DiskGuardian(() -> this, this.config);
 
     clientManager =
         new IClientManager.Factory<RaftGroup, RatisClient>()
@@ -181,14 +171,13 @@ class RatisConsensus implements IConsensus {
   public synchronized void start() throws IOException {
     MetricService.getInstance().addMetricSet(this.ratisMetricSet);
     server.start();
-    startSnapshotGuardian();
+    registerAndStartDiskGuardian();
   }
 
   @Override
   public synchronized void stop() throws IOException {
-    diskGuardian.shutdown();
     try {
-      diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
+      diskGuardian.stop();
     } catch (InterruptedException e) {
       logger.warn("{}: interrupted when shutting down add Executor with 
exception {}", this, e);
       Thread.currentThread().interrupt();
@@ -676,44 +665,22 @@ class RatisConsensus implements IConsensus {
     }
   }
 
-  private void triggerSnapshotByCustomize() {
-
-    for (RaftGroupId raftGroupId : server.getGroupIds()) {
-      File currentDir;
-
-      try {
-        currentDir =
-            
server.getDivision(raftGroupId).getRaftStorage().getStorageDir().getCurrentDir();
-      } catch (IOException e) {
-        logger.warn("{}: get division {} failed: ", this, raftGroupId, e);
-        continue;
-      }
+  private void registerAndStartDiskGuardian() {
+    final RatisConfig.Impl implConfig = config.getImpl();
+    // by default, we control disk space
+    diskGuardian.registerChecker(
+        summary -> summary.getTotalSize() > 
implConfig.getRaftLogSizeMaxThreshold(),
+        TimeDuration.valueOf(implConfig.getCheckAndTakeSnapshotInterval(), 
TimeUnit.SECONDS));
 
-      final long currentDirLength = 
monitor.updateAndGetDirectorySize(currentDir);
-
-      if (currentDirLength >= triggerSnapshotThreshold) {
-        final int filesCount = monitor.getFilesUnder(currentDir).size();
-        logger.info(
-            "{}: take snapshot for region {}, current dir size {}, {} files to 
be purged",
-            this,
-            raftGroupId,
-            currentDirLength,
-            filesCount);
-
-        try {
-          
triggerSnapshot(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId));
-          logger.info("Raft group {} took snapshot successfully", raftGroupId);
-        } catch (ConsensusException e) {
-          logger.warn("Raft group {} failed to take snapshot due to", 
raftGroupId, e);
-        }
-      }
+    // if we force periodic snapshot
+    final long forceSnapshotInterval = implConfig.getForceSnapshotInterval();
+    if (forceSnapshotInterval > 0) {
+      diskGuardian.registerChecker(
+          summary -> true, // just take it!
+          TimeDuration.valueOf(forceSnapshotInterval, TimeUnit.SECONDS));
     }
-  }
 
-  private void startSnapshotGuardian() {
-    final long delay = config.getImpl().getTriggerSnapshotTime();
-    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
-        diskGuardian, this::triggerSnapshotByCustomize, 0, delay, 
TimeUnit.SECONDS);
+    diskGuardian.start();
   }
 
   private RaftClientRequest buildRawRequest(
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
deleted file mode 100644
index 4f1b2e1cf72..00000000000
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RatisLogMonitor.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.ratis.utils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Monitoring Ratis RaftLog total size. It will memorize the state of all 
files in the last update
- * and calculates the diff incrementally each run.
- */
-public class RatisLogMonitor {
-  private static final Logger logger = 
LoggerFactory.getLogger(RatisLogMonitor.class);
-
-  /* whether the path denotes an open segment under active writing progress */
-  private static final Predicate<Path> isOpenSegment =
-      p -> p.toFile().getName().startsWith("log_inprogress");
-
-  private static final class DirectoryState {
-    private long size = 0;
-    private Set<Path> memorizedFiles = Collections.emptySet();
-
-    private void update(long size, Set<Path> latest) {
-      this.size = size;
-      this.memorizedFiles = latest;
-    }
-  }
-
-  private final HashMap<File, DirectoryState> directoryMap = new HashMap<>();
-
-  public long updateAndGetDirectorySize(File dir) {
-    final DirectoryState state = directoryMap.computeIfAbsent(dir, d -> new 
DirectoryState());
-    Set<Path> latest;
-    try (final Stream<Path> files = Files.list(dir.toPath())) {
-      latest = files.filter(isOpenSegment).collect(Collectors.toSet());
-    } catch (IOException e) {
-      logger.warn("{}: Error caught when listing files under {}:", this, dir, 
e);
-      // keep the files unchanged and return the size calculated last time
-      return state.size;
-    }
-    final long sizeDiff = diff(state.memorizedFiles, latest);
-    final long newSize = state.size + sizeDiff;
-    state.update(newSize, latest);
-    return newSize;
-  }
-
-  public Set<Path> getFilesUnder(File dir) {
-    return Collections.unmodifiableSet(directoryMap.get(dir).memorizedFiles);
-  }
-
-  private static long diff(Set<Path> old, Set<Path> latest) {
-    final long incremental = totalSize(latest.stream().filter(p -> 
!old.contains(p)));
-    final long decremental = totalSize(old.stream().filter(p -> 
!latest.contains(p)));
-    return incremental - decremental;
-  }
-
-  private static long totalSize(Stream<Path> files) {
-    return files.mapToLong(p -> p.toFile().length()).sum();
-  }
-}
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
new file mode 100644
index 00000000000..e4ca6b503c7
--- /dev/null
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/DiskGuardianTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.iotdb.consensus.ratis.utils.Utils;
+
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Checks that files appear in the Ratis state machine directory of server 0 (taken here as
+ * evidence that a snapshot was written) when either the force-snapshot interval or the raft log
+ * size threshold is configured to trigger.
+ */
+public class DiskGuardianTest {
+
+  // Bound to the test class itself so log output is attributed to the test, not to DiskGuardian.
+  private static final Logger logger = LoggerFactory.getLogger(DiskGuardianTest.class);
+
+  private TestUtils.MiniCluster miniCluster;
+
+  /**
+   * Creates (but does not start) the mini cluster for SchemaRegionId(1).
+   *
+   * <p>Ratis auto-triggered snapshots are switched off so that only the two triggers under test
+   * can produce a snapshot.
+   *
+   * @param periodicSnapshot when true the force-snapshot interval is set to 5, otherwise to -1
+   *     (presumably "disabled" - confirm against DiskGuardian)
+   * @param thresholdSnapshot when true the raft log size threshold is set to 1, otherwise to
+   *     1024 * 1024 * 1024 so that the small writes of this test cannot reach it
+   */
+  private void buildMiniCluster(boolean periodicSnapshot, boolean thresholdSnapshot) {
+    final TestUtils.MiniClusterFactory factory = new TestUtils.MiniClusterFactory();
+
+    final RatisConfig config =
+        RatisConfig.newBuilder()
+            .setSnapshot(
+                RatisConfig.Snapshot.newBuilder()
+                    .setCreationGap(1L)
+                    .setAutoTriggerEnabled(false)
+                    .build())
+            .setImpl(
+                RatisConfig.Impl.newBuilder()
+                    .setForceSnapshotInterval(periodicSnapshot ? 5 : -1)
+                    .setCheckAndTakeSnapshotInterval(5)
+                    .setRaftLogSizeMaxThreshold(thresholdSnapshot ? 1 : 1024 * 1024 * 1024)
+                    .build())
+            .build();
+
+    miniCluster = factory.setRatisConfig(config).setGid(new SchemaRegionId(1)).create();
+  }
+
+  /** Releases the mini cluster after each test, if one was actually built. */
+  @After
+  public void tearUp() throws Exception {
+    // If buildMiniCluster failed the field is still null; skipping the cleanup keeps an NPE in
+    // this hook from being reported on top of the original failure.
+    if (miniCluster != null) {
+      miniCluster.cleanUp();
+      miniCluster = null;
+    }
+  }
+
+  /** Only the force-snapshot interval is enabled. */
+  @Test
+  public void testForceSnapshot() throws Exception {
+    testSnapshotImpl(true, false);
+  }
+
+  /** Only the raft log size threshold is enabled. */
+  @Test
+  public void testThresholdSnapshot() throws Exception {
+    testSnapshotImpl(false, true);
+  }
+
+  /**
+   * Shared scenario: start the cluster, create the local peer on every server, issue writes via
+   * writeManySerial(0, 10), assert that no snapshot exists yet, then poll (3 attempts, 5 seconds
+   * apart) until one shows up.
+   */
+  private void testSnapshotImpl(boolean periodic, boolean threshold) throws Exception {
+    buildMiniCluster(periodic, threshold);
+    miniCluster.start();
+    final ConsensusGroupId gid = miniCluster.getGid();
+    final List<Peer> members = miniCluster.getPeers();
+    for (RatisConsensus s : miniCluster.getServers()) {
+      s.createLocalPeer(gid, members);
+    }
+
+    miniCluster.writeManySerial(0, 10);
+    Assert.assertFalse(hasSnapshot(gid));
+    JavaUtils.attemptUntilTrue(
+        () -> hasSnapshot(gid),
+        3,
+        TimeDuration.valueOf(5, TimeUnit.SECONDS),
+        "should take snapshot",
+        logger);
+    Assert.assertTrue(hasSnapshot(gid));
+  }
+
+  /**
+   * Returns true when the state machine directory of server 0 for the given group contains at
+   * least one file.
+   *
+   * @throws NullPointerException if the directory cannot be listed (listFiles() returned null)
+   */
+  private boolean hasSnapshot(ConsensusGroupId gid) {
+    try {
+      return Objects.requireNonNull(
+                  miniCluster
+                      .getServer(0)
+                      .getServer()
+                      .getDivision(Utils.fromConsensusGroupIdToRaftGroupId(gid))
+                      .getRaftStorage()
+                      .getStorageDir()
+                      .getStateMachineDir()
+                      .listFiles(),
+                  "state machine directory of server 0 cannot be listed")
+              .length
+          != 0;
+    } catch (IOException ioe) {
+      Assert.fail("caught IOException:" + ioe);
+      return false; // required by the compiler
+    }
+  }
+}
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 5679890a654..d04196523f5 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -76,8 +76,8 @@ public class RatisConsensusTest {
                   .build())
           .setImpl(
               RatisConfig.Impl.newBuilder()
-                  .setTriggerSnapshotFileSize(1)
-                  .setTriggerSnapshotTime(4)
+                  .setRaftLogSizeMaxThreshold(1)
+                  .setCheckAndTakeSnapshotInterval(4)
                   .build())
           .build();
 
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index dffde2cf994..64f00266d71 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -431,7 +431,7 @@ public class TestUtils {
 
   static class MiniClusterFactory {
     private final int replicas = 3;
-    private final ConsensusGroupId gid = new DataRegionId(1);
+    private ConsensusGroupId gid = new DataRegionId(1);
     private final Function<Integer, File> peerStorageProvider =
         peerId -> new File("target" + java.io.File.separator + peerId);
 
@@ -448,6 +448,11 @@ public class TestUtils {
       return this;
     }
 
+    // Replaces the consensus group id that create() hands to the MiniCluster constructor;
+    // returns this factory so calls can be chained.
+    MiniClusterFactory setGid(ConsensusGroupId gid) {
+      this.gid = gid;
+      return this;
+    }
+
     MiniCluster create() {
       return new MiniCluster(gid, replicas, peerStorageProvider, smProvider, 
ratisConfig);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6d618752442..0cf68f97426 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1046,6 +1046,9 @@ public class IoTDBConfig {
   private long dataRatisLogMax = 20L * 1024 * 1024 * 1024; // 20G
   private long schemaRatisLogMax = 2L * 1024 * 1024 * 1024; // 2G
 
+  private long dataRatisPeriodicSnapshotInterval = 24L * 60 * 60; // 24hr
+  private long schemaRatisPeriodicSnapshotInterval = 24L * 60 * 60; // 24hr
+
   /** whether to enable the audit log * */
   private boolean enableAuditLog = false;
 
@@ -3758,4 +3761,20 @@ public class IoTDBConfig {
   public String getObjectStorageBucket() {
     throw new UnsupportedOperationException("object storage is not supported 
yet");
   }
+
+  /** Returns the periodic snapshot interval for data region Ratis groups, in seconds. */
+  public long getDataRatisPeriodicSnapshotInterval() {
+    return dataRatisPeriodicSnapshotInterval;
+  }
+
+  /** Sets the periodic snapshot interval for data region Ratis groups, in seconds. */
+  public void setDataRatisPeriodicSnapshotInterval(long 
dataRatisPeriodicSnapshotInterval) {
+    this.dataRatisPeriodicSnapshotInterval = dataRatisPeriodicSnapshotInterval;
+  }
+
+  /** Returns the periodic snapshot interval for schema region Ratis groups, in seconds. */
+  public long getSchemaRatisPeriodicSnapshotInterval() {
+    return schemaRatisPeriodicSnapshotInterval;
+  }
+
+  /** Sets the periodic snapshot interval for schema region Ratis groups, in seconds. */
+  public void setSchemaRatisPeriodicSnapshotInterval(long 
schemaRatisPeriodicSnapshotInterval) {
+    this.schemaRatisPeriodicSnapshotInterval = 
schemaRatisPeriodicSnapshotInterval;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c7cb82dcef3..cb20779d0b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2176,6 +2176,10 @@ public class IoTDBDescriptor {
 
     conf.setSchemaRatisLogMax(ratisConfig.getSchemaRegionRatisLogMax());
     conf.setDataRatisLogMax(ratisConfig.getDataRegionRatisLogMax());
+
+    conf.setSchemaRatisPeriodicSnapshotInterval(
+        ratisConfig.getSchemaRegionPeriodicSnapshotInterval());
+    
conf.setDataRatisPeriodicSnapshotInterval(ratisConfig.getDataRegionPeriodicSnapshotInterval());
   }
 
   public void loadCQConfig(TCQConfig cqConfig) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index b3b50a2216e..ca8a3e7f524 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -168,7 +168,9 @@ public class DataRegionConsensusImpl {
                                     .build())
                             .setImpl(
                                 RatisConfig.Impl.newBuilder()
-                                    
.setTriggerSnapshotFileSize(CONF.getDataRatisLogMax())
+                                    
.setRaftLogSizeMaxThreshold(CONF.getDataRatisLogMax())
+                                    .setForceSnapshotInterval(
+                                        
CONF.getDataRatisPeriodicSnapshotInterval())
                                     .build())
                             .setLeaderLogAppender(
                                 RatisConfig.LeaderLogAppender.newBuilder()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
index 3fa0ed62ef1..f0c2679f585 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/SchemaRegionConsensusImpl.java
@@ -129,7 +129,9 @@ public class SchemaRegionConsensusImpl {
                                     .build())
                             .setImpl(
                                 RatisConfig.Impl.newBuilder()
-                                    
.setTriggerSnapshotFileSize(CONF.getSchemaRatisLogMax())
+                                    
.setRaftLogSizeMaxThreshold(CONF.getSchemaRatisLogMax())
+                                    .setForceSnapshotInterval(
+                                        
CONF.getSchemaRatisPeriodicSnapshotInterval())
                                     .build())
                             .setLeaderLogAppender(
                                 RatisConfig.LeaderLogAppender.newBuilder()
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index a01e9b90da4..1291424562b 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1035,6 +1035,11 @@ timestamp_precision=ms
 # schema_region_ratis_log_max_size = 2147483648
 # data_region_ratis_log_max_size = 21474836480
 
+# Interval between periodic Raft snapshots, in seconds
+# config_node_ratis_periodic_snapshot_interval=86400
+# schema_region_ratis_periodic_snapshot_interval=86400
+# data_region_ratis_periodic_snapshot_interval=86400
+
 ####################
 ### Procedure Configuration
 ####################
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index cfdb46967a7..9ec58497e6e 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -88,6 +88,9 @@ struct TRatisConfig {
 
   31: required i32 schemaRegionGrpcLeaderOutstandingAppendsMax
   32: required i32 schemaRegionLogForceSyncNum
+
+  33: required i64 schemaRegionPeriodicSnapshotInterval
+  34: required i64 dataRegionPeriodicSnapshotInterval
 }
 
 struct TCQConfig {


Reply via email to