This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new f6973615341 add appendCompressedSingleEntryies
f6973615341 is described below

commit f6973615341f63b3db8b59205c4198839baaf304
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Jun 16 09:39:12 2023 +0800

    add appendCompressedSingleEntryies
---
 .../natraft/client/SyncClientAdaptor.java          |  9 ++
 .../consensus/natraft/protocol/log/Entry.java      | 11 ++-
 .../natraft/protocol/log/EntrySerialization.java   | 97 +++++++++++++++++++++-
 .../protocol/log/dispatch/DispatcherThread.java    | 86 ++++++++++++-------
 .../natraft/service/RaftRPCServiceProcessor.java   | 31 +++++++
 .../iotdb/consensus/natraft/utils/LogUtils.java    | 27 ++++++
 .../thrift-raft/src/main/thrift/raft.thrift        | 13 +++
 7 files changed, 243 insertions(+), 31 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index b1977de7d18..50d3e724ae5 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -10,6 +10,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
+import 
org.apache.iotdb.consensus.raft.thrift.AppendCompressedSingleEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 import org.apache.iotdb.consensus.raft.thrift.ExecuteReq;
@@ -92,6 +93,14 @@ public class SyncClientAdaptor {
     return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
   }
 
+  public static AppendEntryResult appendCompressedSingleEntries(
+      AsyncRaftServiceClient client, AppendCompressedSingleEntriesRequest 
request)
+      throws TException, InterruptedException {
+    GenericHandler<AppendEntryResult> matchTermHandler = new 
GenericHandler<>(client.getEndpoint());
+    client.appendCompressedSingleEntries(request, matchTermHandler);
+    return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
+  }
+
   public static TSStatus forceElection(AsyncRaftServiceClient client, 
ConsensusGroupId groupId)
       throws TException, InterruptedException {
     GenericHandler<TSStatus> matchTermHandler = new 
GenericHandler<>(client.getEndpoint());
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 17114a1c0f6..b81461c8403 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.consensus.natraft.protocol.log;
 
 import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.tsfile.compress.ICompressor;
 
 import java.nio.ByteBuffer;
 import java.util.Comparator;
@@ -56,7 +57,7 @@ public abstract class Entry implements Comparable<Entry> {
   public long applyTime;
   public long waitEndTime;
 
-  public EntrySerialization serialization;
+  public EntrySerialization serialization = new EntrySerialization();
 
   public int getDefaultSerializationBufferSize() {
     return DEFAULT_SERIALIZATION_BUFFER_SIZE;
@@ -80,6 +81,10 @@ public abstract class Entry implements Comparable<Entry> {
     return serialization.serialize(this);
   }
 
+  public synchronized ByteBuffer serialize(ICompressor compressor) {
+    return serialization.serialize(this, compressor);
+  }
+
   public abstract void deserialize(ByteBuffer buffer);
 
   public enum Types {
@@ -214,4 +219,8 @@ public abstract class Entry implements Comparable<Entry> {
   public void setVotingEntry(VotingEntry votingEntry) {
     this.votingEntry = votingEntry;
   }
+
+  public EntrySerialization getSerialization() {
+    return serialization;
+  }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/EntrySerialization.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/EntrySerialization.java
index 3182754ab66..622a81b7867 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/EntrySerialization.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/EntrySerialization.java
@@ -19,13 +19,26 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log;
 
-import java.nio.ByteBuffer;
 import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
 
 public class EntrySerialization {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(EntrySerialization.class);
   private volatile byte[] recycledBuffer;
   private volatile ByteBuffer preSerializationCache;
   private volatile ByteBuffer serializationCache;
+  private volatile ByteBuffer compressionCache;
+  private CompressionType compressionType = CompressionType.UNCOMPRESSED;
+  private int uncompressedSize;
 
   public void preSerialize(Entry entry) {
     if (preSerializationCache != null || serializationCache != null) {
@@ -61,6 +74,77 @@ public class EntrySerialization {
     return serializationCache.slice();
   }
 
+  public ByteBuffer serialize(Entry entry, ICompressor compressor) {
+    ByteBuffer cache = compressionCache;
+    if (cache != null && cache.limit() > 0) {
+      return cache.slice();
+    }
+
+    if (preSerializationCache != null) {
+      ByteBuffer slice = preSerializationCache.slice();
+      slice.position(1);
+      slice.putLong(entry.getCurrLogIndex());
+      slice.putLong(entry.getCurrLogTerm());
+      slice.putLong(entry.getPrevTerm());
+      slice.position(0);
+      serializationCache = slice;
+      preSerializationCache = null;
+    } else {
+      long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
+      ByteBuffer byteBuffer = entry.serializeInternal(recycledBuffer);
+      Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
+      serializationCache = byteBuffer;
+    }
+    compressSerializedCache(compressor);
+    entry.setByteSize(compressionCache.remaining());
+    return compressionCache.slice();
+  }
+
+  public static void main(String[] args) throws IOException {
+    byte[] test = "tetetagsahfdkjhxcvjboi".getBytes();
+    ByteBuffer testBuffer = ByteBuffer.wrap(test);
+    ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
+    int maxBytesForCompression = 
compressor.getMaxBytesForCompression(test.length);
+    ByteBuffer compressed = ByteBuffer.allocate(maxBytesForCompression);
+    int compressLength = compressor.compress(testBuffer, compressed);
+    compressed.position(0);
+    compressed.limit(compressLength);
+
+    IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(CompressionType.LZ4);
+    ByteBuffer uncompressed = ByteBuffer.allocate(test.length);
+    unCompressor.uncompress(compressed, uncompressed);
+  }
+
+  private void compressSerializedCache(ICompressor compressor) {
+    long startTime = 
Statistic.RAFT_SENDER_COMPRESS_LOG.getOperationStartTime();
+    int uncompressedSize = serializationCache.remaining();
+    Statistic.LOG_DISPATCHER_RAW_SIZE.add(uncompressedSize);
+
+    this.uncompressedSize = uncompressedSize;
+    int maxBytesForCompression = 
compressor.getMaxBytesForCompression(uncompressedSize);
+    if (compressionCache == null || compressionCache.remaining() < 
maxBytesForCompression) {
+      compressionCache = ByteBuffer.allocate(maxBytesForCompression);
+    }
+    try {
+      int compressedLength =
+          compressor.compress(
+              serializationCache.array(),
+              serializationCache.arrayOffset() + serializationCache.position(),
+              uncompressedSize,
+              compressionCache.array());
+      Statistic.LOG_DISPATCHER_COMPRESSED_SIZE.add(compressedLength);
+      compressionCache.position(0);
+      compressionCache.limit(compressedLength);
+
+      this.compressionType = compressor.getType();
+    } catch (IOException e) {
+      logger.warn("Cannot compress entry", e);
+      this.compressionType = CompressionType.UNCOMPRESSED;
+    }
+
+    
Statistic.RAFT_SENDER_COMPRESS_LOG.calOperationCostTimeFromStart(startTime);
+  }
+
   public long serializedSize() {
     ByteBuffer cache;
     if ((cache = serializationCache) != null) {
@@ -80,6 +164,9 @@ public class EntrySerialization {
       recycledBuffer = serializationCache.array();
       serializationCache = null;
     }
+    if (compressionCache != null) {
+      compressionCache.limit(0);
+    }
   }
 
   public byte[] getRecycledBuffer() {
@@ -102,7 +189,15 @@ public class EntrySerialization {
     return serializationCache;
   }
 
+  public CompressionType getCompressionType() {
+    return compressionType;
+  }
+
   public void setSerializationCache(ByteBuffer serializationCache) {
     this.serializationCache = serializationCache;
   }
+
+  public int getUncompressedSize() {
+    return uncompressedSize;
+  }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 23e8ee3e723..0d421b72cd4 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.Flow
 import org.apache.iotdb.consensus.natraft.utils.LogUtils;
 import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
+import 
org.apache.iotdb.consensus.raft.thrift.AppendCompressedSingleEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 import org.apache.iotdb.tsfile.compress.ICompressor;
@@ -81,7 +82,6 @@ abstract class DispatcherThread extends DynamicThread {
           logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
         }
 
-        serializeEntries();
         if (!logDispatcher.queueOrdered) {
           currBatch.sort(Comparator.comparingLong(s -> 
s.getEntry().getCurrLogIndex()));
         }
@@ -101,20 +101,31 @@ abstract class DispatcherThread extends DynamicThread {
     }
   }
 
-  protected void serializeEntries() throws InterruptedException {
-    for (VotingEntry request : currBatch) {
-      ByteBuffer serialized = request.getEntry().serialize();
-      request.getEntry().setByteSize(serialized.remaining());
+  private void appendEntriesAsync(AppendEntriesRequest request, 
List<VotingEntry> currBatch) {
+    AsyncMethodCallback<AppendEntryResult> handler = new 
AppendEntriesHandler(currBatch);
+    AsyncRaftServiceClient client = 
logDispatcher.member.getClient(receiver.getEndpoint());
+    try {
+      long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+      AppendEntryResult appendEntryResult = 
SyncClientAdaptor.appendEntries(client, request);
+      Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
+      if (appendEntryResult != null) {
+        handler.onComplete(appendEntryResult);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      handler.onError(e);
     }
   }
 
   private void appendEntriesAsync(
-      List<ByteBuffer> logList, AppendEntriesRequest request, 
List<VotingEntry> currBatch) {
+      AppendCompressedEntriesRequest request, List<VotingEntry> currBatch) {
     AsyncMethodCallback<AppendEntryResult> handler = new 
AppendEntriesHandler(currBatch);
     AsyncRaftServiceClient client = 
logDispatcher.member.getClient(receiver.getEndpoint());
     try {
       long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
-      AppendEntryResult appendEntryResult = 
SyncClientAdaptor.appendEntries(client, request);
+      AppendEntryResult appendEntryResult =
+          SyncClientAdaptor.appendCompressedEntries(client, request);
       Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
       if (appendEntryResult != null) {
         handler.onComplete(appendEntryResult);
@@ -124,25 +135,16 @@ abstract class DispatcherThread extends DynamicThread {
     } catch (Exception e) {
       handler.onError(e);
     }
-    if (logger.isDebugEnabled()) {
-      logger.debug(
-          "{}: append entries {} with {} logs",
-          logDispatcher.member.getName(),
-          receiver,
-          logList.size());
-    }
   }
 
   private void appendEntriesAsync(
-      List<ByteBuffer> logList,
-      AppendCompressedEntriesRequest request,
-      List<VotingEntry> currBatch) {
+      AppendCompressedSingleEntriesRequest request, List<VotingEntry> 
currBatch) {
     AsyncMethodCallback<AppendEntryResult> handler = new 
AppendEntriesHandler(currBatch);
     AsyncRaftServiceClient client = 
logDispatcher.member.getClient(receiver.getEndpoint());
     try {
       long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
       AppendEntryResult appendEntryResult =
-          SyncClientAdaptor.appendCompressedEntries(client, request);
+          SyncClientAdaptor.appendCompressedSingleEntries(client, request);
       Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
       if (appendEntryResult != null) {
         handler.onComplete(appendEntryResult);
@@ -152,13 +154,6 @@ abstract class DispatcherThread extends DynamicThread {
     } catch (Exception e) {
       handler.onError(e);
     }
-    if (logger.isDebugEnabled()) {
-      logger.debug(
-          "{}: append entries {} with {} logs",
-          logDispatcher.member.getName(),
-          receiver,
-          logList.size());
-    }
   }
 
   protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList) {
@@ -189,6 +184,21 @@ abstract class DispatcherThread extends DynamicThread {
     return request;
   }
 
+  protected AppendCompressedSingleEntriesRequest 
prepareCompressedSingleRequest(
+      List<ByteBuffer> logList, List<Byte> compressionTypes, List<Integer> 
uncompressedSizes) {
+    AppendCompressedSingleEntriesRequest request = new 
AppendCompressedSingleEntriesRequest();
+
+    
request.setGroupId(logDispatcher.member.getRaftGroupId().convertToTConsensusGroupId());
+    request.setLeader(logDispatcher.member.getThisNode().getEndpoint());
+    request.setLeaderId(logDispatcher.member.getThisNode().getNodeId());
+    
request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
+    request.setTerm(logDispatcher.member.getStatus().getTerm().get());
+    request.setEntries(logList);
+    request.setCompressionTypes(compressionTypes);
+    request.setUncompressedSizes(uncompressedSizes);
+    return request;
+  }
+
   private void sendLogs(List<VotingEntry> currBatch) {
     if (currBatch.isEmpty()) {
       return;
@@ -203,27 +213,45 @@ abstract class DispatcherThread extends DynamicThread {
       long logSize = 0;
       long logSizeLimit = logDispatcher.getConfig().getThriftMaxFrameSize();
       List<ByteBuffer> logList = new ArrayList<>();
+      List<Byte> compressionTypes = new ArrayList<>();
+      List<Integer> uncompressedSizes = new ArrayList<>();
       int prevIndex = logIndex;
 
       for (; logIndex < currBatch.size(); logIndex++) {
         VotingEntry entry = currBatch.get(logIndex);
-        ByteBuffer serialized = entry.getEntry().serialize();
+        ByteBuffer serialized;
+        if (!logDispatcher.enableCompressedDispatching) {
+          serialized = entry.getEntry().serialize();
+        } else {
+          serialized = entry.getEntry().serialize(compressor);
+        }
+
         long curSize = serialized.remaining();
         if (logSizeLimit - curSize - logSize <= 
IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
           break;
         }
         logSize += curSize;
         logList.add(serialized);
+        if (logDispatcher.enableCompressedDispatching) {
+          compressionTypes.add(
+              
entry.getEntry().getSerialization().getCompressionType().serialize());
+          
uncompressedSizes.add(entry.getEntry().getSerialization().getUncompressedSize());
+        }
         
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
             entry.getEntry().createTime);
       }
 
       if (!logDispatcher.enableCompressedDispatching && !group.isDelayed()) {
         AppendEntriesRequest appendEntriesRequest = prepareRequest(logList);
-        appendEntriesAsync(logList, appendEntriesRequest, 
currBatch.subList(prevIndex, logIndex));
+        appendEntriesAsync(appendEntriesRequest, currBatch.subList(prevIndex, 
logIndex));
       } else {
-        AppendCompressedEntriesRequest appendEntriesRequest = 
prepareCompressedRequest(logList);
-        appendEntriesAsync(logList, appendEntriesRequest, 
currBatch.subList(prevIndex, logIndex));
+        //        AppendCompressedEntriesRequest appendEntriesRequest =
+        // prepareCompressedRequest(logList);
+        //        appendEntriesAsync(appendEntriesRequest, 
currBatch.subList(prevIndex, logIndex));
+
+        AppendCompressedSingleEntriesRequest appendEntriesRequest =
+            prepareCompressedSingleRequest(logList, compressionTypes, 
uncompressedSizes);
+        appendEntriesAsync(appendEntriesRequest, currBatch.subList(prevIndex, 
logIndex));
       }
 
       if (logDispatcher.getConfig().isUseFollowerLoadBalance()) {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index d8b5070a532..3d0daf5b3c6 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.consensus.natraft.utils.IOUtils;
 import org.apache.iotdb.consensus.natraft.utils.LogUtils;
 import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
+import 
org.apache.iotdb.consensus.raft.thrift.AppendCompressedSingleEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -182,6 +183,36 @@ public class RaftRPCServiceProcessor implements 
RaftService.AsyncIface {
     
Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(startTime);
   }
 
+  @Override
+  public void appendCompressedSingleEntries(
+      AppendCompressedSingleEntriesRequest request,
+      AsyncMethodCallback<AppendEntryResult> resultHandler)
+      throws TException {
+    long startTime = 
Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
+    AppendEntriesRequest decompressedRequest = new AppendEntriesRequest();
+    decompressedRequest
+        .setTerm(request.getTerm())
+        .setLeader(request.leader)
+        .setLeaderCommit(request.leaderCommit)
+        .setGroupId(request.groupId)
+        .setLeaderId(request.leaderId);
+
+    try {
+      long compressionStartTime = 
Statistic.RAFT_RECEIVER_DECOMPRESS_ENTRY.getOperationStartTime();
+      List<ByteBuffer> buffers =
+          LogUtils.decompressEntries(
+              request.entries, request.compressionTypes, 
request.uncompressedSizes);
+      decompressedRequest.setEntries(buffers);
+      
Statistic.RAFT_RECEIVER_DECOMPRESS_ENTRY.calOperationCostTimeFromStart(compressionStartTime);
+
+      RaftMember member = getMemberOrCreate(request.groupId, 
decompressedRequest);
+      resultHandler.onComplete(member.appendEntries(decompressedRequest));
+    } catch (UnknownLogTypeException | IOException e) {
+      throw new TException(e);
+    }
+    
Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(startTime);
+  }
+
   @Override
   public void sendSnapshot(SendSnapshotRequest request, 
AsyncMethodCallback<TSStatus> resultHandler)
       throws TException {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index 6e9f3c429a7..21a287f81a2 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
 import org.apache.iotdb.tsfile.compress.ICompressor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.slf4j.Logger;
@@ -148,6 +149,32 @@ public class LogUtils {
     return buffers;
   }
 
+  public static List<ByteBuffer> decompressEntries(
+      List<ByteBuffer> buffers, List<Byte> unCompressorTypes, List<Integer> 
uncompressedSizes)
+      throws IOException {
+    List<ByteBuffer> result = new ArrayList<>();
+    for (int i = 0; i < buffers.size(); i++) {
+      ByteBuffer buffer = buffers.get(i);
+      byte[] uncompressed = new byte[uncompressedSizes.get(i)];
+      IUnCompressor unCompressor =
+          
IUnCompressor.getUnCompressor(CompressionType.deserialize(unCompressorTypes.get(i)));
+      try {
+        unCompressor.uncompress(
+            buffer.array(),
+            buffer.arrayOffset() + buffer.position(),
+            buffer.remaining(),
+            uncompressed,
+            0);
+      } catch (IOException e) {
+        logger.error("Cannot uncompress buffer {}/{}: {}", i, buffers.size(), 
buffer);
+        throw e;
+      }
+      ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompressed);
+      result.add(uncompressedBuffer);
+    }
+    return result;
+  }
+
   public static List<Entry> parseEntries(List<ByteBuffer> buffers, 
IStateMachine stateMachine)
       throws UnknownLogTypeException {
     List<Entry> entries = new ArrayList<>();
diff --git a/iotdb-protocol/thrift-raft/src/main/thrift/raft.thrift 
b/iotdb-protocol/thrift-raft/src/main/thrift/raft.thrift
index c3500cf6b4a..59d4712f6ab 100644
--- a/iotdb-protocol/thrift-raft/src/main/thrift/raft.thrift
+++ b/iotdb-protocol/thrift-raft/src/main/thrift/raft.thrift
@@ -29,6 +29,17 @@ struct AppendEntriesRequest {
   6: required i32 leaderId
 }
 
+struct AppendCompressedSingleEntriesRequest {
+  1: required i64 term // leader's
+  2: required common.TEndPoint leader
+  3: required list<binary> entries // data
+  4: required i64 leaderCommit
+  5: required common.TConsensusGroupId groupId
+  6: required i32 leaderId
+  7: required list<i8> compressionTypes
+  8: required list<i32> uncompressedSizes
+}
+
 struct AppendCompressedEntriesRequest {
   1: required i64 term // leader's
   2: required common.TEndPoint leader
@@ -161,6 +172,8 @@ service RaftService {
 
   AppendEntryResult appendCompressedEntries(1:AppendCompressedEntriesRequest 
request)
 
+  AppendEntryResult 
appendCompressedSingleEntries(1:AppendCompressedSingleEntriesRequest request)
+
   common.TSStatus sendSnapshot(1:SendSnapshotRequest request)
 
   /**

Reply via email to