This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_new by this push:
     new 16d3ca4  [IOTDB-723] add raft log delete mechanism
     new 30cbbd3  Merge pull request #1339 from neuyilan/cluster_new_IOTDB-723
16d3ca4 is described below

commit 16d3ca4e0ddf0aa29468f140cce780454bbaf400
Author: HouliangQi <[email protected]>
AuthorDate: Tue Jun 9 19:16:47 2020 +0800

    [IOTDB-723] add raft log delete mechanism
---
 .../resources/conf/iotdb-cluster.properties        |  6 ++
 .../apache/iotdb/cluster/config/ClusterConfig.java | 26 ++++++++
 .../iotdb/cluster/config/ClusterDescriptor.java    |  7 +++
 .../cluster/log/manage/CommittedEntryManager.java  |  9 +++
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 60 +++++++++++++++++-
 .../serializable/SyncLogDequeSerializer.java       | 71 ++++++++++++++++++----
 .../cluster/log/manage/RaftLogManagerTest.java     | 48 ++++++++++++++-
 7 files changed, 212 insertions(+), 15 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties 
b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index ef7c26b..97fd987 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -61,3 +61,9 @@ MAX_REMOVED_LOG_SIZE=134217728
 # whether to use batch append entries in log catch up
 USE_BATCH_IN_CATCH_UP=true
 
