This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5eccdcd8b1 IGNITE-19828 Optimize RAFT log encoder/decoder data format
(#2249)
5eccdcd8b1 is described below
commit 5eccdcd8b13ca374767d109b89800c922427dba2
Author: Ivan Bessonov <[email protected]>
AuthorDate: Tue Jun 27 16:15:31 2023 +0300
IGNITE-19828 Optimize RAFT log encoder/decoder data format (#2249)
---
.../stream/DirectByteBufferStreamImplV1.java | 4 +-
.../internal/raft/server/impl/JraftServerImpl.java | 2 +-
.../internal/raft/util/OptimizedMarshaller.java | 4 +-
.../raft/util/ThreadLocalOptimizedMarshaller.java | 3 +-
.../entity/codec/v1/LogEntryV1CodecFactory.java | 22 +-
.../raft/jraft/entity/codec/v1/V1Decoder.java | 196 ++++++++++-------
.../raft/jraft/entity/codec/v1/V1Encoder.java | 239 +++++++++++----------
.../ignite/raft/jraft/storage/io/MessageFile.java | 3 +-
.../snapshot/local/LocalSnapshotMetaTable.java | 2 +-
.../ignite/raft/jraft/util/AsciiStringUtil.java | 16 +-
.../ignite/raft/jraft/util/JDKMarshaller.java | 5 +-
.../apache/ignite/raft/jraft/util/Marshaller.java | 4 +-
.../jraft/entity/codec/LogEntryCodecPerfTest.java | 3 +-
13 files changed, 286 insertions(+), 217 deletions(-)
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index 0bf37f5ea6..06a6264413 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -81,7 +81,7 @@ public class DirectByteBufferStreamImplV1 implements
DirectByteBufferStream {
protected ByteBuffer buf;
- protected byte[] heapArr;
+ protected byte @Nullable [] heapArr;
protected long baseOff;
@@ -188,7 +188,7 @@ public class DirectByteBufferStreamImplV1 implements
DirectByteBufferStream {
this.buf = buf;
heapArr = buf.isDirect() ? null : buf.array();
- baseOff = buf.isDirect() ? GridUnsafe.bufferAddress(buf) :
BYTE_ARR_OFF;
+ baseOff = buf.isDirect() ? GridUnsafe.bufferAddress(buf) :
BYTE_ARR_OFF + buf.arrayOffset();
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 9a8fe0c727..ce3285c841 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -600,7 +600,7 @@ public class JraftServerImpl implements RaftServer {
@Nullable CommandClosure<WriteCommand> done =
(CommandClosure<WriteCommand>) iter.done();
ByteBuffer data = iter.getData();
- WriteCommand command = done == null ?
marshaller.unmarshall(data.array()) : done.command();
+ WriteCommand command = done == null ?
marshaller.unmarshall(data) : done.command();
long commandIndex = iter.getIndex();
long commandTerm = iter.getTerm();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
index b704cef947..7c108ea30a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
@@ -107,8 +107,8 @@ public class OptimizedMarshaller implements Marshaller {
@SuppressWarnings("unchecked")
@Override
- public <T> T unmarshall(byte[] bytes) {
- stream.setBuffer(ByteBuffer.wrap(bytes).order(ORDER));
+ public <T> T unmarshall(ByteBuffer bytes) {
+ stream.setBuffer(bytes.duplicate().order(ORDER));
return stream.readMessage(messageReader);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
index 9c2464f0d3..a25664c53d 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft.util;
+import java.nio.ByteBuffer;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.jraft.util.Marshaller;
@@ -42,7 +43,7 @@ public class ThreadLocalOptimizedMarshaller implements
Marshaller {
}
@Override
- public <T> T unmarshall(byte[] bytes) {
+ public <T> T unmarshall(ByteBuffer bytes) {
return marshaller.get().unmarshall(bytes);
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/LogEntryV1CodecFactory.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/LogEntryV1CodecFactory.java
index e850091f5e..8a8d45fda9 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/LogEntryV1CodecFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/LogEntryV1CodecFactory.java
@@ -16,18 +16,38 @@
*/
package org.apache.ignite.raft.jraft.entity.codec.v1;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
/**
- * Log entry codec implementation.
+ * Log entry codec implementation. Data format description:
+ * <ul>
+ * <li>Magic header, 1 byte. {@link #MAGIC}</li>
+ * <li>Log entry type, formally a var-long, effectively 1 byte. {@link
EntryType}</li>
+ * <li>Log index, var-long, from 1 to 9 bytes.</li>
+ * <li>Log term, var-long, from 1 to 9 bytes.</li>
+ * <li>Checksum, 8 bytes, Big Endian.</li>
+ * <li>If type is not {@link EntryType#ENTRY_TYPE_DATA}:<ul>
+ * <li>Number of peers, var-long. Following it is the array of
peers:</li>
+ * <li>Length of the peer name, 2 bytes, Big Endian.</li>
+ * <li>ASCII characters of the peer name, as many bytes as the length
just read.</li>
+ * <li>... same block repeats for "oldPeers", "learners" and
"oldLearners".</li>
+ * </ul></li>
+ * <li>If type is not {@link EntryType#ENTRY_TYPE_CONFIGURATION}:<ul>
+ * <li>The rest of the {@code byte[]} is the data payload.</li>
+ * </ul></li>
+ * </ul>
*/
public class LogEntryV1CodecFactory implements LogEntryCodecFactory {
//"Beeep boop beep beep boop beeeeeep" -BB8
public static final byte MAGIC = (byte) 0xB8;
+ // Size of the magic header.
+ public static final int PAYLOAD_OFFSET = 1;
+
private LogEntryV1CodecFactory() {
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Decoder.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Decoder.java
index 952095419e..242592fcff 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Decoder.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Decoder.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.entity.codec.v1;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
@@ -51,98 +52,127 @@ public final class V1Decoder implements LogEntryDecoder {
return log;
}
+ // Refactored to look closer to Ignite's code style.
public void decode(final LogEntry log, final byte[] content) {
- // 1-5 type
- final int iType = Bits.getInt(content, 1);
- log.setType(EnumOutter.EntryType.forNumber(iType));
- // 5-13 index
- // 13-21 term
- final long index = Bits.getLong(content, 5);
- final long term = Bits.getLong(content, 13);
+ var reader = new Reader(content);
+ reader.pos = LogEntryV1CodecFactory.PAYLOAD_OFFSET;
+
+ int typeNumber = (int)reader.readLong();
+ EnumOutter.EntryType type =
Objects.requireNonNull(EnumOutter.EntryType.forNumber(typeNumber));
+ log.setType(type);
+
+ long index = reader.readLong();
+ long term = reader.readLong();
log.setId(new LogId(index, term));
- // 21-29 checksum
- log.setChecksum(Bits.getLong(content, 21));
- // 29-33 peer count
- int peerCount = Bits.getInt(content, 29);
- // peers
- int pos = 33;
- if (peerCount > 0) {
- List<PeerId> peers = new ArrayList<>(peerCount);
- while (peerCount-- > 0) {
- final short len = Bits.getShort(content, pos);
- final byte[] bs = new byte[len];
- System.arraycopy(content, pos + 2, bs, 0, len);
- // peer len (short in 2 bytes)
- // peer str
- pos += 2 + len;
- final PeerId peer = new PeerId();
- peer.parse(AsciiStringUtil.unsafeDecode(bs));
- peers.add(peer);
+
+ long checksum = Bits.getLong(content, reader.pos);
+ log.setChecksum(checksum);
+
+ int pos = reader.pos + Long.BYTES;
+
+ // Peers and learners.
+ if (type != EnumOutter.EntryType.ENTRY_TYPE_DATA) {
+ reader.pos = pos;
+ int peerCount = (int)reader.readLong();
+ pos = reader.pos;
+ if (peerCount > 0) {
+ List<PeerId> peers = new ArrayList<>(peerCount);
+
+ pos = readNodesList(pos, content, peerCount, peers);
+
+ log.setPeers(peers);
}
- log.setPeers(peers);
- }
- // old peers
- int oldPeerCount = Bits.getInt(content, pos);
- pos += 4;
- if (oldPeerCount > 0) {
- List<PeerId> oldPeers = new ArrayList<>(oldPeerCount);
- while (oldPeerCount-- > 0) {
- final short len = Bits.getShort(content, pos);
- final byte[] bs = new byte[len];
- System.arraycopy(content, pos + 2, bs, 0, len);
- // peer len (short in 2 bytes)
- // peer str
- pos += 2 + len;
- final PeerId peer = new PeerId();
- peer.parse(AsciiStringUtil.unsafeDecode(bs));
- oldPeers.add(peer);
+
+ reader.pos = pos;
+ int oldPeerCount = (int)reader.readLong();
+ pos = reader.pos;
+ if (oldPeerCount > 0) {
+ List<PeerId> oldPeers = new ArrayList<>(oldPeerCount);
+
+ pos = readNodesList(pos, content, oldPeerCount, oldPeers);
+
+ log.setOldPeers(oldPeers);
}
- log.setOldPeers(oldPeers);
- }
- // learners
- int learnersCount = Bits.getInt(content, pos);
- pos += 4;
- if (learnersCount > 0) {
- List<PeerId> learners = new ArrayList<>(learnersCount);
- while (learnersCount-- > 0) {
- final short len = Bits.getShort(content, pos);
- final byte[] bs = new byte[len];
- System.arraycopy(content, pos + 2, bs, 0, len);
- // peer len (short in 2 bytes)
- // peer str
- pos += 2 + len;
- final PeerId peer = new PeerId();
- peer.parse(AsciiStringUtil.unsafeDecode(bs));
- learners.add(peer);
+
+ reader.pos = pos;
+ int learnersCount = (int)reader.readLong();
+ pos = reader.pos;
+ if (learnersCount > 0) {
+ List<PeerId> learners = new ArrayList<>(learnersCount);
+
+ pos = readNodesList(pos, content, learnersCount, learners);
+
+ log.setLearners(learners);
+ }
+
+ reader.pos = pos;
+ int oldLearnersCount = (int)reader.readLong();
+ pos = reader.pos;
+ if (oldLearnersCount > 0) {
+ List<PeerId> oldLearners = new ArrayList<>(oldLearnersCount);
+
+ pos = readNodesList(pos, content, oldLearnersCount,
oldLearners);
+
+ log.setOldLearners(oldLearners);
}
- log.setLearners(learners);
}
- // old learners
- int oldLearnersCount = Bits.getInt(content, pos);
- pos += 4;
- if (oldLearnersCount > 0) {
- List<PeerId> oldLearners = new ArrayList<>(oldLearnersCount);
- while (oldLearnersCount-- > 0) {
- final short len = Bits.getShort(content, pos);
- final byte[] bs = new byte[len];
- System.arraycopy(content, pos + 2, bs, 0, len);
- // peer len (short in 2 bytes)
- // peer str
- pos += 2 + len;
- final PeerId peer = new PeerId();
- peer.parse(AsciiStringUtil.unsafeDecode(bs));
- oldLearners.add(peer);
+
+ // Data.
+ if (type != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
+ if (content.length > pos) {
+ int len = content.length - pos;
+
+ ByteBuffer data = ByteBuffer.wrap(content, pos, len).slice();
+
+ log.setData(data);
}
- log.setOldLearners(oldLearners);
}
+ }
+
+ private static int readNodesList(int pos, byte[] content, int count,
List<PeerId> nodes) {
+ for (int i = 0; i < count; i++) {
+ short len = Bits.getShort(content, pos);
+ pos += 2;
+
+ PeerId peer = new PeerId();
+ peer.parse(AsciiStringUtil.unsafeDecode(content, pos, len));
+ nodes.add(peer);
+
+ pos += len;
+ }
+
+ return pos;
+ }
+
+ /*
+ * Allows reading varlen numbers and tracking position at the same time.
Simply having a "readLong" method wasn't enough.
+ */
+ private static class Reader {
+ private final byte[] content;
+ int pos;
+
+ private Reader(byte[] content) {
+ this.content = content;
+ }
+
+ // Based on DirectByteBufferStreamImplV1.
+ long readLong() {
+ long val = 0;
+ int shift = 0;
- // data
- if (content.length > pos) {
- final int len = content.length - pos;
- ByteBuffer data = ByteBuffer.allocate(len);
- data.put(content, pos, len);
- data.flip();
- log.setData(data);
+ while (true) {
+ byte b = content[pos];
+
+ pos++;
+
+ val |= ((long) b & 0x7F) << shift;
+
+ if ((b & 0x80) == 0) {
+ return val;
+ } else {
+ shift += 7;
+ }
+ }
}
}
}
\ No newline at end of file
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
index 3caca2d2e8..7c9cbce4d0 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
@@ -26,6 +26,7 @@ import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
import org.apache.ignite.raft.jraft.util.AsciiStringUtil;
import org.apache.ignite.raft.jraft.util.Bits;
+import org.jetbrains.annotations.Nullable;
/**
* V1 log entry encoder
@@ -36,6 +37,7 @@ public final class V1Encoder implements LogEntryEncoder {
public static final LogEntryEncoder INSTANCE = new V1Encoder();
+ // Refactored to look closer to Ignite's code style.
@Override
public byte[] encode(final LogEntry log) {
EntryType type = log.getType();
@@ -46,130 +48,137 @@ public final class V1Encoder implements LogEntryEncoder {
List<PeerId> oldLearners = log.getOldLearners();
ByteBuffer data = log.getData();
- // magic number 1 byte
- int totalLen = 1;
- final int iType = type.getNumber();
- final long index = id.getIndex();
- final long term = id.getTerm();
- // type(4) + index(8) + term(8) + checksum(8)
- totalLen += 4 + 8 + 8 + 8;
- int peerCount = 0;
- // peer count
- totalLen += 4;
- final List<String> peerStrs = new ArrayList<>();
- if (peers != null) {
- peerCount = peers.size();
- for (final PeerId peer : peers) {
- final String peerStr = peer.toString();
- // peer len (short in 2 bytes)
- // peer str
- totalLen += 2 + peerStr.length();
- peerStrs.add(peerStr);
- }
- }
- int oldPeerCount = 0;
- // old peer count
- totalLen += 4;
- final List<String> oldPeerStrs = new ArrayList<>();
- if (oldPeers != null) {
- oldPeerCount = oldPeers.size();
- for (final PeerId peer : oldPeers) {
- final String peerStr = peer.toString();
- // peer len (short in 2 bytes)
- // peer str
- totalLen += 2 + peerStr.length();
- oldPeerStrs.add(peerStr);
- }
- }
- int learnerCount = 0;
- // peer count
- totalLen += 4;
- final List<String> learnerStrs = new ArrayList<>();
- if (learners != null) {
- learnerCount = learners.size();
- for (final PeerId learner : learners) {
- final String learnerStr = learner.toString();
- // learner len (short in 2 bytes)
- // learner str
- totalLen += 2 + learnerStr.length();
- learnerStrs.add(learnerStr);
- }
- }
- int oldLearnerCount = 0;
- // old peer count
- totalLen += 4;
- final List<String> oldLearnerStrs = new ArrayList<>();
- if (oldLearners != null) {
- oldLearnerCount = oldLearners.size();
- for (final PeerId oldLearner : oldLearners) {
- final String learnerStr = oldLearner.toString();
- // oldLearner len (short in 2 bytes)
- // oldLearner str
- totalLen += 2 + learnerStr.length();
- oldLearnerStrs.add(learnerStr);
- }
+ int totalLen = LogEntryV1CodecFactory.PAYLOAD_OFFSET;
+ int typeNumber = type.getNumber();
+ long index = id.getIndex();
+ long term = id.getTerm();
+
+ totalLen += sizeInBytes(typeNumber) + sizeInBytes(index) +
sizeInBytes(term) + 8;
+
+ List<String> peerStrs = null;
+ List<String> oldPeerStrs = null;
+ List<String> learnerStrs = null;
+ List<String> oldLearnerStrs = null;
+
+ if (type != EntryType.ENTRY_TYPE_DATA) {
+ peerStrs = new ArrayList<>();
+ totalLen += nodesListSizeInBytes(peers, peerStrs);
+
+ oldPeerStrs = new ArrayList<>();
+ totalLen += nodesListSizeInBytes(oldPeers, oldPeerStrs);
+
+ learnerStrs = new ArrayList<>();
+ totalLen += nodesListSizeInBytes(learners, learnerStrs);
+
+ oldLearnerStrs = new ArrayList<>();
+ totalLen += nodesListSizeInBytes(oldLearners, oldLearnerStrs);
}
- final int bodyLen = data != null ? data.remaining() : 0;
- totalLen += bodyLen;
+ if (type != EntryType.ENTRY_TYPE_CONFIGURATION) {
+ int bodyLen = data != null ? data.remaining() : 0;
+ totalLen += bodyLen;
+ }
- final byte[] content = new byte[totalLen];
- // {0} magic
+ byte[] content = new byte[totalLen];
content[0] = LogEntryV1CodecFactory.MAGIC;
- // 1-5 type
- Bits.putInt(content, 1, iType);
- // 5-13 index
- Bits.putLong(content, 5, index);
- // 13-21 term
- Bits.putLong(content, 13, term);
- // checksum
- Bits.putLong(content, 21, log.getChecksum());
-
- // peers
- // 21-25 peer count
- Bits.putInt(content, 29, peerCount);
- int pos = 33;
- for (final String peerStr : peerStrs) {
- final byte[] ps = AsciiStringUtil.unsafeEncode(peerStr);
- Bits.putShort(content, pos, (short) peerStr.length());
- System.arraycopy(ps, 0, content, pos + 2, ps.length);
- pos += 2 + ps.length;
+ int pos = LogEntryV1CodecFactory.PAYLOAD_OFFSET;
+
+ pos = writeLong(typeNumber, content, pos);
+ pos = writeLong(index, content, pos);
+ pos = writeLong(term, content, pos);
+
+ Bits.putLong(content, pos, log.getChecksum());
+ pos += Long.BYTES;
+
+ if (type != EntryType.ENTRY_TYPE_DATA) {
+ pos = writeNodesList(pos, content, peerStrs);
+
+ pos = writeNodesList(pos, content, oldPeerStrs);
+
+ pos = writeNodesList(pos, content, learnerStrs);
+
+ pos = writeNodesList(pos, content, oldLearnerStrs);
}
- // old peers
- // old peers count
- Bits.putInt(content, pos, oldPeerCount);
- pos += 4;
- for (final String peerStr : oldPeerStrs) {
- final byte[] ps = AsciiStringUtil.unsafeEncode(peerStr);
- Bits.putShort(content, pos, (short) peerStr.length());
- System.arraycopy(ps, 0, content, pos + 2, ps.length);
- pos += 2 + ps.length;
+
+ if (type != EntryType.ENTRY_TYPE_CONFIGURATION && data != null) {
+ System.arraycopy(data.array(), data.position(), content, pos,
data.remaining());
}
- // learners
- // learners count
- Bits.putInt(content, pos, learnerCount);
- pos += 4;
- for (final String peerStr : learnerStrs) {
- final byte[] ps = AsciiStringUtil.unsafeEncode(peerStr);
- Bits.putShort(content, pos, (short) peerStr.length());
- System.arraycopy(ps, 0, content, pos + 2, ps.length);
- pos += 2 + ps.length;
+
+ return content;
+ }
+
+ private static int nodesListSizeInBytes(@Nullable List<PeerId> nodes,
List<String> nodeStrs) {
+ int size = 0;
+
+ if (nodes != null) {
+ for (PeerId node : nodes) {
+ String nodeStr = node.toString();
+
+ nodeStrs.add(nodeStr);
+ size += 2 + nodeStr.length();
+ }
}
- // old learners
- // old learners count
- Bits.putInt(content, pos, oldLearnerCount);
- pos += 4;
- for (final String peerStr : oldLearnerStrs) {
- final byte[] ps = AsciiStringUtil.unsafeEncode(peerStr);
- Bits.putShort(content, pos, (short) peerStr.length());
- System.arraycopy(ps, 0, content, pos + 2, ps.length);
- pos += 2 + ps.length;
+
+ return size + sizeInBytes(nodeStrs.size());
+ }
+
+ private static int writeNodesList(int pos, byte[] content, List<String>
nodeStrs) {
+ pos = writeLong(nodeStrs.size(), content, pos);
+
+ for (String nodeStr : nodeStrs) {
+ int length = nodeStr.length();
+
+ Bits.putShort(content, pos, (short) length);
+ pos += 2;
+
+ AsciiStringUtil.unsafeEncode(nodeStr, content, pos);
+ pos += length;
}
- // data
- if (data != null) {
- System.arraycopy(data.array(), data.position(), content, pos,
data.remaining());
+
+ return pos;
+ }
+
+ // Based on DirectByteBufferStreamImplV1.
+ private static int writeLong(long val, byte[] out, int pos) {
+ while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
+ byte b = (byte) (val | 0x80);
+
+ out[pos++] = b;
+
+ val >>>= 7;
}
- return content;
+ out[pos++] = (byte) val;
+
+ return pos;
+ }
+
+ /**
+ * Returns the number of bytes, required by the {@link #writeLong(long,
byte[], int)} to write the value.
+ */
+ private static int sizeInBytes(long val) {
+ if (val >= 0) {
+ if (val < (1L << 7)) {
+ return 1;
+ } else if (val < (1L << 14)) {
+ return 2;
+ } else if (val < (1L << 21)) {
+ return 3;
+ } else if (val < (1L << 28)) {
+ return 4;
+ } else if (val < (1L << 35)) {
+ return 5;
+ } else if (val < (1L << 42)) {
+ return 6;
+ } else if (val < (1L << 49)) {
+ return 7;
+ } else if (val < (1L << 56)) {
+ return 8;
+ } else {
+ return 9;
+ }
+ } else {
+ return 10;
+ }
}
}
\ No newline at end of file
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java
index 97923fc093..1242f4d608 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/io/MessageFile.java
@@ -23,6 +23,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.util.Bits;
import org.apache.ignite.raft.jraft.util.Marshaller;
@@ -59,7 +60,7 @@ public class MessageFile {
}
final byte[] nameBytes = new byte[len];
readBytes(nameBytes, input);
- return Marshaller.DEFAULT.unmarshall(nameBytes);
+ return Marshaller.DEFAULT.unmarshall(ByteBuffer.wrap(nameBytes));
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
index 24884a9c7e..c0444379e9 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTable.java
@@ -83,7 +83,7 @@ public class LocalSnapshotMetaTable {
return false;
}
try {
- final LocalSnapshotPbMeta pbMeta =
Marshaller.DEFAULT.unmarshall(buf.array());
+ final LocalSnapshotPbMeta pbMeta =
Marshaller.DEFAULT.unmarshall(buf);
if (pbMeta == null) {
LOG.error("Fail to load meta from buffer.");
return false;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java
index 3d51bbbbcf..a3556f3ab6 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/AsciiStringUtil.java
@@ -16,11 +16,21 @@
*/
package org.apache.ignite.raft.jraft.util;
+import java.nio.charset.StandardCharsets;
+
/**
*
*/
public final class AsciiStringUtil {
+ public static byte[] unsafeEncode(final CharSequence in, byte[] out, int
offset) {
+ final int len = in.length();
+ for (int i = 0; i < len; i++) {
+ out[i + offset] = (byte) in.charAt(i);
+ }
+ return out;
+ }
+
public static byte[] unsafeEncode(final CharSequence in) {
final int len = in.length();
final byte[] out = new byte[len];
@@ -31,11 +41,7 @@ public final class AsciiStringUtil {
}
public static String unsafeDecode(final byte[] in, final int offset, final
int len) {
- final char[] out = new char[len];
- for (int i = 0; i < len; i++) {
- out[i] = (char) (in[i + offset] & 0xFF);
- }
- return moveToString(out);
+ return new String(in, offset, len, StandardCharsets.ISO_8859_1);
}
public static String unsafeDecode(final byte[] in) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java
index b188921853..9ecf321ef4 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/JDKMarshaller.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
/**
*
@@ -44,9 +45,9 @@ public class JDKMarshaller implements Marshaller {
/**
* {@inheritDoc}
*/
- @Override public <T> T unmarshall(byte[] raw) {
+ @Override public <T> T unmarshall(ByteBuffer raw) {
try {
- ByteArrayInputStream bais = new ByteArrayInputStream(raw);
+ ByteArrayInputStream bais = new ByteArrayInputStream(raw.array(),
raw.arrayOffset() + raw.position(), raw.remaining());
ObjectInputStream oos = new ObjectInputStream(bais);
return (T) oos.readObject();
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java
index 1650344d89..cb4891e9cb 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Marshaller.java
@@ -16,10 +16,12 @@
*/
package org.apache.ignite.raft.jraft.util;
+import java.nio.ByteBuffer;
+
public interface Marshaller {
public static Marshaller DEFAULT = new JDKMarshaller();
byte[] marshall(Object o);
- <T> T unmarshall(byte[] raw);
+ <T> T unmarshall(ByteBuffer raw);
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
index d93211f298..19e113481e 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.ignite.raft.jraft.entity.codec;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@@ -80,7 +79,7 @@ public class LogEntryCodecPerfTest {
this.logSize.addAndGet(content.length);
LogEntry nLog = decoder.decode(content);
assertEquals(2, nLog.getPeers().size());
- assertArrayEquals(DATA, nLog.getData().array());
+ assertEquals(ByteBuffer.wrap(DATA), nLog.getData());
assertEquals(i, nLog.getId().getIndex());
assertEquals(i, nLog.getId().getTerm());
}