This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new f6973615341 add appendCompressedSingleEntryies
f6973615341 is described below
commit f6973615341f63b3db8b59205c4198839baaf304
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Jun 16 09:39:12 2023 +0800
add appendCompressedSingleEntryies
---
.../natraft/client/SyncClientAdaptor.java | 9 ++
.../consensus/natraft/protocol/log/Entry.java | 11 ++-
.../natraft/protocol/log/EntrySerialization.java | 97 +++++++++++++++++++++-
.../protocol/log/dispatch/DispatcherThread.java | 86 ++++++++++++-------
.../natraft/service/RaftRPCServiceProcessor.java | 31 +++++++
.../iotdb/consensus/natraft/utils/LogUtils.java | 27 ++++++
.../thrift-raft/src/main/thrift/raft.thrift | 13 +++
7 files changed, 243 insertions(+), 31 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index b1977de7d18..50d3e724ae5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -10,6 +10,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedSingleEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.iotdb.consensus.raft.thrift.ExecuteReq;
@@ -92,6 +93,14 @@ public class SyncClientAdaptor {
return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
}
+ public static AppendEntryResult appendCompressedSingleEntries(
+ AsyncRaftServiceClient client, AppendCompressedSingleEntriesRequest request)
+ throws TException, InterruptedException {
+ GenericHandler<AppendEntryResult> matchTermHandler = new GenericHandler<>(client.getEndpoint());
+ client.appendCompressedSingleEntries(request, matchTermHandler);
+ return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
+ }
+
public static TSStatus forceElection(AsyncRaftServiceClient client, ConsensusGroupId groupId)
throws TException, InterruptedException {
GenericHandler<TSStatus> matchTermHandler = new GenericHandler<>(client.getEndpoint());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 17114a1c0f6..b81461c8403 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log;
import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.tsfile.compress.ICompressor;
import java.nio.ByteBuffer;
import java.util.Comparator;
@@ -56,7 +57,7 @@ public abstract class Entry implements Comparable<Entry> {
public long applyTime;
public long waitEndTime;
- public EntrySerialization serialization;
+ public EntrySerialization serialization = new EntrySerialization();
public int getDefaultSerializationBufferSize() {
return DEFAULT_SERIALIZATION_BUFFER_SIZE;
@@ -80,6 +81,10 @@ public abstract class Entry implements Comparable<Entry> {
return serialization.serialize(this);
}
+ public synchronized ByteBuffer serialize(ICompressor compressor) {
+ return serialization.serialize(this, compressor);
+ }
+
public abstract void deserialize(ByteBuffer buffer);
public enum Types {
@@ -214,4 +219,8 @@ public abstract class Entry implements Comparable<Entry> {
public void setVotingEntry(VotingEntry votingEntry) {
this.votingEntry = votingEntry;
}
+
+ public EntrySerialization getSerialization() {
+ return serialization;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/EntrySerialization.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/EntrySerialization.java
index 3182754ab66..622a81b7867 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/EntrySerialization.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/EntrySerialization.java
@@ -19,13 +19,26 @@
package org.apache.iotdb.consensus.natraft.protocol.log;
-import java.nio.ByteBuffer;
import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
public class EntrySerialization {
+
+ private static final Logger logger = LoggerFactory.getLogger(EntrySerialization.class);
private volatile byte[] recycledBuffer;
private volatile ByteBuffer preSerializationCache;
private volatile ByteBuffer serializationCache;
+ private volatile ByteBuffer compressionCache;
+ private CompressionType compressionType = CompressionType.UNCOMPRESSED;
+ private int uncompressedSize;
public void preSerialize(Entry entry) {
if (preSerializationCache != null || serializationCache != null) {
@@ -61,6 +74,77 @@ public class EntrySerialization {
return serializationCache.slice();
}
+ public ByteBuffer serialize(Entry entry, ICompressor compressor) {
+ ByteBuffer cache = compressionCache;
+ if (cache != null && cache.limit() > 0) {
+ return cache.slice();
+ }
+
+ if (preSerializationCache != null) {
+ ByteBuffer slice = preSerializationCache.slice();
+ slice.position(1);
+ slice.putLong(entry.getCurrLogIndex());
+ slice.putLong(entry.getCurrLogTerm());
+ slice.putLong(entry.getPrevTerm());
+ slice.position(0);
+ serializationCache = slice;
+ preSerializationCache = null;
+ } else {
+ long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
+ ByteBuffer byteBuffer = entry.serializeInternal(recycledBuffer);
+ Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
+ serializationCache = byteBuffer;
+ }
+ compressSerializedCache(compressor);
+ entry.setByteSize(compressionCache.remaining());
+ return compressionCache.slice();
+ }
+
+ public static void main(String[] args) throws IOException {
+ byte[] test = "tetetagsahfdkjhxcvjboi".getBytes();
+ ByteBuffer testBuffer = ByteBuffer.wrap(test);
+ ICompressor compressor = ICompressor.getCompressor(CompressionType.LZ4);
+ int maxBytesForCompression = compressor.getMaxBytesForCompression(test.length);
+ ByteBuffer compressed = ByteBuffer.allocate(maxBytesForCompression);
+ int compressLength = compressor.compress(testBuffer, compressed);
+ compressed.position(0);
+ compressed.limit(compressLength);
+
+ IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.LZ4);
+ ByteBuffer uncompressed = ByteBuffer.allocate(test.length);
+ unCompressor.uncompress(compressed, uncompressed);
+ }
+
+ private void compressSerializedCache(ICompressor compressor) {
+ long startTime = Statistic.RAFT_SENDER_COMPRESS_LOG.getOperationStartTime();
+ int uncompressedSize = serializationCache.remaining();
+ Statistic.LOG_DISPATCHER_RAW_SIZE.add(uncompressedSize);
+
+ this.uncompressedSize = uncompressedSize;
+ int maxBytesForCompression = compressor.getMaxBytesForCompression(uncompressedSize);
+ if (compressionCache == null || compressionCache.remaining() < maxBytesForCompression) {
+ compressionCache = ByteBuffer.allocate(maxBytesForCompression);
+ }
+ try {
+ int compressedLength =
+ compressor.compress(
+ serializationCache.array(),
+ serializationCache.arrayOffset() + serializationCache.position(),
+ uncompressedSize,
+ compressionCache.array());
+ Statistic.LOG_DISPATCHER_COMPRESSED_SIZE.add(compressedLength);
+ compressionCache.position(0);
+ compressionCache.limit(compressedLength);
+
+ this.compressionType = compressor.getType();
+ } catch (IOException e) {
+ logger.warn("Cannot compress entry", e);
+ this.compressionType = CompressionType.UNCOMPRESSED;
+ }
+
+ Statistic.RAFT_SENDER_COMPRESS_LOG.calOperationCostTimeFromStart(startTime);
+ }
+
public long serializedSize() {
ByteBuffer cache;
if ((cache = serializationCache) != null) {
@@ -80,6 +164,9 @@ public class EntrySerialization {
recycledBuffer = serializationCache.array();
serializationCache = null;
}
+ if (compressionCache != null) {
+ compressionCache.limit(0);
+ }
}
public byte[] getRecycledBuffer() {
@@ -102,7 +189,15 @@ public class EntrySerialization {
return serializationCache;
}
+ public CompressionType getCompressionType() {
+ return compressionType;
+ }
+
public void setSerializationCache(ByteBuffer serializationCache) {
this.serializationCache = serializationCache;
}
+
+ public int getUncompressedSize() {
+ return uncompressedSize;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 23e8ee3e723..0d421b72cd4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.Flow
import org.apache.iotdb.consensus.natraft.utils.LogUtils;
import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedSingleEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.iotdb.tsfile.compress.ICompressor;
@@ -81,7 +82,6 @@ abstract class DispatcherThread extends DynamicThread {
logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
}
- serializeEntries();
if (!logDispatcher.queueOrdered) {
currBatch.sort(Comparator.comparingLong(s -> s.getEntry().getCurrLogIndex()));
}
@@ -101,20 +101,31 @@ abstract class DispatcherThread extends DynamicThread {
}
}
- protected void serializeEntries() throws InterruptedException {
- for (VotingEntry request : currBatch) {
- ByteBuffer serialized = request.getEntry().serialize();
- request.getEntry().setByteSize(serialized.remaining());
+ private void appendEntriesAsync(AppendEntriesRequest request, List<VotingEntry> currBatch) {
+ AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
+ AsyncRaftServiceClient client = logDispatcher.member.getClient(receiver.getEndpoint());
+ try {
+ long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+ AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
+ Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
+ if (appendEntryResult != null) {
+ handler.onComplete(appendEntryResult);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ handler.onError(e);
}
}
private void appendEntriesAsync(
- List<ByteBuffer> logList, AppendEntriesRequest request, List<VotingEntry> currBatch) {
+ AppendCompressedEntriesRequest request, List<VotingEntry> currBatch) {
AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
AsyncRaftServiceClient client = logDispatcher.member.getClient(receiver.getEndpoint());
try {
long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
- AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
+ AppendEntryResult appendEntryResult =
+ SyncClientAdaptor.appendCompressedEntries(client, request);
Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
if (appendEntryResult != null) {
handler.onComplete(appendEntryResult);
@@ -124,25 +135,16 @@ abstract class DispatcherThread extends DynamicThread {
} catch (Exception e) {
handler.onError(e);
}
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: append entries {} with {} logs",
- logDispatcher.member.getName(),
- receiver,
- logList.size());
- }
}
private void appendEntriesAsync(
- List<ByteBuffer> logList,
- AppendCompressedEntriesRequest request,
- List<VotingEntry> currBatch) {
+ AppendCompressedSingleEntriesRequest request, List<VotingEntry> currBatch) {
AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
AsyncRaftServiceClient client = logDispatcher.member.getClient(receiver.getEndpoint());
try {
long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
AppendEntryResult appendEntryResult =
- SyncClientAdaptor.appendCompressedEntries(client, request);
+ SyncClientAdaptor.appendCompressedSingleEntries(client, request);
Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
if (appendEntryResult != null) {
handler.onComplete(appendEntryResult);
@@ -152,13 +154,6 @@ abstract class DispatcherThread extends DynamicThread {
} catch (Exception e) {
handler.onError(e);
}
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: append entries {} with {} logs",
- logDispatcher.member.getName(),
- receiver,
- logList.size());
- }
}
protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList) {
@@ -189,6 +184,21 @@ abstract class DispatcherThread extends DynamicThread {
return request;
}
+ protected AppendCompressedSingleEntriesRequest prepareCompressedSingleRequest(
+ List<ByteBuffer> logList, List<Byte> compressionTypes, List<Integer> uncompressedSizes) {
+ AppendCompressedSingleEntriesRequest request = new AppendCompressedSingleEntriesRequest();
+
+ request.setGroupId(logDispatcher.member.getRaftGroupId().convertToTConsensusGroupId());
+ request.setLeader(logDispatcher.member.getThisNode().getEndpoint());
+ request.setLeaderId(logDispatcher.member.getThisNode().getNodeId());
+ request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
+ request.setTerm(logDispatcher.member.getStatus().getTerm().get());
+ request.setEntries(logList);
+ request.setCompressionTypes(compressionTypes);
+ request.setUncompressedSizes(uncompressedSizes);
+ return request;
+ }
+
private void sendLogs(List<VotingEntry> currBatch) {
if (currBatch.isEmpty()) {
return;
@@ -203,27 +213,45 @@ abstract class DispatcherThread extends DynamicThread {
long logSize = 0;
long logSizeLimit = logDispatcher.getConfig().getThriftMaxFrameSize();
List<ByteBuffer> logList = new ArrayList<>();
+ List<Byte> compressionTypes = new ArrayList<>();
+ List<Integer> uncompressedSizes = new ArrayList<>();
int prevIndex = logIndex;
for (; logIndex < currBatch.size(); logIndex++) {
VotingEntry entry = currBatch.get(logIndex);
- ByteBuffer serialized = entry.getEntry().serialize();
+ ByteBuffer serialized;
+ if (!logDispatcher.enableCompressedDispatching) {
+ serialized = entry.getEntry().serialize();
+ } else {
+ serialized = entry.getEntry().serialize(compressor);
+ }
+
long curSize = serialized.remaining();
if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
break;
}
logSize += curSize;
logList.add(serialized);
+ if (logDispatcher.enableCompressedDispatching) {
+ compressionTypes.add(
+ entry.getEntry().getSerialization().getCompressionType().serialize());
+ uncompressedSizes.add(entry.getEntry().getSerialization().getUncompressedSize());
+ }
Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
entry.getEntry().createTime);
}
if (!logDispatcher.enableCompressedDispatching && !group.isDelayed()) {
AppendEntriesRequest appendEntriesRequest = prepareRequest(logList);
- appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
+ appendEntriesAsync(appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
} else {
- AppendCompressedEntriesRequest appendEntriesRequest = prepareCompressedRequest(logList);
- appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
+ // AppendCompressedEntriesRequest appendEntriesRequest =
+ // prepareCompressedRequest(logList);
+ // appendEntriesAsync(appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
+
+ AppendCompressedSingleEntriesRequest appendEntriesRequest =
+ prepareCompressedSingleRequest(logList, compressionTypes, uncompressedSizes);
+ appendEntriesAsync(appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
}
if (logDispatcher.getConfig().isUseFollowerLoadBalance()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index d8b5070a532..3d0daf5b3c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.consensus.natraft.utils.IOUtils;
import org.apache.iotdb.consensus.natraft.utils.LogUtils;
import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedSingleEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -182,6 +183,36 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(startTime);
}
+ @Override
+ public void appendCompressedSingleEntries(
+ AppendCompressedSingleEntriesRequest request,
+ AsyncMethodCallback<AppendEntryResult> resultHandler)
+ throws TException {
+ long startTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
+ AppendEntriesRequest decompressedRequest = new AppendEntriesRequest();
+ decompressedRequest
+ .setTerm(request.getTerm())
+ .setLeader(request.leader)
+ .setLeaderCommit(request.leaderCommit)
+ .setGroupId(request.groupId)
+ .setLeaderId(request.leaderId);
+
+ try {
+ long compressionStartTime = Statistic.RAFT_RECEIVER_DECOMPRESS_ENTRY.getOperationStartTime();
+ List<ByteBuffer> buffers =
+ LogUtils.decompressEntries(
+ request.entries, request.compressionTypes, request.uncompressedSizes);
+ decompressedRequest.setEntries(buffers);
+ Statistic.RAFT_RECEIVER_DECOMPRESS_ENTRY.calOperationCostTimeFromStart(compressionStartTime);
+
+ RaftMember member = getMemberOrCreate(request.groupId, decompressedRequest);
+ resultHandler.onComplete(member.appendEntries(decompressedRequest));
+ } catch (UnknownLogTypeException | IOException e) {
+ throw new TException(e);
+ }
+ Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(startTime);
+ }
+
@Override
public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<TSStatus> resultHandler)
throws TException {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index 6e9f3c429a7..21a287f81a2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.slf4j.Logger;
@@ -148,6 +149,32 @@ public class LogUtils {
return buffers;
}
+ public static List<ByteBuffer> decompressEntries(
+ List<ByteBuffer> buffers, List<Byte> unCompressorTypes, List<Integer> uncompressedSizes)
+ throws IOException {
+ List<ByteBuffer> result = new ArrayList<>();
+ for (int i = 0; i < buffers.size(); i++) {
+ ByteBuffer buffer = buffers.get(i);
+ byte[] uncompressed = new byte[uncompressedSizes.get(i)];
+ IUnCompressor unCompressor =
+ IUnCompressor.getUnCompressor(CompressionType.deserialize(unCompressorTypes.get(i)));
+ try {
+ unCompressor.uncompress(
+ buffer.array(),
+ buffer.arrayOffset() + buffer.position(),
+ buffer.remaining(),
+ uncompressed,
+ 0);
+ } catch (IOException e) {
+ logger.error("Cannot uncompress buffer {}/{}: {}", i, buffers.size(), buffer);
+ throw e;
+ }
+ ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompressed);
+ result.add(uncompressedBuffer);
+ }
+ return result;
+ }
+
public static List<Entry> parseEntries(List<ByteBuffer> buffers, IStateMachine stateMachine)
throws UnknownLogTypeException {
List<Entry> entries = new ArrayList<>();
diff --git a/iotdb-protocol/thrift-raft/src/main/thrift/raft.thrift b/iotdb-protocol/thrift-raft/src/main/thrift/raft.thrift
index c3500cf6b4a..59d4712f6ab 100644
--- a/iotdb-protocol/thrift-raft/src/main/thrift/raft.thrift
+++ b/iotdb-protocol/thrift-raft/src/main/thrift/raft.thrift
@@ -29,6 +29,17 @@ struct AppendEntriesRequest {
6: required i32 leaderId
}
+struct AppendCompressedSingleEntriesRequest {
+ 1: required i64 term // leader's
+ 2: required common.TEndPoint leader
+ 3: required list<binary> entries // data
+ 4: required i64 leaderCommit
+ 5: required common.TConsensusGroupId groupId
+ 6: required i32 leaderId
+ 7: required list<i8> compressionTypes
+ 8: required list<i32> uncompressedSizes
+}
+
struct AppendCompressedEntriesRequest {
1: required i64 term // leader's
2: required common.TEndPoint leader
@@ -161,6 +172,8 @@ service RaftService {
AppendEntryResult appendCompressedEntries(1:AppendCompressedEntriesRequest request)
+ AppendEntryResult appendCompressedSingleEntries(1:AppendCompressedSingleEntriesRequest request)
+
common.TSStatus sendSnapshot(1:SendSnapshotRequest request)
/**