+# max number of committed logs to be saved
+MAX_NUMBER_OF_LOGS=100
+
+# interval, in seconds, between checks for committed logs that can be deleted
+LOG_DELETION_CHECK_INTERVAL_SECOND=3600
+
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 346c372..c0242f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -53,6 +53,16 @@ public class ClusterConfig {
 
   private boolean useBatchInLogCatchUp = true;
 
+  /**
+   * max number of committed logs to be saved
+   */
+  private int maxNumberOfLogs = 100;
+
+  /**
+   * interval, in seconds, between checks for committed logs that can be deleted
+   */
+  private int logDeleteCheckIntervalSecond = 3600;
+
   public boolean isUseBatchInLogCatchUp() {
     return useBatchInLogCatchUp;
   }
@@ -148,4 +158,20 @@ public class ClusterConfig {
   public void setQueryTimeoutInSec(int queryTimeoutInSec) {
     this.queryTimeoutInSec = queryTimeoutInSec;
   }
+
+  public int getMaxNumberOfLogs() {
+    return maxNumberOfLogs;
+  }
+
+  public void setMaxNumberOfLogs(int maxNumberOfLogs) {
+    this.maxNumberOfLogs = maxNumberOfLogs;
+  }
+
+  public int getLogDeleteCheckIntervalSecond() {
+    return logDeleteCheckIntervalSecond;
+  }
+
+  public void setLogDeleteCheckIntervalSecond(int 
logDeleteCheckIntervalSecond) {
+    this.logDeleteCheckIntervalSecond = logDeleteCheckIntervalSecond;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 2ddab3d..6491d8d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -216,6 +216,13 @@ public class ClusterDescriptor {
     config.setUseBatchInLogCatchUp(Boolean.parseBoolean(properties.getProperty(
         "USE_BATCH_IN_CATCH_UP", 
String.valueOf(config.isUseBatchInLogCatchUp()))));
 
+    config.setMaxNumberOfLogs(Integer.parseInt(
+        properties.getProperty("MAX_NUMBER_OF_LOGS", 
String.valueOf(config.getMaxNumberOfLogs()))));
+
+    config.setLogDeleteCheckIntervalSecond(Integer.parseInt(properties
+        .getProperty("LOG_DELETION_CHECK_INTERVAL_SECOND",
+            String.valueOf(config.getLogDeleteCheckIntervalSecond()))));
+
     String seedUrls = properties.getProperty("SEED_NODES");
     if (seedUrls != null) {
       List<String> urlList = getSeedUrlList(seedUrls);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index ea1166d..bab681d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -94,6 +94,15 @@ public class CommittedEntryManager {
   }
 
   /**
+   * Return the number of entries, computed as lastIndex - firstIndex + 1
+   *
+   * @return the number of entries
+   */
+  public long getTotalSize() {
+    return getLastIndex() - getFirstIndex() + 1;
+  }
+
+  /**
    * Return the entry's term for given index. Note that the called should 
ensure index <=
    * entries[entries.size()-1].index.
    *
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 95fe0ad..f4c766a 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -22,6 +22,11 @@ package org.apache.iotdb.cluster.log.manage;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.EntryCompactedException;
 import org.apache.iotdb.cluster.exception.EntryUnavailableException;
 import org.apache.iotdb.cluster.exception.GetEntriesWrongParametersException;
@@ -50,6 +55,22 @@ public class RaftLogManager {
   private long commitIndex;
   private LogApplier logApplier;
 
+
+  private ScheduledExecutorService executorService;
+
+
+  /**
+   * max number of committed logs to be saved
+   */
+  private int maxNumberOfLogs = 
ClusterDescriptor.getInstance().getConfig().getMaxNumberOfLogs();
+
+  /**
+   * interval, in seconds, between checks for committed logs that can be deleted
+   */
+  private int logDeleteCheckIntervalSecond = 
ClusterDescriptor.getInstance().getConfig()
+      .getLogDeleteCheckIntervalSecond();
+
+
   public RaftLogManager(StableEntryManager stableEntryManager, LogApplier 
applier) {
     this.logApplier = applier;
     this.setCommittedEntryManager(new CommittedEntryManager());
@@ -63,6 +84,14 @@ public class RaftLogManager {
     this.setUnCommittedEntryManager(new UnCommittedEntryManager(last + 1));
     // must have applied entry [compactIndex,last] to state machine
     this.commitIndex = last;
+
+    executorService = new ScheduledThreadPoolExecutor(1,
+        new 
BasicThreadFactory.Builder().namingPattern("raft-log-delete-%d").daemon(true)
+            .build());
+    executorService
+        .scheduleAtFixedRate(this::checkDeleteLog, 
logDeleteCheckIntervalSecond,
+            logDeleteCheckIntervalSecond,
+            TimeUnit.SECONDS);
   }
 
   public Snapshot getSnapshot() {
@@ -364,7 +393,8 @@ public class RaftLogManager {
    *
    * @param newCommitIndex request commitIndex
    */
-  public void commitTo(long newCommitIndex, boolean ignoreExecutionExceptions) 
throws LogExecutionException {
+  public void commitTo(long newCommitIndex, boolean ignoreExecutionExceptions)
+      throws LogExecutionException {
     if (commitIndex < newCommitIndex) {
       long lo = getUnCommittedEntryManager().getFirstUnCommittedIndex();
       long hi = newCommitIndex + 1;
@@ -411,11 +441,12 @@ public class RaftLogManager {
   /**
    * Used by commitTo to apply newly committed entries
    *
-   * @param entries applying entries
+   * @param entries                  applying entries
    * @param ignoreExecutionException when set to true, exceptions during 
applying the logs are
    *                                 ignored, otherwise they are reported to 
the upper level
    */
-  protected void applyEntries(List<Log> entries, boolean 
ignoreExecutionException) throws LogExecutionException {
+  protected void applyEntries(List<Log> entries, boolean 
ignoreExecutionException)
+      throws LogExecutionException {
     for (Log entry : entries) {
       try {
         logApplier.apply(entry);
@@ -488,6 +519,11 @@ public class RaftLogManager {
     this.commitIndex = last;
   }
 
+  @TestOnly
+  public void setMaxNumberOfLogs(int maxNumberOfLogs) {
+    this.maxNumberOfLogs = maxNumberOfLogs;
+  }
+
   public void close() {
     getStableEntryManager().close();
   }
@@ -517,4 +553,22 @@ public class RaftLogManager {
   public void setStableEntryManager(StableEntryManager stableEntryManager) {
     this.stableEntryManager = stableEntryManager;
   }
+
+
+  /**
+   * Check whether the committed logs exceed maxNumberOfLogs and compact the oldest ones if so
+   */
+  public void checkDeleteLog() {
+    if (committedEntryManager.getTotalSize() <= maxNumberOfLogs) {
+      return;
+    }
+    long removeSize = committedEntryManager.getTotalSize() - maxNumberOfLogs;
+    long compactIndex = committedEntryManager.getDummyIndex() + removeSize;
+    try {
+      getCommittedEntryManager().compactEntries(compactIndex);
+      getStableEntryManager().removeCompactedEntries(compactIndex);
+    } catch (EntryUnavailableException e) {
+      logger.error("regular compact log entries failed, error={}", 
e.getMessage());
+    }
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index 7bf6e66..a6bd7c5 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -36,6 +36,8 @@ import java.util.Date;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.HardState;
@@ -80,6 +82,12 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
   private VersionController versionController;
 
   /**
+   * the lock used when changing the logSizeDeque
+   */
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+
+  /**
    * for log tools
    *
    * @param logPath log dir path
@@ -126,10 +134,12 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     this.maxRemovedLogSize = maxRemovedLogSize;
   }
 
+  @Override
   public List<Log> getAllEntries() {
     return recoverLog();
   }
 
+  @Override
   public void append(List<Log> entries) {
     Log entry = entries.get(entries.size() - 1);
     meta.setCommitLogIndex(entry.getCurrLogIndex());
@@ -139,17 +149,32 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     append(entries, meta);
   }
 
+  @Override
   public void setHardStateAndFlush(HardState state) {
     this.state = state;
     serializeMeta(meta);
   }
 
+  @Override
   public HardState getHardState() {
     return state;
   }
 
+  @Override
   public void removeCompactedEntries(long index) {
-    // TODO-Cluster: decide a log discarding policy
+    long distance = meta.getCommitLogIndex() - index;
+    if (distance <= 0) {
+      logger
+          .info("compact ({}) is out of bound lastIndex ({})", index, 
meta.getCommitLogIndex());
+      return;
+    }
+    if (distance > logSizeDeque.size()) {
+      logger.info(
+          "entries before request index ({}) have been compacted", index);
+    }
+
+    int numToRemove = logSizeDeque.size() - (int) distance;
+    removeFirst(numToRemove);
   }
 
   public Deque<Integer> getLogSizeDeque() {
@@ -293,20 +318,32 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
       logger.error("Error in appending log {} ", log, e);
     }
 
-    logSizeDeque.addLast(totalSize);
+    lock.writeLock().lock();
+    try {
+      logSizeDeque.addLast(totalSize);
+    } finally {
+      lock.writeLock().unlock();
+    }
+
     serializeMeta(meta);
+
   }
 
   public void append(List<Log> logs, LogManagerMeta meta) {
     int bufferSize = 0;
     List<ByteBuffer> bufferList = new ArrayList<>(logs.size());
-    for (Log log : logs) {
-      ByteBuffer data = log.serialize();
-      int size = data.capacity() + Integer.BYTES;
-      logSizeDeque.addLast(size);
-      bufferSize += size;
+    lock.writeLock().lock();
+    try {
+      for (Log log : logs) {
+        ByteBuffer data = log.serialize();
+        int size = data.capacity() + Integer.BYTES;
+        logSizeDeque.addLast(size);
+        bufferSize += size;
 
-      bufferList.add(data);
+        bufferList.add(data);
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
 
     ByteBuffer finalBuffer = ByteBuffer.allocate(bufferSize);
@@ -343,9 +380,15 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     }
 
     int size = 0;
-    for (int i = 0; i < count; i++) {
-      size += logSizeDeque.removeLast();
+    lock.writeLock().lock();
+    try {
+      for (int i = 0; i < count; i++) {
+        size += logSizeDeque.removeLast();
+      }
+    } finally {
+      lock.writeLock().unlock();
     }
+
     // truncate file
     while (size > 0) {
       File currentLogFile = getCurrentLogFile();
@@ -454,7 +497,12 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
       logger.error("Unknown log detected ", e);
     }
 
-    logSizeDeque.addLast(totalSize);
+    lock.writeLock().lock();
+    try {
+      logSizeDeque.addLast(totalSize);
+    } finally {
+      lock.writeLock().unlock();
+    }
 
     return log;
   }
@@ -527,6 +575,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     logger.debug("Serialized log meta into {}", tempMetaFile.getPath());
   }
 
+  @Override
   public void close() {
     try {
       if (currentLogOutputStream != null) {
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
index bdea25e..a779b1e 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.exception.EntryCompactedException;
 import org.apache.iotdb.cluster.exception.EntryUnavailableException;
 import org.apache.iotdb.cluster.exception.GetEntriesWrongParametersException;
+import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
@@ -619,7 +620,8 @@ public class RaftLogManagerTest {
       try {
         List<Log> entries = instance.getEntries(1, Integer.MAX_VALUE);
         assertEquals(test.testEntries, entries);
-        assertEquals(test.testOffset, 
instance.getUnCommittedEntryManager().getFirstUnCommittedIndex());
+        assertEquals(test.testOffset,
+            instance.getUnCommittedEntryManager().getFirstUnCommittedIndex());
       } catch (Exception e) {
         fail("An unexpected exception was thrown.");
       } finally {
@@ -1011,4 +1013,48 @@ public class RaftLogManagerTest {
       instance.close();
     }
   }
+
+  @Test
+  public void testCheckDeleteLog() {
+    SyncLogDequeSerializer syncLogDequeSerializer = new 
SyncLogDequeSerializer(testIdentifier);
+    syncLogDequeSerializer.setMaxRemovedLogSize(10);
+
+    CommittedEntryManager committedEntryManager = new CommittedEntryManager();
+    RaftLogManager raftLogManager = new RaftLogManager(committedEntryManager,
+        syncLogDequeSerializer, logApplier);
+
+    int maxNumberOfLogs = 100;
+    List<Log> testLogs1;
+
+    raftLogManager.setMaxNumberOfLogs(maxNumberOfLogs);
+    testLogs1 = TestUtils.prepareNodeLogs(130);
+    raftLogManager.append(testLogs1);
+    try {
+      raftLogManager.commitTo(testLogs1.get(testLogs1.size() - 
1).getCurrLogIndex(), false);
+    } catch (LogExecutionException e) {
+      assertEquals("why failed?", e.toString());
+    }
+
+    assertEquals(130, committedEntryManager.getTotalSize());
+    assertEquals(130, syncLogDequeSerializer.getLogSizeDeque().size());
+
+    raftLogManager.checkDeleteLog();
+
+    assertEquals(maxNumberOfLogs, committedEntryManager.getTotalSize());
+    assertEquals(maxNumberOfLogs, 
syncLogDequeSerializer.getLogSizeDeque().size());
+
+    raftLogManager.close();
+
+    // recovery
+    syncLogDequeSerializer = new SyncLogDequeSerializer(testIdentifier);
+    try {
+      List<Log> logs = syncLogDequeSerializer.recoverLog();
+      assertEquals(maxNumberOfLogs, logs.size());
+      for (int i = 0; i < maxNumberOfLogs; i++) {
+        assertEquals(testLogs1.get(i + 30), logs.get(i));
+      }
+    } finally {
+      syncLogDequeSerializer.close();
+    }
+  }
 }

Reply via email to