This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 7214e62c5be reset window after snapshot installation; clear serialization cache in followers; fix match term
7214e62c5be is described below

commit 7214e62c5bebef67c4c369c45859be9ed09bc438
Author: Tian Jiang <[email protected]>
AuthorDate: Mon May 22 11:24:05 2023 +0800

    reset window after snapshot installation
    clear serialization cache in followers
    fix match term
---
 .../org/apache/iotdb/consensus/natraft/protocol/RaftMember.java  | 9 ++++++++-
 .../natraft/protocol/log/appender/SlidingWindowLogAppender.java  | 8 +++++---
 .../natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java   | 5 ++++-
 .../consensus/natraft/protocol/log/manager/RaftLogManager.java   | 4 ++++
 .../log/manager/serialization/SyncLogDequeSerializer.java        | 9 ++++++++-
 .../consensus/natraft/protocol/log/recycle/EntryAllocator.java   | 1 -
 .../iotdb/db/mpp/execution/executor/RegionWriteExecutor.java     | 4 ++--
 7 files changed, 31 insertions(+), 9 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 9be05b5dcce..2e6cc318930 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -237,7 +237,7 @@ public class RaftMember {
         new EntryAllocator<>(config, RequestEntry::new, this::getSafeIndex);
     this.logManager =
         new DirectorySnapshotRaftLogManager(
-            new SyncLogDequeSerializer(groupId, config),
+            new SyncLogDequeSerializer(groupId, config, this),
             new AsyncLogApplier(new BaseApplier(stateMachine, this), name, 
config),
             name,
             stateMachine,
@@ -1085,6 +1085,13 @@ public class RaftMember {
     } finally {
       snapshotApplyLock.unlock();
     }
+    logManager.getLock().writeLock().lock();
+    try {
+      logAppender.reset();
+    } finally {
+      logManager.getLock().writeLock().unlock();
+    }
+
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 07486cb4edb..edd4dbe63cb 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -165,9 +165,11 @@ public class SlidingWindowLogAppender implements 
LogAppender {
 
   private void moveWindowLeftward(int step) {
     int length = Math.max(windowCapacity - step, 0);
-    System.arraycopy(logWindow, 0, logWindow, step, length);
-    for (int i = 0; i < length; i++) {
-      logWindow[i] = null;
+    if (length > 0) {
+      System.arraycopy(logWindow, 0, logWindow, step, length);
+      for (int i = 0; i < length; i++) {
+        logWindow[i] = null;
+      }
     }
     firstPosPrevIndex = logManager.getLastLogIndex();
   }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
index e6892886c81..9a049a168f0 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
@@ -82,7 +82,10 @@ public class LogCatchUpInBatchHandler implements 
AsyncMethodCallback<AppendEntry
         appendSucceed.notifyAll();
       }
       logger.warn(
-          "{}: Catch-up with {} logs aborted because leadership is lost", 
logs.size(), memberName);
+          "{}: Catch-up with {} logs aborted because leadership is lost, resp: 
{}",
+          logs.size(),
+          memberName,
+          resp);
     }
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 337cc04e9d1..2b4eb410e8f 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -702,6 +702,10 @@ public abstract class RaftLogManager {
     } catch (Exception e) {
       return false;
     }
+    if (term == -1) {
+      // the leader is probing for catch-up and the entry must be committed
+      return index <= commitIndex;
+    }
     return t == term;
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index a3a2ea890c2..56b742c825d 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
 import org.apache.iotdb.consensus.natraft.protocol.HardState;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
 import 
org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.VersionController.SimpleFileVersionController;
@@ -161,6 +162,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
   private RaftConfig config;
   private ICompressor compressor = 
ICompressor.getCompressor(CompressionType.SNAPPY);
   private IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
+  private RaftMember member;
 
   private void initCommonProperties() {
     logDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
@@ -210,12 +212,13 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
    *
    * <p>build serializer with node id
    */
-  public SyncLogDequeSerializer(ConsensusGroupId groupId, RaftConfig config) {
+  public SyncLogDequeSerializer(ConsensusGroupId groupId, RaftConfig config, 
RaftMember member) {
     this.config = config;
     name = groupId.toString();
     logDir = getLogDir(groupId);
     initCommonProperties();
     initMetaAndLogFiles();
+    this.member = member;
   }
 
   public String getLogDir(ConsensusGroupId groupId) {
@@ -320,6 +323,10 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
       flushLogBuffer(true);
       logDataBuffer.put(logData);
     }
+    // followers only use the cache in persistence, and it is of no use beyond 
this point
+    if (!member.isLeader()) {
+      entry.setSerializationCache(null);
+    }
     lastLogIndex = entry.getCurrLogIndex();
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
index b98cdbde9ae..d08e5e9a5da 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
@@ -125,7 +125,6 @@ public class EntryAllocator<T extends Entry> {
         + ","
         + "recycleRatio="
         + recycleHitRatio()
-        + ","
         + "}";
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index 302cd7e3fc2..06735caa5d4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -235,8 +235,8 @@ public class RegionWriteExecutor {
         if (writeResponse.getStatus() != null) {
           response.setAccepted(
               TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode()
-                      || TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
-                          == writeResponse.getStatus().getCode());
+                  || TSStatusCode.WEAKLY_ACCEPTED.getStatusCode()
+                      == writeResponse.getStatus().getCode());
           if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
writeResponse.getStatus().getCode()) {
             response.setMessage(writeResponse.getStatus().message);
             response.setStatus(writeResponse.getStatus());

Reply via email to