This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2ba1e8cf85 IGNITE-22843 Optimize Logit log storage & log entry serialization format (#4390)
2ba1e8cf85 is described below
commit 2ba1e8cf85e31bf2fb6001acb12bcc5223cdd299
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Sep 23 10:18:06 2024 +0300
IGNITE-22843 Optimize Logit log storage & log entry serialization format (#4390)
---
.../persistence/checkpoint/CheckpointManager.java | 1 +
.../raft/storage/logit/LogitLogStorageFactory.java | 1 -
.../raft/util/SharedLogStorageFactoryUtils.java | 4 +-
.../raft/jraft/entity/codec/v1/V1Decoder.java | 30 +--
.../raft/jraft/entity/codec/v1/V1Encoder.java | 204 ++++++++++++++++-----
.../storage/logit/storage/LogitLogStorage.java | 14 +-
.../jraft/storage/logit/storage/db/AbstractDB.java | 28 ++-
.../storage/logit/storage/file/AbstractFile.java | 33 ++--
.../storage/logit/storage/file/FileHeader.java | 5 +-
.../logit/storage/file/index/IndexFile.java | 28 ++-
.../logit/storage/file/segment/SegmentFile.java | 58 +++++-
.../org/apache/ignite/raft/jraft/util/Bits.java | 54 ++++++
.../jraft/entity/codec/LogEntryCodecPerfTest.java | 1 +
.../raft/jraft/entity/codec/v1/V1EncoderTest.java | 111 +++++++++++
14 files changed, 461 insertions(+), 111 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 4d85b72087..9bb883597c 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -309,6 +309,7 @@ public class CheckpointManager {
CheckpointProgress lastCheckpointProgress = lastCheckpointProgress();
assert lastCheckpointProgress != null : "Checkpoint has not happened
yet";
+ // TODO https://issues.apache.org/jira/browse/IGNITE-23212 This
assertion fails sometimes.
assert lastCheckpointProgress.inProgress() : "Checkpoint must be in
progress";
CheckpointDirtyPages pagesToWrite =
lastCheckpointProgress.pagesToWrite();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
index 3bcd788fb9..bbd5d51b25 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java
@@ -115,7 +115,6 @@ public class LogitLogStorageFactory implements
LogStorageFactory {
@Override
public void sync() {
// TODO: https://issues.apache.org/jira/browse/IGNITE-21955
- throw new UnsupportedOperationException("Not implemented yet");
}
/** Returns path to log storage by group ID. */
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/SharedLogStorageFactoryUtils.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/SharedLogStorageFactoryUtils.java
index d1d77b6a9f..aa2f9df200 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/SharedLogStorageFactoryUtils.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/SharedLogStorageFactoryUtils.java
@@ -33,6 +33,8 @@ public class SharedLogStorageFactoryUtils {
*/
public static final String LOGIT_STORAGE_ENABLED_PROPERTY =
"LOGIT_STORAGE_ENABLED";
+ private static final boolean LOGIT_STORAGE_ENABLED_PROPERTY_DEFAULT =
false;
+
/**
* Creates a LogStorageFactory with {@link DefaultLogStorageFactory} or
{@link LogitLogStorageFactory} implementation depending on
* LOGIT_STORAGE_ENABLED_PROPERTY and fsync set to true.
@@ -47,7 +49,7 @@ public class SharedLogStorageFactoryUtils {
* LOGIT_STORAGE_ENABLED_PROPERTY.
*/
public static LogStorageFactory create(String factoryName, String
nodeName, Path logStoragePath, boolean fsync) {
- return
IgniteSystemProperties.getBoolean(LOGIT_STORAGE_ENABLED_PROPERTY, false)
+ return
IgniteSystemProperties.getBoolean(LOGIT_STORAGE_ENABLED_PROPERTY,
LOGIT_STORAGE_ENABLED_PROPERTY_DEFAULT)
? new LogitLogStorageFactory(nodeName, new StoreOptions(),
logStoragePath)
: new DefaultLogStorageFactory(factoryName, nodeName,
logStoragePath, fsync);
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Decoder.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Decoder.java
index 242592fcff..5b84c6f9d8 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Decoder.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Decoder.java
@@ -53,7 +53,7 @@ public final class V1Decoder implements LogEntryDecoder {
}
// Refactored to look closer to Ignites code style.
- public void decode(final LogEntry log, final byte[] content) {
+ private void decode(final LogEntry log, final byte[] content) {
var reader = new Reader(content);
reader.pos = LogEntryV1CodecFactory.PAYLOAD_OFFSET;
@@ -65,7 +65,7 @@ public final class V1Decoder implements LogEntryDecoder {
long term = reader.readLong();
log.setId(new LogId(index, term));
- long checksum = Bits.getLong(content, reader.pos);
+ long checksum = Bits.getLongLittleEndian(content, reader.pos);
log.setChecksum(checksum);
int pos = reader.pos + Long.BYTES;
@@ -78,7 +78,7 @@ public final class V1Decoder implements LogEntryDecoder {
if (peerCount > 0) {
List<PeerId> peers = new ArrayList<>(peerCount);
- pos = readNodesList(pos, content, peerCount, peers);
+ pos = readNodesList(reader, pos, content, peerCount, peers);
log.setPeers(peers);
}
@@ -89,7 +89,7 @@ public final class V1Decoder implements LogEntryDecoder {
if (oldPeerCount > 0) {
List<PeerId> oldPeers = new ArrayList<>(oldPeerCount);
- pos = readNodesList(pos, content, oldPeerCount, oldPeers);
+ pos = readNodesList(reader, pos, content, oldPeerCount,
oldPeers);
log.setOldPeers(oldPeers);
}
@@ -100,7 +100,7 @@ public final class V1Decoder implements LogEntryDecoder {
if (learnersCount > 0) {
List<PeerId> learners = new ArrayList<>(learnersCount);
- pos = readNodesList(pos, content, learnersCount, learners);
+ pos = readNodesList(reader, pos, content, learnersCount,
learners);
log.setLearners(learners);
}
@@ -111,7 +111,7 @@ public final class V1Decoder implements LogEntryDecoder {
if (oldLearnersCount > 0) {
List<PeerId> oldLearners = new ArrayList<>(oldLearnersCount);
- pos = readNodesList(pos, content, oldLearnersCount,
oldLearners);
+ pos = readNodesList(reader, pos, content, oldLearnersCount,
oldLearners);
log.setOldLearners(oldLearners);
}
@@ -129,16 +129,20 @@ public final class V1Decoder implements LogEntryDecoder {
}
}
- private static int readNodesList(int pos, byte[] content, int count,
List<PeerId> nodes) {
+ private static int readNodesList(Reader reader, int pos, byte[] content,
int count, List<PeerId> nodes) {
for (int i = 0; i < count; i++) {
- short len = Bits.getShort(content, pos);
- pos += 2;
-
- PeerId peer = new PeerId();
- peer.parse(AsciiStringUtil.unsafeDecode(content, pos, len));
- nodes.add(peer);
+ short len = Bits.getShortLittleEndian(content, pos);
+ pos += Short.BYTES;
+ String consistentId = AsciiStringUtil.unsafeDecode(content, pos,
len);
pos += len;
+
+ reader.pos = pos;
+ int idx = (int) reader.readLong();
+ int priority = (int) (reader.readLong() - 1);
+ pos = reader.pos;
+
+ nodes.add(new PeerId(consistentId, idx, priority));
}
return pos;
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
index 7c9cbce4d0..7a1840eb26 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
@@ -17,8 +17,8 @@
package org.apache.ignite.raft.jraft.entity.codec.v1;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
@@ -35,7 +35,92 @@ public final class V1Encoder implements LogEntryEncoder {
private V1Encoder() {
}
- public static final LogEntryEncoder INSTANCE = new V1Encoder();
+ public static final V1Encoder INSTANCE = new V1Encoder();
+
+ /**
+ * Returns a size of an encoded entry. Must match the size of {@link
#encode(LogEntry)} output.
+ *
+ * @param logEntry Log entry.
+ */
+ public int size(LogEntry logEntry) {
+ EntryType type = logEntry.getType();
+ LogId id = logEntry.getId();
+
+ List<PeerId> peers = logEntry.getPeers();
+ List<PeerId> oldPeers = logEntry.getOldPeers();
+ List<PeerId> learners = logEntry.getLearners();
+ List<PeerId> oldLearners = logEntry.getOldLearners();
+
+ ByteBuffer data = logEntry.getData();
+
+ int totalLen = LogEntryV1CodecFactory.PAYLOAD_OFFSET;
+ int typeNumber = type.getNumber();
+ long index = id.getIndex();
+ long term = id.getTerm();
+
+ // Checksum is not a varlen value.
+ totalLen += sizeInBytes(typeNumber) + sizeInBytes(index) +
sizeInBytes(term) + Long.BYTES;
+
+ // Includes "ENTRY_TYPE_CONFIGURATION" and "ENTRY_TYPE_NO_OP".
+ if (type != EntryType.ENTRY_TYPE_DATA) {
+ totalLen += nodesListSizeInBytes(peers);
+ totalLen += nodesListSizeInBytes(oldPeers);
+ totalLen += nodesListSizeInBytes(learners);
+ totalLen += nodesListSizeInBytes(oldLearners);
+ }
+
+ // Includes "ENTRY_TYPE_DATA" and "ENTRY_TYPE_NO_OP".
+ if (type != EntryType.ENTRY_TYPE_CONFIGURATION) {
+ int bodyLen = data != null ? data.remaining() : 0;
+ totalLen += bodyLen;
+ }
+
+ return totalLen;
+ }
+
+ /**
+ * Writes the same data as {@link #encode(LogEntry)} directly into a given
address.
+ *
+ * @param addr Off-heap address.
+ * @param logEntry Log entry.
+ */
+ public void append(long addr, LogEntry logEntry) {
+ EntryType type = logEntry.getType();
+ LogId id = logEntry.getId();
+
+ List<PeerId> peers = logEntry.getPeers();
+ List<PeerId> oldPeers = logEntry.getOldPeers();
+ List<PeerId> learners = logEntry.getLearners();
+ List<PeerId> oldLearners = logEntry.getOldLearners();
+
+ ByteBuffer data = logEntry.getData();
+
+ int typeNumber = type.getNumber();
+ long index = id.getIndex();
+ long term = id.getTerm();
+
+ GridUnsafe.putByte(addr++, LogEntryV1CodecFactory.MAGIC);
+
+ addr = writeLong(typeNumber, addr);
+ addr = writeLong(index, addr);
+ addr = writeLong(term, addr);
+
+ Bits.putLongLittleEndian(addr, logEntry.getChecksum());
+ addr += Long.BYTES;
+
+ // Includes "ENTRY_TYPE_CONFIGURATION" and "ENTRY_TYPE_NO_OP".
+ if (type != EntryType.ENTRY_TYPE_DATA) {
+ addr = writeNodesList(addr, peers);
+ addr = writeNodesList(addr, oldPeers);
+ addr = writeNodesList(addr, learners);
+ addr = writeNodesList(addr, oldLearners);
+ }
+
+ // Includes "ENTRY_TYPE_DATA" and "ENTRY_TYPE_NO_OP".
+ if (type != EntryType.ENTRY_TYPE_CONFIGURATION && data != null) {
+ GridUnsafe.copyHeapOffheap(data.array(), data.position() +
GridUnsafe.BYTE_ARR_OFF, addr, data.remaining());
+ }
+ }
// Refactored to look closer to Ignites code style.
@Override
@@ -48,36 +133,11 @@ public final class V1Encoder implements LogEntryEncoder {
List<PeerId> oldLearners = log.getOldLearners();
ByteBuffer data = log.getData();
- int totalLen = LogEntryV1CodecFactory.PAYLOAD_OFFSET;
int typeNumber = type.getNumber();
long index = id.getIndex();
long term = id.getTerm();
- totalLen += sizeInBytes(typeNumber) + sizeInBytes(index) +
sizeInBytes(term) + 8;
-
- List<String> peerStrs = null;
- List<String> oldPeerStrs = null;
- List<String> learnerStrs = null;
- List<String> oldLearnerStrs = null;
-
- if (type != EntryType.ENTRY_TYPE_DATA) {
- peerStrs = new ArrayList<>();
- totalLen += nodesListSizeInBytes(peers, peerStrs);
-
- oldPeerStrs = new ArrayList<>();
- totalLen += nodesListSizeInBytes(oldPeers, oldPeerStrs);
-
- learnerStrs = new ArrayList<>();
- totalLen += nodesListSizeInBytes(learners, learnerStrs);
-
- oldLearnerStrs = new ArrayList<>();
- totalLen += nodesListSizeInBytes(oldLearners, oldLearnerStrs);
- }
-
- if (type != EntryType.ENTRY_TYPE_CONFIGURATION) {
- int bodyLen = data != null ? data.remaining() : 0;
- totalLen += bodyLen;
- }
+ int totalLen = size(log);
byte[] content = new byte[totalLen];
content[0] = LogEntryV1CodecFactory.MAGIC;
@@ -87,17 +147,17 @@ public final class V1Encoder implements LogEntryEncoder {
pos = writeLong(index, content, pos);
pos = writeLong(term, content, pos);
- Bits.putLong(content, pos, log.getChecksum());
+ Bits.putLongLittleEndian(content, pos, log.getChecksum());
pos += Long.BYTES;
if (type != EntryType.ENTRY_TYPE_DATA) {
- pos = writeNodesList(pos, content, peerStrs);
+ pos = writeNodesList(pos, content, peers);
- pos = writeNodesList(pos, content, oldPeerStrs);
+ pos = writeNodesList(pos, content, oldPeers);
- pos = writeNodesList(pos, content, learnerStrs);
+ pos = writeNodesList(pos, content, learners);
- pos = writeNodesList(pos, content, oldLearnerStrs);
+ pos = writeNodesList(pos, content, oldLearners);
}
if (type != EntryType.ENTRY_TYPE_CONFIGURATION && data != null) {
@@ -107,32 +167,70 @@ public final class V1Encoder implements LogEntryEncoder {
return content;
}
- private static int nodesListSizeInBytes(@Nullable List<PeerId> nodes,
List<String> nodeStrs) {
+ private static int nodesListSizeInBytes(@Nullable List<PeerId> nodes) {
+ if (nodes == null) {
+ // The size of encoded "0".
+ return 1;
+ }
+
int size = 0;
- if (nodes != null) {
- for (PeerId node : nodes) {
- String nodeStr = node.toString();
+ for (PeerId node : nodes) {
+ String consistentId = node.getConsistentId();
+
+ size += Short.BYTES + consistentId.length() +
sizeInBytes(node.getIdx()) + sizeInBytes(node.getPriority() + 1);
+ }
+
+ return size + sizeInBytes(nodes.size());
+ }
+
+ private static long writeNodesList(long addr, List<PeerId> nodes) {
+ if (nodes == null) {
+ return writeLong(0, addr);
+ }
+
+ addr = writeLong(nodes.size(), addr);
- nodeStrs.add(nodeStr);
- size += 2 + nodeStr.length();
+ for (PeerId node : nodes) {
+ String nodeStr = node.getConsistentId();
+ int length = nodeStr.length();
+
+ Bits.putShortLittleEndian(addr, (short) length);
+ addr += Short.BYTES;
+
+ for (int i = 0; i < length; i++) {
+ GridUnsafe.putByte(addr + i, (byte) nodeStr.charAt(i));
}
+ addr += length;
+
+ addr = writeLong(node.getIdx(), addr);
+ addr = writeLong(node.getPriority() + 1, addr);
}
- return size + sizeInBytes(nodeStrs.size());
+ return addr;
}
- private static int writeNodesList(int pos, byte[] content, List<String>
nodeStrs) {
+ private static int writeNodesList(int pos, byte[] content, List<PeerId>
nodeStrs) {
+ if (nodeStrs == null) {
+ content[pos] = 0;
+
+ return pos + 1;
+ }
+
pos = writeLong(nodeStrs.size(), content, pos);
- for (String nodeStr : nodeStrs) {
- int length = nodeStr.length();
+ for (PeerId peerId : nodeStrs) {
+ String consistentId = peerId.getConsistentId();
+ int length = consistentId.length();
- Bits.putShort(content, pos, (short) length);
- pos += 2;
+ Bits.putShortLittleEndian(content, pos, (short) length);
+ pos += Short.BYTES;
- AsciiStringUtil.unsafeEncode(nodeStr, content, pos);
+ AsciiStringUtil.unsafeEncode(consistentId, content, pos);
pos += length;
+
+ pos = writeLong(peerId.getIdx(), content, pos);
+ pos = writeLong(peerId.getPriority() + 1, content, pos);
}
return pos;
@@ -153,6 +251,20 @@ public final class V1Encoder implements LogEntryEncoder {
return pos;
}
+ private static long writeLong(long val, long addr) {
+ while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
+ byte b = (byte) (val | 0x80);
+
+ GridUnsafe.putByte(addr++, b);
+
+ val >>>= 7;
+ }
+
+ GridUnsafe.putByte(addr++, (byte) val);
+
+ return addr;
+ }
+
/**
* Returns the number of bytes, required by the {@link #writeLong(long,
byte[], int)} to write the value.
*/
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
index 0e55460932..21d47a493b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
@@ -363,11 +363,10 @@ public class LogitLogStorage implements LogStorage {
this.readLock.lock();
try {
final long logIndex = entry.getId().getIndex();
- final byte[] logData = this.logEntryEncoder.encode(entry);
if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
- return doAppendEntry(logIndex, logData, this.confDB,
IndexType.IndexConf, true);
+ return doAppendEntry(logIndex, entry, this.confDB,
IndexType.IndexConf, true);
} else {
- return doAppendEntry(logIndex, logData, this.segmentLogDB,
IndexType.IndexSegment, true);
+ return doAppendEntry(logIndex, entry, this.segmentLogDB,
IndexType.IndexSegment, true);
}
} finally {
this.readLock.unlock();
@@ -401,13 +400,12 @@ public class LogitLogStorage implements LogStorage {
final boolean isWaitingFlush = (i == lastLogIndex || i ==
lastConfIndex);
final LogEntry entry = entries.get(i);
final long logIndex = entry.getId().getIndex();
- final byte[] logData = this.logEntryEncoder.encode(entry);
if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
- if (doAppendEntry(logIndex, logData, this.confDB,
IndexType.IndexConf, isWaitingFlush)) {
+ if (doAppendEntry(logIndex, entry, this.confDB,
IndexType.IndexConf, isWaitingFlush)) {
appendCount++;
}
} else {
- if (doAppendEntry(logIndex, logData, this.segmentLogDB,
IndexType.IndexSegment, isWaitingFlush)) {
+ if (doAppendEntry(logIndex, entry, this.segmentLogDB,
IndexType.IndexSegment, isWaitingFlush)) {
appendCount++;
}
}
@@ -419,7 +417,7 @@ public class LogitLogStorage implements LogStorage {
}
}
- private boolean doAppendEntry(final long logIndex, final byte[] data,
final AbstractDB logDB,
+ private boolean doAppendEntry(final long logIndex, final LogEntry entry,
final AbstractDB logDB,
final IndexType indexType, final boolean
isWaitingFlush) {
this.readLock.lock();
try {
@@ -428,7 +426,7 @@ public class LogitLogStorage implements LogStorage {
}
// Append log async , get position infos
- final Pair<Integer, Long> logPair = logDB.appendLogAsync(logIndex,
data);
+ final Pair<Integer, Long> logPair = logDB.appendLogAsync(logIndex,
logEntryEncoder, entry);
if (logPair.getFirst() < 0 || logPair.getSecond() < 0) {
return false;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
index cc1db57af7..f436745f60 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
@@ -31,7 +31,8 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.raft.jraft.Lifecycle;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
-import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.entity.codec.v1.V1Encoder;import
org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
import
org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
import
org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile.RecoverResult;
@@ -320,9 +321,9 @@ public abstract class AbstractDB implements
Lifecycle<LogStoreFactory> {
}
/**
- * Write the data and return it's wrote position.
+ * Write the data and return its written position.
* @param data logEntry data
- * @return (wrotePosition, expectFlushPosition)
+ * @return (writtenPosition, expectFlushPosition)
*/
public Pair<Integer, Long> appendLogAsync(final long logIndex, final
byte[] data) {
final int waitToWroteSize = SegmentFile.getWriteBytes(data);
@@ -335,6 +336,27 @@ public abstract class AbstractDB implements
Lifecycle<LogStoreFactory> {
return Pair.of(-1, (long) -1);
}
+ /**
+ * Write the data and return its written position. Based on {@link
#appendLogAsync(long, byte[])}, but more efficient.
+ *
+ * @param encoder Log entry encoder
+ * @param logEntry Log entry
+ * @return (writtenPosition, expectFlushPosition)
+ */
+ public Pair<Integer, Long> appendLogAsync(final long logIndex,
LogEntryEncoder encoder, LogEntry logEntry) {
+ V1Encoder v1Encoder = (V1Encoder)encoder;
+ int dataSize = v1Encoder.size(logEntry);
+ final int waitToWroteSize = SegmentFile.getWriteBytes(dataSize);
+
+ final SegmentFile segmentFile = (SegmentFile)
this.fileManager.getLastFile(logIndex, waitToWroteSize, true);
+ if (segmentFile != null) {
+ final int pos = segmentFile.appendData(logIndex, v1Encoder,
logEntry, dataSize);
+ final long expectFlushPosition = segmentFile.getFileFromOffset() +
pos + waitToWroteSize;
+ return Pair.of(pos, expectFlushPosition);
+ }
+ return Pair.of(-1, (long) -1);
+ }
+
/**
* Read log from the segmentFile.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
index 7c3972e470..0d29aebd23 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.raft.jraft.storage.logit.storage.file;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
@@ -29,6 +30,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongToIntFunction;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.util.GridUnsafe;
@@ -50,6 +52,9 @@ import sun.nio.ch.DirectBuffer;
public abstract class AbstractFile extends ReferenceResource {
private static final IgniteLogger LOG =
Loggers.forClass(AbstractFile.class);
+ /** Byte order that's used to encode data in log. */
+ public static final ByteOrder LOGIT_BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;
+
protected static final int BLANK_HOLE_SIZE = 64;
protected static final byte FILE_END_BYTE = 'x';
@@ -68,10 +73,10 @@ public abstract class AbstractFile extends
ReferenceResource {
protected MappedByteBuffer mappedByteBuffer;
// Current write position
- protected final AtomicInteger wrotePosition = new AtomicInteger(0);
+ protected volatile int wrotePosition = 0;
// Current flush position
- protected final AtomicInteger flushedPosition = new AtomicInteger(0);
+ protected volatile int flushedPosition = 0;
protected final ReadWriteLock readWriteLock = new
ReentrantReadWriteLock();
protected final Lock readLock =
this.readWriteLock.readLock();
@@ -119,6 +124,7 @@ public abstract class AbstractFile extends
ReferenceResource {
try (final RandomAccessFile randomAccessFile = new
RandomAccessFile(this.file, "rw");
final FileChannel fileChannel =
randomAccessFile.getChannel()) {
this.mappedByteBuffer = fileChannel.map(mapMode, 0,
this.fileSize);
+ mappedByteBuffer.order(LOGIT_BYTE_ORDER);
this.isMapped = true;
}
}
@@ -136,7 +142,7 @@ public abstract class AbstractFile extends
ReferenceResource {
try {
if (isMapped()) {
this.mappedByteBuffer.force();
- this.flushedPosition.set(getWrotePosition());
+ this.flushedPosition = getWrotePosition();
if (this.mappedByteBuffer != null) {
if (Platform.isLinux()) {
hintUnload();
@@ -262,10 +268,10 @@ public abstract class AbstractFile extends
ReferenceResource {
/**
* Append data to file end
* @param logIndex logEntry index
- * @param data data array
+ * @param append Data append function
* @return wrote position
*/
- protected int doAppend(final long logIndex, final byte[] data) {
+ protected int doAppend(final long logIndex, LongToIntFunction append) {
this.writeLock.lock();
try {
int wrotePos = getWrotePosition();
@@ -277,8 +283,11 @@ public abstract class AbstractFile extends
ReferenceResource {
}
// Write data and update header
final ByteBuffer buffer = sliceByteBuffer();
- put(buffer, wrotePos, data);
- setWrotePosition(wrotePos + data.length);
+
+ long pointer = GridUnsafe.bufferAddress(buffer) + wrotePos;
+ int length = append.applyAsInt(pointer);
+ setWrotePosition(wrotePos + length);
+
this.header.setLastLogIndex(logIndex);
return wrotePos;
} finally {
@@ -436,7 +445,7 @@ public abstract class AbstractFile extends
ReferenceResource {
}
public ByteBuffer sliceByteBuffer() {
- return this.mappedByteBuffer.slice();
+ return this.mappedByteBuffer.slice().order(LOGIT_BYTE_ORDER);
}
public void warmupFile() {
@@ -481,19 +490,19 @@ public abstract class AbstractFile extends
ReferenceResource {
}
public int getWrotePosition() {
- return wrotePosition.get();
+ return wrotePosition;
}
public void setWrotePosition(final int position) {
- this.wrotePosition.set(position);
+ this.wrotePosition = position;
}
public int getFlushedPosition() {
- return flushedPosition.get();
+ return flushedPosition;
}
public void setFlushPosition(final int position) {
- this.flushedPosition.set(position);
+ this.flushedPosition = position;
}
/**
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java
index d68f54e5f4..18dffe2760 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java
@@ -17,6 +17,8 @@
package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+import static
org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile.LOGIT_BYTE_ORDER;
+
import java.nio.ByteBuffer;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -52,7 +54,7 @@ public class FileHeader {
}
public ByteBuffer encode() {
- ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
+ ByteBuffer buffer =
ByteBuffer.allocate(HEADER_SIZE).order(LOGIT_BYTE_ORDER);
buffer.put(MAGIC);
buffer.put(MAGIC);
buffer.putLong(this.FirstLogIndex);
@@ -67,6 +69,7 @@ public class FileHeader {
LOG.error("Fail to decode file header, invalid buffer length: {}",
buffer == null ? 0 : buffer.remaining());
return false;
}
+ assert buffer.order() == LOGIT_BYTE_ORDER;
if (buffer.get() != MAGIC) {
return false;
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
index a18f5dc289..3c6e7c93f7 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
@@ -22,8 +22,10 @@ import java.util.Arrays;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.util.Bits;
/**
* * File header:
@@ -144,27 +146,21 @@ public class IndexFile extends AbstractFile {
this.writeLock.lock();
try {
assert (logIndex > getLastLogIndex());
- final byte[] writeData = encodeData(toRelativeOffset(logIndex),
position, logType);
- return doAppend(logIndex, writeData);
+
+ return doAppend(logIndex, addr -> {
+ GridUnsafe.putByte(addr, RECORD_MAGIC_BYTES[0]);
+ GridUnsafe.putByte(addr + 1, logType);
+
+ Bits.putIntLittleEndian(addr + 2, toRelativeOffset(logIndex));
+ Bits.putIntLittleEndian(addr + 6, position);
+
+ return getIndexSize();
+ });
} finally {
this.writeLock.unlock();
}
}
- private byte[] encodeData(final int offset, final int position, final byte
logType) {
- final ByteBuffer buffer = ByteBuffer.allocate(getIndexSize());
- // Magics
- buffer.put(RECORD_MAGIC_BYTES);
- // logType (segmentLog or conf)
- buffer.put(logType);
- // offset from FirstLogIndex
- buffer.putInt(offset);
- // phyPosition in segmentFile
- buffer.putInt(position);
- buffer.flip();
- return buffer.array();
- }
-
/**
* Find the index entry
* @param logIndex the target log index
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
index 01fe46f272..2d988454bc 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
@@ -22,10 +22,13 @@ import java.util.Arrays;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.entity.codec.v1.V1Encoder;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.util.Bits;
/**
* * File header:
@@ -69,20 +72,49 @@ public class SegmentFile extends AbstractFile {
this.writeLock.lock();
try {
assert (logIndex > getLastLogIndex());
- final byte[] writeData = encodeData(data);
- return doAppend(logIndex, writeData);
+
+ return doAppend(logIndex, addr -> {
+ GridUnsafe.putByte(addr, RECORD_MAGIC_BYTES[0]);
+ GridUnsafe.putByte(addr + 1, RECORD_MAGIC_BYTES[1]);
+
+ Bits.putIntLittleEndian(addr + 2, data.length);
+
+ GridUnsafe.copyHeapOffheap(data, GridUnsafe.BYTE_ARR_OFF, addr
+ 6, data.length);
+
+ return getWriteBytes(data);
+ });
} finally {
this.writeLock.unlock();
}
}
- private byte[] encodeData(final byte[] data) {
- ByteBuffer buffer = ByteBuffer.allocate(getWriteBytes(data));
- buffer.put(RECORD_MAGIC_BYTES);
- buffer.putInt(data.length);
- buffer.put(data);
- buffer.flip();
- return buffer.array();
+ /**
+ *
+ * Write the data and return its written position. Based on {@link
#appendData(long, byte[])}, but more efficient.
+ * @param logIndex the log index
+ * @param encoder Log entry encoder
+ * @param entry Log entry
+ * @param entrySize Pre-calculated serialized entry size
+ * @return the wrote position
+ */
+ public int appendData(final long logIndex, V1Encoder encoder, LogEntry
entry, int entrySize) {
+ this.writeLock.lock();
+ try {
+ assert (logIndex > getLastLogIndex());
+
+ return doAppend(logIndex, addr -> {
+ GridUnsafe.putByte(addr, RECORD_MAGIC_BYTES[0]);
+ GridUnsafe.putByte(addr + 1, RECORD_MAGIC_BYTES[1]);
+
+ Bits.putIntLittleEndian(addr + 2, entrySize);
+
+ encoder.append(addr + 6, entry);
+
+ return entrySize + 6;
+ });
+ } finally {
+ this.writeLock.unlock();
+ }
}
/**
@@ -151,6 +183,8 @@ public class SegmentFile extends AbstractFile {
@Override
public CheckDataResult checkData(final ByteBuffer buffer) {
+ assert buffer.order() == LOGIT_BYTE_ORDER;
+
if (buffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
return CheckDataResult.CHECK_FAIL;
}
@@ -205,6 +239,10 @@ public class SegmentFile extends AbstractFile {
}
public static int getWriteBytes(final byte[] data) {
- return RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + data.length;
+ // Delegates to the size-based overload so both share one definition of the record size.
+ final int payloadLength = data.length;
+
+ return getWriteBytes(payloadLength);
+ }
+
+ /**
+ * Returns the total number of bytes a record with a payload of {@code dataSize} bytes occupies: the magic bytes,
+ * the data-length field and the payload itself.
+ */
+ public static int getWriteBytes(int dataSize) {
+ final int headerSize = RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE;
+
+ return headerSize + dataSize;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Bits.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Bits.java
index 36a4ca8418..b548de2e8c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Bits.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Bits.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.util;
+import org.apache.ignite.internal.util.GridUnsafe;
+
/**
* Bits util.
*/
@@ -43,4 +45,56 @@ public class Bits {
public static void putLong(final byte[] b, final int off, final long val) {
HeapByteBufUtil.setLong(b, off, val);
}
+
+ public static void putShortLittleEndian(long addr, short value) {
+ if (GridUnsafe.IS_BIG_ENDIAN) {
+ GridUnsafe.putShort(addr, Short.reverseBytes(value));
+ } else {
+ GridUnsafe.putShort(addr, value);
+ }
+ }
+
+ public static void putShortLittleEndian(byte[] b, int off, short value) {
+ if (GridUnsafe.IS_BIG_ENDIAN) {
+ GridUnsafe.putShort(b, GridUnsafe.BYTE_ARR_OFF + off,
Short.reverseBytes(value));
+ } else {
+ GridUnsafe.putShort(b, GridUnsafe.BYTE_ARR_OFF + off, value);
+ }
+ }
+
+ /**
+ * Reads a little-endian short from {@code b} starting at index {@code off}. No bounds check is done here.
+ */
+ public static short getShortLittleEndian(byte[] b, int off) {
+ short raw = GridUnsafe.getShort(b, GridUnsafe.BYTE_ARR_OFF + off);
+
+ if (GridUnsafe.IS_BIG_ENDIAN) {
+ return Short.reverseBytes(raw);
+ }
+
+ return raw;
+ }
+
+ public static void putIntLittleEndian(long addr, int value) {
+ if (GridUnsafe.IS_BIG_ENDIAN) {
+ GridUnsafe.putInt(addr, Integer.reverseBytes(value));
+ } else {
+ GridUnsafe.putInt(addr, value);
+ }
+ }
+
+ public static void putLongLittleEndian(long addr, long value) {
+ if (GridUnsafe.IS_BIG_ENDIAN) {
+ GridUnsafe.putLong(addr, Long.reverseBytes(value));
+ } else {
+ GridUnsafe.putLong(addr, value);
+ }
+ }
+
+ public static void putLongLittleEndian(byte[] b, int off, long value) {
+ if (GridUnsafe.IS_BIG_ENDIAN) {
+ GridUnsafe.putLong(b, GridUnsafe.BYTE_ARR_OFF + off,
Long.reverseBytes(value));
+ } else {
+ GridUnsafe.putLong(b, GridUnsafe.BYTE_ARR_OFF + off, value);
+ }
+ }
+
+ /**
+ * Reads a little-endian long from {@code b} starting at index {@code off}. No bounds check is done here.
+ */
+ public static long getLongLittleEndian(byte[] b, int off) {
+ long raw = GridUnsafe.getLong(b, GridUnsafe.BYTE_ARR_OFF + off);
+
+ if (!GridUnsafe.IS_BIG_ENDIAN) {
+ return raw;
+ }
+
+ return Long.reverseBytes(raw);
+ }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
index 19e113481e..eb5250a338 100644
---
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/LogEntryCodecPerfTest.java
@@ -67,6 +67,7 @@ public class LogEntryCodecPerfTest {
LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
entry.setData(buf);
entry.setPeers(Arrays.asList(new PeerId("localhost", 99, 1), new
PeerId("localhost", 100, 2)));
+ entry.setOldPeers(Arrays.asList(new PeerId("localhost", 99, 1), new
PeerId("localhost", 100, 2)));
if (barrier != null) {
barrier.await();
diff --git
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1EncoderTest.java
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1EncoderTest.java
new file mode 100644
index 0000000000..0a20b5ebff
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1EncoderTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.entity.codec.v1;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+/**
+ * Tests for {@link V1Encoder}: round trip through {@link V1Decoder}, size pre-calculation and direct off-heap writes.
+ */
+class V1EncoderTest {
+ /** Entries shared by all parameterized tests; referenced by name from {@code @FieldSource}. */
+ @SuppressWarnings("unused")
+ private static final List<LogEntry> ENTRIES = List.of(
+ createDataEntry(),
+ createCfgEntry(false),
+ createCfgEntry(true)
+ );
+
+ /** Checks that decoding the encoded bytes yields an entry equal to the original one. */
+ @ParameterizedTest
+ @FieldSource("ENTRIES")
+ void testEncodeDecode(LogEntry logEntry) {
+ byte[] bytes = V1Encoder.INSTANCE.encode(logEntry);
+
+ LogEntry decodedEntry = V1Decoder.INSTANCE.decode(bytes);
+
+ assertEquals(logEntry, decodedEntry);
+ }
+
+ /** Checks that the pre-calculated size matches the length of the array produced by {@code encode}. */
+ @ParameterizedTest
+ @FieldSource("ENTRIES")
+ void testSize(LogEntry logEntry) {
+ byte[] bytes = V1Encoder.INSTANCE.encode(logEntry);
+
+ int size = V1Encoder.INSTANCE.size(logEntry);
+
+ // JUnit's argument order is (expected, actual): the encoded length is the reference value.
+ assertEquals(bytes.length, size);
+ }
+
+ /** Checks that {@code append} writes to off-heap memory exactly the bytes that {@code encode} produces. */
+ @ParameterizedTest
+ @FieldSource("ENTRIES")
+ void testDirectWrite(LogEntry logEntry) {
+ byte[] bytes = V1Encoder.INSTANCE.encode(logEntry);
+
+ int size = V1Encoder.INSTANCE.size(logEntry);
+ ByteBuffer direct = ByteBuffer.allocateDirect(size);
+
+ V1Encoder.INSTANCE.append(GridUnsafe.bufferAddress(direct), logEntry);
+
+ // append() writes through the raw address, so the buffer's position stays 0 and the whole content is compared.
+ assertEquals(ByteBuffer.wrap(bytes), direct);
+ }
+
+ /** Creates a data entry with a 4 KiB random payload. */
+ private static LogEntry createDataEntry() {
+ LogEntry logEntry = new LogEntry(EntryType.ENTRY_TYPE_DATA);
+
+ LogId id = new LogId();
+ id.setIndex(1000);
+ id.setTerm(10_000);
+
+ logEntry.setId(id);
+ logEntry.setChecksum(100_000);
+
+ byte[] bytes = new byte[4096];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ logEntry.setData(ByteBuffer.wrap(bytes));
+
+ return logEntry;
+ }
+
+ /**
+ * Creates a configuration entry.
+ *
+ * @param emptyLists If {@code true}, no peer or learner lists are set; otherwise each list gets one peer.
+ */
+ private static LogEntry createCfgEntry(boolean emptyLists) {
+ LogEntry logEntry = new LogEntry(EntryType.ENTRY_TYPE_CONFIGURATION);
+
+ LogId id = new LogId();
+ id.setIndex(1000);
+ id.setTerm(10_000);
+
+ logEntry.setId(id);
+ logEntry.setChecksum(100_000);
+
+ if (!emptyLists) {
+ logEntry.setOldLearners(List.of(new PeerId("oldLearner")));
+ logEntry.setOldPeers(List.of(new PeerId("oldPeer")));
+
+ logEntry.setLearners(List.of(new PeerId("learner")));
+ logEntry.setPeers(List.of(new PeerId("peer")));
+ }
+
+ return logEntry;
+ }
+}