This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d7563b1ff31 IGNITE-26793 Add index and term to segment entry payload
(#6832)
d7563b1ff31 is described below
commit d7563b1ff31b5e11a1c7fd8549fb246258829a3d
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Oct 22 16:00:28 2025 +0300
IGNITE-26793 Add index and term to segment entry payload (#6832)
---
.../raft/storage/segstore/SegmentFileManager.java | 24 ++--
.../raft/storage/segstore/SegmentPayload.java | 66 +++++++----
.../ignite/internal/raft/util/VarlenEncoder.java | 126 +++++++++++++++++++++
.../raft/jraft/entity/codec/v1/V1Encoder.java | 72 ++----------
.../segstore/DeserializedSegmentPayload.java | 10 +-
.../storage/segstore/SegmentFileManagerTest.java | 31 ++---
.../internal/raft/util/VarlenEncoderTest.java | 92 +++++++++++++++
7 files changed, 308 insertions(+), 113 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
index 8f25ddae7cf..ec768607d71 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java
@@ -60,11 +60,14 @@ import org.jetbrains.annotations.Nullable;
*
* <p>Binary representation of each entry is as follows:
* <pre>
- *
+---------------+---------+--------------------------+---------+----------------+
- * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Payload | Hash (4
bytes) |
- *
+---------------+---------+--------------------------+---------+----------------+
+ *
+-------------------------+--------------------------+--------------------+-------------------+---------+----------------+
+ * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Index (1-10 bytes) |
Term (1-10 bytes) | Payload | Hash (4 bytes) |
+ *
+-------------------------+--------------------------+--------------------+-------------------+---------+----------------+
* </pre>
*
+ * <p>Log Entry Index and Term are stored as variable-length integers
(varints), hence the non-fixed size in bytes. They are treated as
+ * a part of the payload, so payload length includes their size as well.
+ *
* <p>In addition to regular Raft log entries, payload can also represent a
special type of entry which are written when Raft suffix
* is truncated. Such entries are identified by having a payload length of 0,
followed by 8 bytes of the last log index kept after the
* truncation.
@@ -180,22 +183,21 @@ class SegmentFileManager implements ManuallyCloseable {
}
void appendEntry(long groupId, LogEntry entry, LogEntryEncoder encoder)
throws IOException {
- int entrySize = encoder.size(entry);
+ int segmentEntrySize = SegmentPayload.size(entry, encoder);
- if (entrySize > maxPossibleEntrySize()) {
+ if (segmentEntrySize > maxPossibleEntrySize()) {
throw new IllegalArgumentException(String.format(
- "Entry size is too big (%d bytes), maximum allowed entry
size: %d bytes.", entrySize, maxPossibleEntrySize()
+ "Segment entry is too big (%d bytes), maximum allowed
segment entry size: %d bytes.",
+ segmentEntrySize, maxPossibleEntrySize()
));
}
- int payloadSize = SegmentPayload.size(entrySize);
-
- try (WriteBufferWithMemtable writeBufferWithMemtable =
reserveBytesWithRollover(payloadSize)) {
+ try (WriteBufferWithMemtable writeBufferWithMemtable =
reserveBytesWithRollover(segmentEntrySize)) {
ByteBuffer segmentBuffer = writeBufferWithMemtable.buffer();
int segmentOffset = segmentBuffer.position();
- SegmentPayload.writeTo(segmentBuffer, groupId, entrySize, entry,
encoder);
+ SegmentPayload.writeTo(segmentBuffer, groupId, segmentEntrySize,
entry, encoder);
// Append to memtable before write buffer is released to avoid
races with checkpoint on rollover.
writeBufferWithMemtable.memtable.appendSegmentFileOffset(groupId,
entry.getId().getIndex(), segmentOffset);
@@ -409,7 +411,7 @@ class SegmentFileManager implements ManuallyCloseable {
}
private long maxPossibleEntrySize() {
- return fileSize - HEADER_RECORD.length - SegmentPayload.overheadSize();
+ return fileSize - HEADER_RECORD.length;
}
private @Nullable ByteBuffer readFromOtherSegmentFiles(long groupId, long
logIndex) throws IOException {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
index ddf1ad65373..dd5e958cf3c 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java
@@ -17,9 +17,14 @@
package org.apache.ignite.internal.raft.storage.segstore;
+import static org.apache.ignite.internal.raft.util.VarlenEncoder.readLong;
+import static org.apache.ignite.internal.raft.util.VarlenEncoder.sizeInBytes;
+import static org.apache.ignite.internal.raft.util.VarlenEncoder.writeLong;
+
import java.nio.ByteBuffer;
import org.apache.ignite.internal.util.FastCrc;
import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
@@ -47,7 +52,7 @@ class SegmentPayload {
static void writeTo(
ByteBuffer buffer,
long groupId,
- int entrySize,
+ int segmentEntrySize,
LogEntry logEntry,
LogEntryEncoder logEntryEncoder
) {
@@ -55,7 +60,12 @@ class SegmentPayload {
buffer
.putLong(groupId)
- .putInt(entrySize);
+ .putInt(segmentEntrySize - fixedOverheadSize());
+
+ LogId logId = logEntry.getId();
+
+ writeLong(logId.getIndex(), buffer);
+ writeLong(logId.getTerm(), buffer);
logEntryEncoder.encode(buffer, logEntry);
@@ -86,14 +96,35 @@ class SegmentPayload {
}
static LogEntry readFrom(ByteBuffer buffer, LogEntryDecoder
logEntryDecoder) {
- int entrySize = buffer.getInt(buffer.position() + GROUP_ID_SIZE_BYTES);
+ int originalPosition = buffer.position();
+
+ buffer.position(originalPosition + GROUP_ID_SIZE_BYTES); // Skip group
ID.
+
+ int payloadLength = buffer.getInt();
+
+ int payloadPosition = buffer.position();
+
+ readLong(buffer); // Skip log entry index.
+ readLong(buffer); // Skip log entry term.
+
+ int logEntryPosition = buffer.position();
+
+ int crcPosition = payloadPosition + payloadLength;
+
+ int crc = buffer.getInt(crcPosition);
- verifyCrc(buffer, entrySize);
+ buffer.position(originalPosition);
- buffer.position(buffer.position() + GROUP_ID_SIZE_BYTES +
LENGTH_SIZE_BYTES);
+ int actualCrc = FastCrc.calcCrc(buffer, crcPosition -
originalPosition);
+
+ if (crc != actualCrc) {
+ throw new IllegalStateException("CRC mismatch, expected: " + crc +
", actual: " + actualCrc);
+ }
+
+ buffer.position(logEntryPosition);
// TODO: https://issues.apache.org/jira/browse/IGNITE-26623.
- byte[] entryBytes = new byte[entrySize];
+ byte[] entryBytes = new byte[crcPosition - logEntryPosition];
buffer.get(entryBytes);
@@ -103,28 +134,15 @@ class SegmentPayload {
return logEntryDecoder.decode(entryBytes);
}
- private static void verifyCrc(ByteBuffer buffer, int entrySize) {
- int position = buffer.position();
-
- int dataSize = size(entrySize) - HASH_SIZE;
-
- int expectedCrc = buffer.getInt(position + dataSize);
+ static int size(LogEntry logEntry, LogEntryEncoder logEntryEncoder) {
+ int entrySize = logEntryEncoder.size(logEntry);
- int actualCrc = FastCrc.calcCrc(buffer, dataSize);
-
- // calcCrc alters the position.
- buffer.position(position);
-
- if (expectedCrc != actualCrc) {
- throw new IllegalStateException("CRC mismatch, expected: " +
expectedCrc + ", actual: " + actualCrc);
- }
- }
+ LogId logId = logEntry.getId();
- static int size(int entrySize) {
- return overheadSize() + entrySize;
+ return fixedOverheadSize() + sizeInBytes(logId.getIndex()) +
sizeInBytes(logId.getTerm()) + entrySize;
}
- static int overheadSize() {
+ static int fixedOverheadSize() {
return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + HASH_SIZE;
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/VarlenEncoder.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/VarlenEncoder.java
new file mode 100644
index 00000000000..03dce6592cd
--- /dev/null
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/VarlenEncoder.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Class containing methods for encoding/decoding long values using variable length encoding.
+ *
+ * <p>A value is split into 7-bit groups which are written starting from the least significant group. Every byte except
+ * the last one has its most significant bit set, which tells the reader that more bytes follow. An encoded value
+ * therefore occupies between 1 and 10 bytes, see {@link #sizeInBytes(long)}.
+ */
+// Based on DirectByteBufferStreamImplV1.
+public class VarlenEncoder {
+    /** Mask for clearing 7 least significant bits. */
+    private static final long LEAST_SIGNIFICANT_BITS_MASK = 0xFFFF_FFFF_FFFF_FF80L;
+
+    /** Maximum number of bytes a single encoded value can occupy: 64 bits split into groups of 7 bits. */
+    private static final int MAX_ENCODED_SIZE_BYTES = 10;
+
+    /**
+     * Writes the given value to the given buffer, starting at the buffer's current position and advancing it.
+     *
+     * @param val Value to write.
+     * @param out Buffer to write to.
+     * @return Number of bytes written.
+     */
+    public static int writeLong(long val, ByteBuffer out) {
+        int startPos = out.position();
+
+        // While something is left above the 7 lowest bits, emit those 7 bits with the continuation bit set.
+        while ((val & LEAST_SIGNIFICANT_BITS_MASK) != 0) {
+            byte b = (byte) (val | 0x80);
+
+            out.put(b);
+
+            // Unsigned shift, so that negative values terminate as well.
+            val >>>= 7;
+        }
+
+        // Last group: the continuation bit is clear.
+        out.put((byte) val);
+
+        return out.position() - startPos;
+    }
+
+    /**
+     * Writes the given value to the given buffer address.
+     *
+     * @param val Value to write.
+     * @param addr Address to start writing at.
+     * @return Number of bytes written.
+     * @see GridUnsafe#bufferAddress(ByteBuffer)
+     */
+    public static int writeLong(long val, long addr) {
+        int offset = 0;
+
+        // Same encoding as the ByteBuffer-based overload, but bytes are put at consecutive addresses.
+        while ((val & LEAST_SIGNIFICANT_BITS_MASK) != 0) {
+            byte b = (byte) (val | 0x80);
+
+            GridUnsafe.putByte(addr + offset, b);
+
+            val >>>= 7;
+
+            offset++;
+        }
+
+        GridUnsafe.putByte(addr + offset, (byte) val);
+
+        offset++;
+
+        return offset;
+    }
+
+    /**
+     * Returns the number of bytes, required by the {@link #writeLong} to write the value.
+     *
+     * @param val Value to be written.
+     * @return Number of bytes in range from 1 to 10. Negative values always take 10 bytes, because their most
+     *      significant bit is set.
+     */
+    public static int sizeInBytes(long val) {
+        if (val >= 0) {
+            if (val < (1L << 7)) {
+                return 1;
+            } else if (val < (1L << 14)) {
+                return 2;
+            } else if (val < (1L << 21)) {
+                return 3;
+            } else if (val < (1L << 28)) {
+                return 4;
+            } else if (val < (1L << 35)) {
+                return 5;
+            } else if (val < (1L << 42)) {
+                return 6;
+            } else if (val < (1L << 49)) {
+                return 7;
+            } else if (val < (1L << 56)) {
+                return 8;
+            } else {
+                return 9;
+            }
+        } else {
+            return 10;
+        }
+    }
+
+    /**
+     * Reads a long value, written by one of the {@link #writeLong} methods, from the given buffer. Reading starts at
+     * the buffer's current position, which is advanced past the value.
+     *
+     * @param buffer Buffer to read from.
+     * @return Decoded value.
+     * @throws IllegalStateException If the continuation bit is still set after the maximum possible number of bytes,
+     *      which means that the buffer does not contain a valid encoded value at its current position.
+     * @throws java.nio.BufferUnderflowException If the buffer ends before the value does.
+     */
+    public static long readLong(ByteBuffer buffer) {
+        long val = 0;
+
+        // A long consists of at most 10 groups of 7 bits, so the only valid shifts are 0, 7, ..., 63.
+        for (int shift = 0; shift < Long.SIZE; shift += 7) {
+            byte b = buffer.get();
+
+            val |= (long) (b & 0x7F) << shift;
+
+            if ((b & 0x80) == 0) {
+                return val;
+            }
+        }
+
+        // Shift distance of a long is taken modulo 64, so decoding further bytes would silently corrupt the low bits
+        // of the result instead of failing. Report the malformed input explicitly.
+        throw new IllegalStateException(
+                "Malformed variable-length value: continuation bit is set in all " + MAX_ENCODED_SIZE_BYTES + " bytes."
+        );
+    }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
index f28263c6750..6bdb581a243 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/codec/v1/V1Encoder.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.entity.codec.v1;
+import static org.apache.ignite.internal.raft.util.VarlenEncoder.sizeInBytes;
+import static org.apache.ignite.internal.raft.util.VarlenEncoder.writeLong;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
@@ -103,9 +105,9 @@ public final class V1Encoder implements LogEntryEncoder {
GridUnsafe.putByte(addr++, LogEntryV1CodecFactory.MAGIC);
- addr = writeLong(typeNumber, addr);
- addr = writeLong(index, addr);
- addr = writeLong(term, addr);
+ addr += writeLong(typeNumber, addr);
+ addr += writeLong(index, addr);
+ addr += writeLong(term, addr);
Bits.putLongLittleEndian(addr, logEntry.getChecksum());
addr += Long.BYTES;
@@ -192,10 +194,10 @@ public final class V1Encoder implements LogEntryEncoder {
private static long writeNodesList(long addr, List<PeerId> nodes) {
if (nodes == null) {
- return writeLong(0, addr);
+ return addr + writeLong(0, addr);
}
- addr = writeLong(nodes.size(), addr);
+ addr += writeLong(nodes.size(), addr);
for (PeerId node : nodes) {
String nodeStr = node.getConsistentId();
@@ -209,8 +211,8 @@ public final class V1Encoder implements LogEntryEncoder {
}
addr += length;
- addr = writeLong(node.getIdx(), addr);
- addr = writeLong(node.getPriority() + 1, addr);
+ addr += writeLong(node.getIdx(), addr);
+ addr += writeLong(node.getPriority() + 1, addr);
}
return addr;
@@ -237,60 +239,4 @@ public final class V1Encoder implements LogEntryEncoder {
writeLong(peerId.getPriority() + 1, content);
}
}
-
- // Based on DirectByteBufferStreamImplV1.
- private static void writeLong(long val, ByteBuffer out) {
- while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
- byte b = (byte) (val | 0x80);
-
- out.put(b);
-
- val >>>= 7;
- }
-
- out.put((byte) val);
- }
-
- private static long writeLong(long val, long addr) {
- while ((val & 0xFFFF_FFFF_FFFF_FF80L) != 0) {
- byte b = (byte) (val | 0x80);
-
- GridUnsafe.putByte(addr++, b);
-
- val >>>= 7;
- }
-
- GridUnsafe.putByte(addr++, (byte) val);
-
- return addr;
- }
-
- /**
- * Returns the number of bytes, required by the {@link #writeLong} to
write the value.
- */
- private static int sizeInBytes(long val) {
- if (val >= 0) {
- if (val < (1L << 7)) {
- return 1;
- } else if (val < (1L << 14)) {
- return 2;
- } else if (val < (1L << 21)) {
- return 3;
- } else if (val < (1L << 28)) {
- return 4;
- } else if (val < (1L << 35)) {
- return 5;
- } else if (val < (1L << 42)) {
- return 6;
- } else if (val < (1L << 49)) {
- return 7;
- } else if (val < (1L << 56)) {
- return 8;
- } else {
- return 9;
- }
- } else {
- return 10;
- }
- }
}
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
index 4366e261281..8c647cd0f99 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/DeserializedSegmentPayload.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.storage.segstore;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.GROUP_ID_SIZE_BYTES;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.HASH_SIZE;
import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.LENGTH_SIZE_BYTES;
+import static org.apache.ignite.internal.raft.util.VarlenEncoder.readLong;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -89,7 +90,14 @@ class DeserializedSegmentPayload {
int payloadLength = entryBuf.getInt();
- byte[] payload = new byte[payloadLength];
+ int pos = entryBuf.position();
+
+ readLong(entryBuf); // Skip log entry index.
+ readLong(entryBuf); // Skip log entry term.
+
+ int indexAndTermSize = entryBuf.position() - pos;
+
+ byte[] payload = new byte[payloadLength - indexAndTermSize];
entryBuf.get(payload);
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
index 7d30b6aa65c..ac39ec43e7e 100644
---
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerTest.java
@@ -123,7 +123,7 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
void checkFileNamingAfterRollovers() throws Exception {
int segmentFilesNum = 10;
- byte[] bytes = new byte[FILE_SIZE - HEADER_RECORD.length -
SegmentPayload.overheadSize()];
+ byte[] bytes = new byte[FILE_SIZE - HEADER_RECORD.length - 2 *
SegmentPayload.fixedOverheadSize()];
for (int i = 0; i < segmentFilesNum; i++) {
appendBytes(bytes, i);
@@ -153,9 +153,9 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () -> appendBytes(new
byte[FILE_SIZE], 0));
String expectedMessage = String.format(
- "Entry size is too big (%d bytes), maximum allowed entry size:
%d bytes.",
- FILE_SIZE,
- FILE_SIZE - HEADER_RECORD.length -
SegmentPayload.overheadSize()
+ "Segment entry is too big (%d bytes), maximum allowed segment
entry size: %d bytes.",
+ FILE_SIZE + SegmentPayload.fixedOverheadSize() + 2, // 2 bytes
for index and term.
+ FILE_SIZE - HEADER_RECORD.length
);
assertThat(e.getMessage(), is(expectedMessage));
@@ -243,7 +243,7 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
assertThat(segmentFiles, hasSize(greaterThan(1)));
- List<byte[]> actualData = readDataFromSegmentFiles(batchSize,
batches.size()).stream()
+ List<byte[]> actualData = readDataFromSegmentFiles().stream()
.sorted(comparingLong(DeserializedSegmentPayload::groupId))
.map(DeserializedSegmentPayload::payload)
.collect(toList());
@@ -322,7 +322,7 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
assertThat(writtenBatches, is(not(empty())));
assertThat(exceptions, is(not(empty())));
- List<byte[]> actualData = readDataFromSegmentFiles(batchSize,
writtenBatches.size()).stream()
+ List<byte[]> actualData = readDataFromSegmentFiles().stream()
.sorted(comparingLong(DeserializedSegmentPayload::groupId))
.map(DeserializedSegmentPayload::payload)
.collect(toList());
@@ -458,10 +458,8 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
.collect(toList());
}
- private List<DeserializedSegmentPayload> readDataFromSegmentFiles(int
batchLength, int numBatches) throws IOException {
- var result = new ArrayList<DeserializedSegmentPayload>(numBatches);
-
- int entrySize = batchLength + SegmentPayload.overheadSize();
+ private List<DeserializedSegmentPayload> readDataFromSegmentFiles() throws
IOException {
+ var result = new ArrayList<DeserializedSegmentPayload>();
for (Path segmentFile : segmentFiles()) {
try (SeekableByteChannel channel =
Files.newByteChannel(segmentFile)) {
@@ -469,14 +467,19 @@ class SegmentFileManagerTest extends IgniteAbstractTest {
int bytesRead = HEADER_RECORD.length;
- while (bytesRead + entrySize < FILE_SIZE && result.size() <
numBatches) {
+ while (bytesRead < FILE_SIZE) {
long position = channel.position();
-
result.add(DeserializedSegmentPayload.fromByteChannel(channel));
+ DeserializedSegmentPayload payload =
DeserializedSegmentPayload.fromByteChannel(channel);
+
+ bytesRead += (int) (channel.position() - position);
- assertThat(channel.position(), is(position + entrySize));
+ if (payload == null) {
+ // EOF reached.
+ break;
+ }
- bytesRead += entrySize;
+ result.add(payload);
}
if (FILE_SIZE - bytesRead >= SWITCH_SEGMENT_RECORD.length) {
diff --git
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/VarlenEncoderTest.java
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/VarlenEncoderTest.java
new file mode 100644
index 00000000000..767a47608e6
--- /dev/null
+++
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/util/VarlenEncoderTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
+
+/** Checks that {@link VarlenEncoder} round-trips values and that the written size matches {@code sizeInBytes}. */
+class VarlenEncoderTest {
+    /** Values passed to the parameterized tests; referenced by name from {@code @FieldSource}. */
+    @SuppressWarnings("unused")
+    private static final long[] TEST_VALUES = {
+            0L,
+            1L,
+            127L, // 1 byte boundary
+            128L, // 2 bytes start
+            (1L << 14) - 1, // 2 bytes end
+            (1L << 14), // 3 bytes start
+            (1L << 21) - 1,
+            (1L << 21),
+            (1L << 28) - 1,
+            (1L << 28),
+            (1L << 35),
+            (1L << 42),
+            (1L << 49),
+            (1L << 56) - 1,
+            Long.MAX_VALUE,
+
+            -1L,
+            -2L,
+            -127L,
+            -128L,
+            -129L,
+            -(1L << 7),
+            -(1L << 14),
+            -(1L << 21),
+            -(1L << 28),
+            -(1L << 35),
+            -(1L << 42),
+            -(1L << 49),
+            -(1L << 56),
+            Long.MIN_VALUE
+    };
+
+    /** Writes a value through the {@link ByteBuffer}-based method and reads it back from the same buffer. */
+    @ParameterizedTest
+    @FieldSource("TEST_VALUES")
+    void testWriteToBuffer(long value) {
+        ByteBuffer heapBuffer = ByteBuffer.allocate(10);
+        int positionBefore = heapBuffer.position();
+
+        int bytesWritten = VarlenEncoder.writeLong(value, heapBuffer);
+
+        assertEncodedSize(bytesWritten, value);
+
+        // The buffer-based method is expected to advance the position by the number of bytes it reports.
+        assertThat(heapBuffer.position(), is(positionBefore + bytesWritten));
+
+        heapBuffer.rewind();
+
+        assertThat(VarlenEncoder.readLong(heapBuffer), is(value));
+    }
+
+    /** Writes a value through the address-based method and reads it back from the owning direct buffer. */
+    @ParameterizedTest
+    @FieldSource("TEST_VALUES")
+    void testWriteToDirectBufferUsingAddr(long value) {
+        ByteBuffer directBuffer = ByteBuffer.allocateDirect(10);
+        long address = GridUnsafe.bufferAddress(directBuffer);
+
+        int bytesWritten = VarlenEncoder.writeLong(value, address);
+
+        assertEncodedSize(bytesWritten, value);
+
+        // Writing by address does not move the buffer's position, so the value is read from the start.
+        assertThat(VarlenEncoder.readLong(directBuffer), is(value));
+    }
+
+    /** Asserts that the reported number of written bytes equals the size predicted for the value. */
+    private static void assertEncodedSize(int bytesWritten, long value) {
+        assertThat(bytesWritten, is(VarlenEncoder.sizeInBytes(value)));
+    }
+}