This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 5b1098db3 test(java): add unit tests for serialization utilities
(#2607)
5b1098db3 is described below
commit 5b1098db3628b4d90b76d3629dd9a47b1c78d2ea
Author: Qichao Chu <[email protected]>
AuthorDate: Tue Jan 27 01:16:03 2026 -0800
test(java): add unit tests for serialization utilities (#2607)
---
.../org/apache/iggy/serde/BytesSerializer.java | 5 +-
.../client/async/tcp/IggyFrameDecoderTest.java | 455 ++++++++++++++
.../client/blocking/tcp/BytesSerializerTest.java | 555 +++++++++++++++++
.../apache/iggy/serde/BytesDeserializerTest.java | 671 +++++++++++++++++++++
4 files changed, 1684 insertions(+), 2 deletions(-)
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
index 936dd55d8..4014f3914 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
@@ -204,8 +204,9 @@ public final class BytesSerializer {
public static ByteBuf toBytes(String value) {
ByteBuf buffer = Unpooled.buffer(1 + value.length());
- buffer.writeByte(value.length());
- buffer.writeBytes(value.getBytes(StandardCharsets.UTF_8));
+ byte[] stringBytes = value.getBytes(StandardCharsets.UTF_8);
+ buffer.writeByte(stringBytes.length);
+ buffer.writeBytes(stringBytes);
return buffer;
}
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/IggyFrameDecoderTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/IggyFrameDecoderTest.java
new file mode 100644
index 000000000..448f07de7
--- /dev/null
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/IggyFrameDecoderTest.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class IggyFrameDecoderTest {
+
+ private EmbeddedChannel channel;
+
+    /**
+     * Cleans up the channel a test created: releases every inbound {@link ByteBuf} the test
+     * left unread, then finishes the channel and releases whatever it still holds.
+     */
+    @AfterEach
+    void tearDown() {
+        if (channel == null) {
+            return;
+        }
+        // Drain inbound messages that the test did not consume itself.
+        for (Object pending = channel.readInbound(); pending != null; pending = channel.readInbound()) {
+            if (pending instanceof ByteBuf) {
+                ((ByteBuf) pending).release();
+            }
+        }
+        channel.finishAndReleaseAll();
+    }
+
+    /**
+     * Frames delivered whole in a single inbound write. Each test feeds a little-endian status
+     * int, a little-endian length int and the payload, and expects one decoded buffer holding
+     * that 8-byte header followed by the payload.
+     */
+    @Nested
+    class CompleteFrames {
+
+        @Test
+        void shouldDecodeCompleteFrameWithSmallPayload() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0); // status = success
+            input.writeIntLE(5); // length = 5 bytes
+            // Explicit charset so the test does not depend on the platform default.
+            input.writeBytes("hello".getBytes(StandardCharsets.UTF_8)); // payload
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8 + 5); // header + payload
+            assertThat(decoded.readIntLE()).isEqualTo(0); // status
+            assertThat(decoded.readIntLE()).isEqualTo(5); // length
+            byte[] payload = new byte[5];
+            decoded.readBytes(payload);
+            assertThat(new String(payload, StandardCharsets.UTF_8)).isEqualTo("hello");
+            decoded.release();
+        }
+
+        @Test
+        void shouldDecodeCompleteFrameWithZeroLengthPayload() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0); // status
+            input.writeIntLE(0); // length = 0
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8); // header only
+            assertThat(decoded.readIntLE()).isEqualTo(0);
+            assertThat(decoded.readIntLE()).isEqualTo(0);
+            decoded.release();
+        }
+
+        @Test
+        void shouldDecodeCompleteFrameWithLargePayload() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            byte[] largePayload = new byte[10000];
+            for (int i = 0; i < largePayload.length; i++) {
+                largePayload[i] = (byte) (i % 256);
+            }
+
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(200); // error status
+            input.writeIntLE(10000); // large length
+            input.writeBytes(largePayload);
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8 + 10000);
+            assertThat(decoded.readIntLE()).isEqualTo(200);
+            assertThat(decoded.readIntLE()).isEqualTo(10000);
+            // The patterned payload exists to catch corruption, so compare it byte for byte.
+            byte[] decodedPayload = new byte[largePayload.length];
+            decoded.readBytes(decodedPayload);
+            assertThat(decodedPayload).isEqualTo(largePayload);
+            decoded.release();
+        }
+
+        @Test
+        void shouldDecodeFrameWithVariousStatusCodes() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+
+            for (int status = 0; status <= 5; status++) {
+                ByteBuf input = Unpooled.buffer();
+                input.writeIntLE(status);
+                input.writeIntLE(1);
+                input.writeByte(42);
+
+                // when
+                channel.writeInbound(input);
+
+                // then
+                ByteBuf decoded = channel.readInbound();
+                assertThat(decoded).isNotNull();
+                assertThat(decoded.readIntLE()).isEqualTo(status);
+                decoded.skipBytes(4); // length
+                decoded.skipBytes(1); // payload
+                decoded.release();
+            }
+        }
+    }
+
+    /**
+     * Inputs that stop short of a full frame. The decoder is expected to emit nothing until
+     * the remaining bytes have been written to the channel.
+     */
+    @Nested
+    class IncompleteFrames {
+
+        @Test
+        void shouldWaitForCompleteHeaderWhenOnlyPartialHeaderAvailable() {
+            // given: only the status int, i.e. 4 of the 8 header bytes
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf halfHeader = Unpooled.buffer().writeIntLE(0);
+
+            // when
+            boolean frameEmitted = channel.writeInbound(halfHeader);
+
+            // then: nothing was decoded and nothing can be read
+            assertThat(frameEmitted).isFalse();
+            ByteBuf pending = channel.readInbound();
+            assertThat(pending).isNull();
+        }
+
+        @Test
+        void shouldWaitForCompletePayloadWhenOnlyHeaderAvailable() {
+            // given: a header announcing 100 payload bytes, with no payload behind it
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf headerOnly = Unpooled.buffer().writeIntLE(0).writeIntLE(100);
+
+            // when
+            boolean frameEmitted = channel.writeInbound(headerOnly);
+
+            // then
+            assertThat(frameEmitted).isFalse();
+            ByteBuf pending = channel.readInbound();
+            assertThat(pending).isNull();
+        }
+
+        @Test
+        void shouldWaitForCompletePayloadWhenPartialPayloadAvailable() {
+            // given: a header announcing 100 payload bytes, followed by only 50
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf truncated = Unpooled.buffer().writeIntLE(0).writeIntLE(100).writeBytes(new byte[50]);
+
+            // when
+            boolean frameEmitted = channel.writeInbound(truncated);
+
+            // then
+            assertThat(frameEmitted).isFalse();
+            ByteBuf pending = channel.readInbound();
+            assertThat(pending).isNull();
+        }
+
+        @Test
+        void shouldEventuallyDecodeWhenMoreDataArrives() {
+            // given: header announcing 10 payload bytes plus the first 5 of them
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf head = Unpooled.buffer().writeIntLE(0).writeIntLE(10).writeBytes(new byte[5]);
+
+            // when - the first chunk alone must not produce a frame
+            assertThat(channel.writeInbound(head)).isFalse();
+
+            // when - the remaining 5 bytes complete the frame
+            ByteBuf tail = Unpooled.buffer().writeBytes(new byte[5]);
+            boolean frameEmitted = channel.writeInbound(tail);
+
+            // then
+            assertThat(frameEmitted).isTrue();
+            ByteBuf frame = channel.readInbound();
+            assertThat(frame).isNotNull();
+            assertThat(frame.readableBytes()).isEqualTo(8 + 10);
+            frame.release();
+        }
+    }
+
+    /**
+     * Several complete frames written back to back in one inbound buffer. The decoder is
+     * expected to emit them one by one, in the order they were written.
+     */
+    @Nested
+    class MultipleFrames {
+
+        @Test
+        void shouldDecodeMultipleFramesInSequence() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+
+            // Frame 1 (explicit charset keeps the payload independent of the platform default)
+            input.writeIntLE(0);
+            input.writeIntLE(3);
+            input.writeBytes("abc".getBytes(StandardCharsets.UTF_8));
+
+            // Frame 2
+            input.writeIntLE(1);
+            input.writeIntLE(2);
+            input.writeBytes("de".getBytes(StandardCharsets.UTF_8));
+
+            // when
+            channel.writeInbound(input);
+
+            // then - frame 1
+            ByteBuf decoded1 = channel.readInbound();
+            assertThat(decoded1).isNotNull();
+            assertThat(decoded1.readableBytes()).isEqualTo(8 + 3);
+            decoded1.release();
+
+            // then - frame 2
+            ByteBuf decoded2 = channel.readInbound();
+            assertThat(decoded2).isNotNull();
+            assertThat(decoded2.readableBytes()).isEqualTo(8 + 2);
+            decoded2.release();
+
+            // No more frames
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldDecodeThreeFramesCorrectly() {
+            // given: three one-byte frames whose status equals their position
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+
+            for (int i = 0; i < 3; i++) {
+                input.writeIntLE(i);
+                input.writeIntLE(1);
+                input.writeByte(i);
+            }
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            for (int i = 0; i < 3; i++) {
+                ByteBuf decoded = channel.readInbound();
+                assertThat(decoded).isNotNull();
+                assertThat(decoded.readIntLE()).isEqualTo(i);
+                decoded.skipBytes(4 + 1); // skip length and payload
+                decoded.release();
+            }
+
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+    }
+
+    /** Boundary inputs: empty and tiny buffers, header-only input, trailing partial frames, large payloads. */
+    @Nested
+    class EdgeCases {
+
+        @Test
+        void shouldNotAdvanceReaderIndexWhenPayloadIncomplete() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(100); // expects 100 bytes
+            input.writeBytes(new byte[50]); // only 50 bytes
+            int readerIndexBefore = input.readerIndex();
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then - decoder should wait for complete payload
+            assertThat(hasMessage).isFalse();
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNull();
+            assertThat(input.readerIndex()).isEqualTo(readerIndexBefore);
+        }
+
+        @Test
+        void shouldHandleEmptyInput() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer(); // empty buffer
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse();
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldHandleSingleByteInput() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeByte(0); // only 1 byte
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse();
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldHandleExactlyHeaderSizeWithoutPayload() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(10); // expects payload, but none provided
+            // Exactly 8 bytes (header size)
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse(); // Waiting for payload
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldDecodeFrameFollowedByPartialNextFrame() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+
+            // Complete frame 1 (explicit charset keeps the payload independent of the platform default)
+            input.writeIntLE(0);
+            input.writeIntLE(5);
+            input.writeBytes("hello".getBytes(StandardCharsets.UTF_8));
+
+            // Partial frame 2 (only header)
+            input.writeIntLE(1);
+            input.writeIntLE(10);
+            // No payload for frame 2
+
+            // when
+            channel.writeInbound(input);
+
+            // then - should get frame 1
+            ByteBuf decoded1 = channel.readInbound();
+            assertThat(decoded1).isNotNull();
+            assertThat(decoded1.readableBytes()).isEqualTo(8 + 5);
+            decoded1.release();
+
+            // Frame 2 should not be available yet
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldHandleMaxIntPayloadLength() {
+            // given - despite the method name, this uses a 1,000,000-byte payload rather than
+            // Integer.MAX_VALUE, to avoid running out of memory
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            int largeSize = 1000000;
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(largeSize);
+            input.writeBytes(new byte[largeSize]);
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8 + largeSize);
+            decoded.release();
+        }
+    }
+
+    /** Ownership of decoded buffers: each frame is a distinct buffer that the caller releases. */
+    @Nested
+    class BufferManagement {
+
+        @Test
+        void shouldCreateNewBufferForEachDecodedFrame() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(5);
+            // Explicit charset so the test does not depend on the platform default.
+            input.writeBytes("hello".getBytes(StandardCharsets.UTF_8));
+
+            // when
+            channel.writeInbound(input);
+            ByteBuf decoded = channel.readInbound();
+
+            // then - decoded buffer should be independent
+            assertThat(decoded).isNotNull();
+            assertThat(decoded).isNotSameAs(input);
+            assertThat(decoded.readableBytes()).isEqualTo(13);
+            decoded.release();
+        }
+
+        @Test
+        void shouldAllowManualReleaseOfDecodedBuffers() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(3);
+            input.writeBytes("foo".getBytes(StandardCharsets.UTF_8));
+
+            // when
+            channel.writeInbound(input);
+            ByteBuf decoded = channel.readInbound();
+
+            // then - should be releasable
+            // Fail with a clear assertion instead of a NullPointerException if nothing was decoded.
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.refCnt()).isEqualTo(1);
+            decoded.release();
+            assertThat(decoded.refCnt()).isEqualTo(0);
+        }
+    }
+}
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
index 2e035979e..224d5e7ec 100644
---
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
@@ -21,12 +21,30 @@ package org.apache.iggy.client.blocking.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.message.BytesMessageId;
+import org.apache.iggy.message.HeaderKind;
+import org.apache.iggy.message.HeaderValue;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.MessageHeader;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.message.PollingStrategy;
import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.user.GlobalPermissions;
+import org.apache.iggy.user.Permissions;
+import org.apache.iggy.user.StreamPermissions;
+import org.apache.iggy.user.TopicPermissions;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -138,4 +156,541 @@ class BytesSerializerTest {
assertThatThrownBy(() ->
BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class);
}
}
+
+    /** {@code BytesSerializer.toBytes(String)}: one length byte followed by the UTF-8 bytes. */
+    @Nested
+    class StringSerialization {
+
+        @Test
+        void shouldSerializeSimpleString() {
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes("test");
+
+            // then: length prefix, then the four characters
+            assertThat(serialized.readByte()).isEqualTo((byte) 4);
+            byte[] content = new byte[4];
+            serialized.readBytes(content);
+            String roundTripped = new String(content, StandardCharsets.UTF_8);
+            assertThat(roundTripped).isEqualTo("test");
+        }
+
+        @Test
+        void shouldSerializeEmptyString() {
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes("");
+
+            // then: a zero length prefix and nothing after it
+            byte lengthPrefix = serialized.readByte();
+            assertThat(lengthPrefix).isEqualTo((byte) 0);
+            assertThat(serialized.readableBytes()).isEqualTo(0);
+        }
+
+        @Test
+        void shouldSerializeUtf8Characters() {
+            // given: text whose UTF-8 byte count differs from its character count
+            String text = "Hello世界";
+            byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(text);
+
+            // then: the prefix counts bytes, and the bytes follow unchanged
+            assertThat(serialized.readByte()).isEqualTo((byte) utf8.length);
+            byte[] content = new byte[utf8.length];
+            serialized.readBytes(content);
+            assertThat(content).isEqualTo(utf8);
+        }
+    }
+
+    /** Identifier serialization: a kind byte, a length byte, then the numeric or string value. */
+    @Nested
+    class IdentifierSerialization {
+
+        @Test
+        void shouldSerializeNumericIdentifier() {
+            // given
+            var identifier = StreamId.of(123L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(identifier);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // kind = 1 (numeric)
+            assertThat(result.readByte()).isEqualTo((byte) 4); // length = 4
+            assertThat(result.readIntLE()).isEqualTo(123); // id value
+        }
+
+        @Test
+        void shouldSerializeStringIdentifier() {
+            // given
+            var identifier = StreamId.of("test-stream");
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(identifier);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 2); // kind = 2 (string)
+            assertThat(result.readByte()).isEqualTo((byte) 11); // length = "test-stream".length()
+            byte[] nameBytes = new byte[11];
+            result.readBytes(nameBytes);
+            // Decode with an explicit charset rather than the platform default.
+            assertThat(new String(nameBytes, StandardCharsets.UTF_8)).isEqualTo("test-stream");
+        }
+    }
+
+    /** Consumer serialization: a consumer-kind byte followed by the serialized identifier. */
+    @Nested
+    class ConsumerSerialization {
+
+        @Test
+        void shouldSerializeConsumerWithNumericId() {
+            // given
+            var consumer = Consumer.of(42L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(consumer);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Consumer kind
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Identifier kind (numeric)
+            assertThat(result.readByte()).isEqualTo((byte) 4); // Identifier length
+            assertThat(result.readIntLE()).isEqualTo(42); // ID value
+        }
+
+        @Test
+        void shouldSerializeConsumerGroupWithNumericId() {
+            // given
+            var consumer = Consumer.group(99L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(consumer);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 2); // ConsumerGroup kind
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Identifier kind (numeric)
+            assertThat(result.readByte()).isEqualTo((byte) 4); // Identifier length
+            assertThat(result.readIntLE()).isEqualTo(99); // ID value
+        }
+
+        @Test
+        void shouldSerializeConsumerWithStringId() {
+            // given
+            var consumer = Consumer.of(ConsumerId.of("my-consumer"));
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(consumer);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Consumer kind
+            assertThat(result.readByte()).isEqualTo((byte) 2); // Identifier kind (string)
+            assertThat(result.readByte()).isEqualTo((byte) 11); // "my-consumer".length()
+            // Check the identifier text too, not just its length prefix.
+            byte[] idBytes = new byte[11];
+            result.readBytes(idBytes);
+            assertThat(new String(idBytes, StandardCharsets.UTF_8)).isEqualTo("my-consumer");
+        }
+    }
+
+    /** Partitioning serialization: a kind byte, a length byte, then the value (if any). */
+    @Nested
+    class PartitioningSerialization {
+
+        @Test
+        void shouldSerializeBalancedPartitioning() {
+            // given
+            var partitioning = Partitioning.balanced();
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(partitioning);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Balanced kind
+            assertThat(result.readByte()).isEqualTo((byte) 0); // Empty value
+        }
+
+        @Test
+        void shouldSerializePartitionIdPartitioning() {
+            // given
+            var partitioning = Partitioning.partitionId(5L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(partitioning);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 2); // PartitionId kind
+            assertThat(result.readByte()).isEqualTo((byte) 4); // 4 bytes for int
+            assertThat(result.readIntLE()).isEqualTo(5); // partition ID
+        }
+
+        @Test
+        void shouldSerializeMessagesKeyPartitioning() {
+            // given
+            var partitioning = Partitioning.messagesKey("user-123");
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(partitioning);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 3); // MessagesKey kind
+            assertThat(result.readByte()).isEqualTo((byte) 8); // "user-123".length()
+            byte[] keyBytes = new byte[8];
+            result.readBytes(keyBytes);
+            // Decode with an explicit charset rather than the platform default.
+            assertThat(new String(keyBytes, StandardCharsets.UTF_8)).isEqualTo("user-123");
+        }
+    }
+
+    /**
+     * Polling strategy serialization. Every test checks the same layout: one kind byte
+     * followed by exactly 8 more readable bytes.
+     */
+    @Nested
+    class PollingStrategySerialization {
+
+        @Test
+        void shouldSerializeFirstStrategy() {
+            // First kind = 3
+            assertKindThenEightBytes(BytesSerializer.toBytes(PollingStrategy.first()), 3);
+        }
+
+        @Test
+        void shouldSerializeLastStrategy() {
+            // Last kind = 4
+            assertKindThenEightBytes(BytesSerializer.toBytes(PollingStrategy.last()), 4);
+        }
+
+        @Test
+        void shouldSerializeOffsetStrategy() {
+            // Offset kind = 1
+            BigInteger offset = BigInteger.valueOf(100);
+            assertKindThenEightBytes(BytesSerializer.toBytes(PollingStrategy.offset(offset)), 1);
+        }
+
+        @Test
+        void shouldSerializeNextStrategy() {
+            // Next kind = 5
+            assertKindThenEightBytes(BytesSerializer.toBytes(PollingStrategy.next()), 5);
+        }
+
+        @Test
+        void shouldSerializeTimestampStrategy() {
+            // Timestamp kind = 2
+            BigInteger timestamp = BigInteger.valueOf(1234567890);
+            assertKindThenEightBytes(BytesSerializer.toBytes(PollingStrategy.timestamp(timestamp)), 2);
+        }
+
+        /** Reads the leading kind byte and checks that 8 readable bytes remain after it. */
+        private void assertKindThenEightBytes(ByteBuf serialized, int expectedKind) {
+            assertThat(serialized.readByte()).isEqualTo((byte) expectedKind);
+            assertThat(serialized.readableBytes()).isEqualTo(8);
+        }
+    }
+
+    /** Optional value serialization: a presence flag byte followed by a little-endian int. */
+    @Nested
+    class OptionalValueSerialization {
+
+        @Test
+        void shouldSerializePresentOptional() {
+            // given
+            Optional<Long> present = Optional.of(42L);
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(present);
+
+            // then: flag 1, then the value itself
+            byte presenceFlag = serialized.readByte();
+            int value = serialized.readIntLE();
+            assertThat(presenceFlag).isEqualTo((byte) 1);
+            assertThat(value).isEqualTo(42);
+        }
+
+        @Test
+        void shouldSerializeEmptyOptional() {
+            // given
+            Optional<Long> absent = Optional.empty();
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(absent);
+
+            // then: flag 0, and a zero is still written in the value slot
+            byte presenceFlag = serialized.readByte();
+            int value = serialized.readIntLE();
+            assertThat(presenceFlag).isEqualTo((byte) 0);
+            assertThat(value).isEqualTo(0);
+        }
+    }
+
+    /**
+     * User header map serialization. Per entry the tests expect: a 4-byte little-endian key
+     * length, the key bytes, a kind byte, a 4-byte little-endian value length, the value bytes.
+     */
+    @Nested
+    class HeadersSerialization {
+
+        @Test
+        void shouldSerializeEmptyHeaders() {
+            // given
+            Map<String, HeaderValue> headers = new HashMap<>();
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(headers);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(0); // Empty buffer
+        }
+
+        @Test
+        void shouldSerializeSingleHeader() {
+            // given
+            Map<String, HeaderValue> headers = new HashMap<>();
+            headers.put("key1", new HeaderValue(HeaderKind.Raw, "value1"));
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(headers);
+
+            // then (strings are decoded with an explicit charset, not the platform default)
+            assertThat(result.readIntLE()).isEqualTo(4); // "key1".length()
+            byte[] keyBytes = new byte[4];
+            result.readBytes(keyBytes);
+            assertThat(new String(keyBytes, StandardCharsets.UTF_8)).isEqualTo("key1");
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Raw kind
+            assertThat(result.readIntLE()).isEqualTo(6); // "value1".length()
+            byte[] valueBytes = new byte[6];
+            result.readBytes(valueBytes);
+            assertThat(new String(valueBytes, StandardCharsets.UTF_8)).isEqualTo("value1");
+        }
+
+        @Test
+        void shouldSerializeMultipleHeaders() {
+            // given
+            Map<String, HeaderValue> headers = new HashMap<>();
+            headers.put("k1", new HeaderValue(HeaderKind.Raw, "v1")); // 13 bytes
+            headers.put("k2", new HeaderValue(HeaderKind.String, "v2")); // 13 bytes
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(headers);
+
+            // then - verify buffer contains data for both headers
+            assertThat(result.readableBytes()).isEqualTo(26);
+        }
+    }
+
+    /**
+     * Whole-message serialization. The tests check only the total size:
+     * {@code MessageHeader.SIZE} plus the payload length plus the user headers length.
+     */
+    @Nested
+    class MessageSerialization {
+
+        @Test
+        void shouldSerializeMessageWithoutUserHeaders() {
+            // given
+            var messageId = new BytesMessageId(new byte[16]);
+            var header = new MessageHeader(
+                    BigInteger.valueOf(123), // checksum
+                    messageId,
+                    BigInteger.valueOf(0), // offset
+                    BigInteger.valueOf(1000), // timestamp
+                    BigInteger.valueOf(1000), // originTimestamp
+                    0L, // userHeadersLength
+                    5L // payloadLength
+                    );
+            // Explicit charset so the payload does not depend on the platform default.
+            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
+            var message = new Message(header, payload, new HashMap<>());
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(message);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(MessageHeader.SIZE + 5); // header + payload, no user headers
+        }
+
+        @Test
+        void shouldSerializeMessageWithUserHeaders() {
+            // given
+            var messageId = new BytesMessageId(new byte[16]);
+            Map<String, HeaderValue> userHeaders = new HashMap<>();
+            userHeaders.put("key", new HeaderValue(HeaderKind.Raw, "val"));
+
+            // Calculate user headers size
+            ByteBuf headersBuf = BytesSerializer.toBytes(userHeaders);
+            int userHeadersLength = headersBuf.readableBytes();
+
+            var header = new MessageHeader(
+                    BigInteger.ZERO,
+                    messageId,
+                    BigInteger.ZERO,
+                    BigInteger.valueOf(1000),
+                    BigInteger.valueOf(1000),
+                    (long) userHeadersLength,
+                    3L // "abc".length()
+                    );
+            byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
+            var message = new Message(header, payload, userHeaders);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(message);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(MessageHeader.SIZE + 3 + userHeadersLength);
+        }
+    }
+
+    /** Message header serialization: total size plus the two trailing 4-byte length fields. */
+    @Nested
+    class MessageHeaderSerialization {
+
+        @Test
+        void shouldSerializeMessageHeader() {
+            // given
+            BigInteger checksum = BigInteger.valueOf(999);
+            BigInteger offset = BigInteger.valueOf(42);
+            BigInteger timestamp = BigInteger.valueOf(2000);
+            BigInteger originTimestamp = BigInteger.valueOf(1999);
+            MessageHeader header = new MessageHeader(
+                    checksum,
+                    new BytesMessageId(new byte[16]),
+                    offset,
+                    timestamp,
+                    originTimestamp,
+                    10L, // userHeadersLength
+                    100L // payloadLength
+                    );
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(header);
+
+            // then
+            assertThat(serialized.readableBytes()).isEqualTo(MessageHeader.SIZE);
+            // Skip the fields this test does not inspect: checksum (8), message ID (16),
+            // offset (8), timestamp (8) and origin timestamp (8).
+            serialized.skipBytes(8 + 16 + 8 + 8 + 8);
+            int userHeadersLength = serialized.readIntLE();
+            assertThat(userHeadersLength).isEqualTo(10);
+            int payloadLength = serialized.readIntLE();
+            assertThat(payloadLength).isEqualTo(100);
+        }
+    }
+
+    /**
+     * Permission serialization. Flags are read back one boolean byte each; nested maps are
+     * read as a marker byte (1 = entry follows, 0 = none/end) before each entry.
+     */
+    @Nested
+    class PermissionsSerialization {
+
+        @Test
+        void shouldSerializeGlobalPermissions() {
+            // given: flags alternate true/false, starting with true
+            GlobalPermissions global =
+                    new GlobalPermissions(true, false, true, false, true, false, true, false, true, false);
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(global);
+
+            // then: 10 boolean bytes, in order manageServers, readServers, manageUsers, readUsers,
+            // manageStreams, readStreams, manageTopics, readTopics, pollMessages, sendMessages
+            assertThat(serialized.readableBytes()).isEqualTo(10);
+            for (int position = 0; position < 10; position++) {
+                boolean expected = position % 2 == 0;
+                assertThat(serialized.readBoolean()).isEqualTo(expected);
+            }
+        }
+
+        @Test
+        void shouldSerializeTopicPermissions() {
+            // given
+            TopicPermissions topic = new TopicPermissions(true, false, true, false);
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(topic);
+
+            // then: 4 boolean bytes, in order manageTopic, readTopic, pollMessages, sendMessages
+            assertThat(serialized.readableBytes()).isEqualTo(4);
+            boolean[] expectedFlags = {true, false, true, false};
+            for (boolean expected : expectedFlags) {
+                assertThat(serialized.readBoolean()).isEqualTo(expected);
+            }
+        }
+
+        @Test
+        void shouldSerializeStreamPermissionsWithoutTopics() {
+            // given
+            StreamPermissions stream =
+                    new StreamPermissions(true, false, true, false, true, false, new HashMap<>());
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(stream);
+
+            // then: manageStream, readStream, manageTopics, readTopics, pollMessages, sendMessages
+            boolean[] expectedFlags = {true, false, true, false, true, false};
+            for (boolean expected : expectedFlags) {
+                assertThat(serialized.readBoolean()).isEqualTo(expected);
+            }
+            assertThat(serialized.readByte()).isEqualTo((byte) 0); // no topics marker
+        }
+
+        @Test
+        void shouldSerializeStreamPermissionsWithTopics() {
+            // given: one topic entry keyed by ID 1
+            Map<Long, TopicPermissions> perTopic = new HashMap<>();
+            perTopic.put(1L, new TopicPermissions(true, true, true, true));
+            StreamPermissions stream = new StreamPermissions(true, true, true, true, true, true, perTopic);
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(stream);
+
+            // then
+            serialized.skipBytes(6); // the 6 stream-level booleans
+            byte topicMarker = serialized.readByte();
+            assertThat(topicMarker).isEqualTo((byte) 1); // has topic marker
+            int topicId = serialized.readIntLE();
+            assertThat(topicId).isEqualTo(1); // topic ID
+            serialized.skipBytes(4); // the topic's own permission flags
+            byte endMarker = serialized.readByte();
+            assertThat(endMarker).isEqualTo((byte) 0); // end marker
+        }
+
+        @Test
+        void shouldSerializeFullPermissionsWithoutStreams() {
+            // given
+            GlobalPermissions global =
+                    new GlobalPermissions(true, true, true, true, true, true, true, true, true, true);
+            Permissions full = new Permissions(global, new HashMap<>());
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(full);
+
+            // then
+            serialized.skipBytes(10); // global permissions
+            byte streamMarker = serialized.readByte();
+            assertThat(streamMarker).isEqualTo((byte) 0); // no streams marker
+        }
+
+        @Test
+        void shouldSerializeFullPermissionsWithStreams() {
+            // given: one stream entry keyed by ID 1, with no per-topic permissions
+            GlobalPermissions global =
+                    new GlobalPermissions(false, false, false, false, false, false, false, false, false, false);
+            Map<Long, StreamPermissions> perStream = new HashMap<>();
+            perStream.put(1L, new StreamPermissions(true, true, true, true, true, true, new HashMap<>()));
+            Permissions full = new Permissions(global, perStream);
+
+            // when
+            ByteBuf serialized = BytesSerializer.toBytes(full);
+
+            // then
+            serialized.skipBytes(10); // global permissions
+            byte streamMarker = serialized.readByte();
+            assertThat(streamMarker).isEqualTo((byte) 1); // has stream marker
+            int streamId = serialized.readIntLE();
+            assertThat(streamId).isEqualTo(1); // stream ID
+            serialized.skipBytes(6); // the stream-level booleans
+            byte topicsMarker = serialized.readByte();
+            assertThat(topicsMarker).isEqualTo((byte) 0); // no topics in stream
+            byte endOfStreams = serialized.readByte();
+            assertThat(endOfStreams).isEqualTo((byte) 0); // end of streams marker
+        }
+    }
}
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
index 42e8b0158..4558d30fa 100644
---
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
@@ -22,17 +22,76 @@ package org.apache.iggy.serde;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.iggy.message.HeaderKind;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.apache.iggy.user.UserStatus;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
import java.util.HexFormat;
+import static org.apache.iggy.serde.BytesDeserializer.readClientInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerGroup;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerGroupDetails;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerGroupInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerGroupMember;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerOffsetInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readGlobalPermissions;
+import static org.apache.iggy.serde.BytesDeserializer.readPartition;
+import static org.apache.iggy.serde.BytesDeserializer.readPermissions;
+import static
org.apache.iggy.serde.BytesDeserializer.readPersonalAccessTokenInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readPolledMessage;
+import static org.apache.iggy.serde.BytesDeserializer.readPolledMessages;
+import static
org.apache.iggy.serde.BytesDeserializer.readRawPersonalAccessToken;
+import static org.apache.iggy.serde.BytesDeserializer.readStats;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamBase;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamDetails;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamPermissions;
+import static org.apache.iggy.serde.BytesDeserializer.readTopic;
+import static org.apache.iggy.serde.BytesDeserializer.readTopicDetails;
+import static org.apache.iggy.serde.BytesDeserializer.readTopicPermissions;
import static org.apache.iggy.serde.BytesDeserializer.readU64AsBigInteger;
+import static org.apache.iggy.serde.BytesDeserializer.readUserInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readUserInfoDetails;
import static org.assertj.core.api.Assertions.assertThat;
class BytesDeserializerTest {
+ // Helper methods for writing test data
+    private static void writeU64(ByteBuf buffer, BigInteger value) {
+        // BigInteger hands back big-endian two's-complement bytes; flip them into little-endian order.
+        byte[] littleEndian = value.toByteArray();
+        ArrayUtils.reverse(littleEndian);
+        // Copy at most the 8 low-order bytes, then zero-fill so that exactly 8 bytes are always appended.
+        int copied = Math.min(8, littleEndian.length);
+        buffer.writeBytes(littleEndian, 0, copied).writeZero(8 - copied);
+    }
+
+    private static void writeTopicData(ByteBuf buffer) { // appends one fixed topic record (ID 10, name "test")
+        buffer.writeIntLE(10); // topic ID
+        writeU64(buffer, BigInteger.valueOf(1000)); // created at
+        buffer.writeIntLE(4); // partitions count
+        writeU64(buffer, BigInteger.ZERO); // message expiry
+        buffer.writeByte(CompressionAlgorithm.None.asCode()); // compression
+        writeU64(buffer, BigInteger.valueOf(10000)); // max topic size
+        buffer.writeByte(1); // replication factor
+        writeU64(buffer, BigInteger.valueOf(500)); // size
+        writeU64(buffer, BigInteger.valueOf(50)); // messages count
+        buffer.writeByte(4); // name length in bytes; must match the name written on the next line
+        buffer.writeBytes("test".getBytes(StandardCharsets.UTF_8)); // charset pinned: never depend on the JVM default
+    }
+
+    private static void writePartitionData(ByteBuf buffer) { // appends one fixed partition record in field order
+        buffer.writeIntLE(1); // partition ID (little-endian int)
+        writeU64(buffer, BigInteger.valueOf(1000)); // created at (8 bytes via writeU64)
+        buffer.writeIntLE(5); // segments count (little-endian int)
+        writeU64(buffer, BigInteger.valueOf(99)); // current offset
+        writeU64(buffer, BigInteger.valueOf(200)); // size
+        writeU64(buffer, BigInteger.valueOf(20)); // messages count
+    }
+
@Nested
class U64 {
@@ -78,4 +137,616 @@ class BytesDeserializerTest {
assertThat(result).isEqualTo(expected);
}
}
+
+    @Nested
+    class StreamDeserialization {
+
+        @Test
+        void shouldDeserializeStreamBase() {
+            // given: one stream record in field order (ID, createdAt, topics count, size, messages count, name)
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // stream ID
+            writeU64(buffer, BigInteger.valueOf(1000)); // createdAt
+            buffer.writeIntLE(2); // topics count
+            writeU64(buffer, BigInteger.valueOf(5000)); // size
+            writeU64(buffer, BigInteger.valueOf(100)); // messages count
+            buffer.writeByte(11); // name length
+            buffer.writeBytes("test-stream".getBytes(StandardCharsets.UTF_8));
+
+            // when
+            var stream = readStreamBase(buffer);
+
+            // then
+            assertThat(stream.id()).isEqualTo(1L);
+            assertThat(stream.createdAt()).isEqualTo(BigInteger.valueOf(1000));
+            assertThat(stream.topicsCount()).isEqualTo(2L);
+            assertThat(stream.name()).isEqualTo("test-stream");
+        }
+
+        @Test
+        void shouldDeserializeStreamDetails() {
+            // given: a stream record announcing one topic, followed by that topic's record
+            ByteBuf buffer = Unpooled.buffer();
+            // Write stream base (same field order as in shouldDeserializeStreamBase)
+            buffer.writeIntLE(1); // stream ID
+            writeU64(buffer, BigInteger.valueOf(1000)); // createdAt
+            buffer.writeIntLE(1); // topics count
+            writeU64(buffer, BigInteger.valueOf(5000)); // size
+            writeU64(buffer, BigInteger.valueOf(100)); // messages count
+            buffer.writeByte(6); // name length
+            buffer.writeBytes("stream".getBytes(StandardCharsets.UTF_8)); // charset pinned, as in the test above
+            // Write one topic
+            writeTopicData(buffer);
+
+            // when
+            var streamDetails = readStreamDetails(buffer);
+
+            // then
+            assertThat(streamDetails.id()).isEqualTo(1L);
+            assertThat(streamDetails.topics()).hasSize(1);
+        }
+    }
+
+    @Nested
+    class TopicDeserialization {
+
+        @Test
+        void shouldDeserializeTopic() {
+            // given: a single topic record produced by the writeTopicData helper
+            ByteBuf encodedTopic = Unpooled.buffer();
+            writeTopicData(encodedTopic);
+
+            // when
+            var decoded = readTopic(encodedTopic);
+
+            // then: the decoded fields echo the values the helper wrote
+            assertThat(decoded.id()).isEqualTo(10L);
+            assertThat(decoded.name()).isEqualTo("test");
+            assertThat(decoded.partitionsCount()).isEqualTo(4L);
+        }
+
+        @Test
+        void shouldDeserializeTopicDetails() {
+            // given: a topic record ...
+            ByteBuf encodedDetails = Unpooled.buffer();
+            writeTopicData(encodedDetails);
+            // ... immediately followed by one partition record
+            writePartitionData(encodedDetails);
+
+            // when
+            var decoded = readTopicDetails(encodedDetails);
+
+            // then
+            assertThat(decoded.id()).isEqualTo(10L);
+            assertThat(decoded.partitions()).hasSize(1);
+        }
+    }
+
+    @Nested
+    class PartitionDeserialization {
+
+        @Test
+        void shouldDeserializePartition() {
+            // given: a single partition record produced by the writePartitionData helper
+            ByteBuf encodedPartition = Unpooled.buffer();
+            writePartitionData(encodedPartition);
+
+            // when
+            var decoded = readPartition(encodedPartition);
+
+            // then: ID, segments count and current offset echo the values the helper wrote
+            assertThat(decoded.id()).isEqualTo(1L);
+            assertThat(decoded.segmentsCount()).isEqualTo(5L);
+            assertThat(decoded.currentOffset()).isEqualTo(BigInteger.valueOf(99));
+        }
+    }
+
+    @Nested
+    class ConsumerGroupDeserialization {
+
+        @Test
+        void shouldDeserializeConsumerGroup() {
+            // given: one consumer group record (ID, partitions count, members count, length-prefixed name)
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // group ID
+            buffer.writeIntLE(3); // partitions count
+            buffer.writeIntLE(2); // members count
+            buffer.writeByte(5); // name length
+            buffer.writeBytes("group".getBytes(StandardCharsets.UTF_8)); // charset pinned: no JVM-default dependency
+
+            // when
+            var group = readConsumerGroup(buffer);
+
+            // then
+            assertThat(group.id()).isEqualTo(1L);
+            assertThat(group.name()).isEqualTo("group");
+            assertThat(group.partitionsCount()).isEqualTo(3L);
+            assertThat(group.membersCount()).isEqualTo(2L);
+        }
+
+        @Test
+        void shouldDeserializeConsumerGroupMember() {
+            // given: one member record listing two assigned partitions
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(42); // member ID
+            buffer.writeIntLE(2); // partitions count
+            buffer.writeIntLE(1); // partition ID 1
+            buffer.writeIntLE(2); // partition ID 2
+
+            // when
+            var member = readConsumerGroupMember(buffer);
+
+            // then
+            assertThat(member.id()).isEqualTo(42L);
+            assertThat(member.partitionsCount()).isEqualTo(2L);
+            assertThat(member.partitions()).containsExactly(1L, 2L);
+        }
+
+        @Test
+        void shouldDeserializeConsumerGroupDetails() {
+            // given: a group record announcing one member, followed by that member's record
+            ByteBuf buffer = Unpooled.buffer();
+            // Write consumer group (same field order as in shouldDeserializeConsumerGroup)
+            buffer.writeIntLE(1); // group ID
+            buffer.writeIntLE(1); // partitions count
+            buffer.writeIntLE(1); // members count
+            buffer.writeByte(2); // name length
+            buffer.writeBytes("cg".getBytes(StandardCharsets.UTF_8));
+            // Write one member
+            buffer.writeIntLE(10); // member ID
+            buffer.writeIntLE(0); // no partitions
+
+            // when
+            var details = readConsumerGroupDetails(buffer);
+
+            // then
+            assertThat(details.id()).isEqualTo(1L);
+            assertThat(details.members()).hasSize(1);
+        }
+    }
+
+    @Nested
+    class ConsumerOffsetDeserialization {
+
+        @Test
+        void shouldDeserializeConsumerOffsetInfo() {
+            // given: partition ID followed by the current and the stored offset
+            ByteBuf encodedOffsets = Unpooled.buffer();
+            encodedOffsets.writeIntLE(5); // partition ID
+            writeU64(encodedOffsets, BigInteger.valueOf(100)); // current offset
+            writeU64(encodedOffsets, BigInteger.valueOf(95)); // stored offset
+
+            // when
+            var decoded = readConsumerOffsetInfo(encodedOffsets);
+
+            // then: each field comes back with the value written above
+            assertThat(decoded.partitionId()).isEqualTo(5L);
+            assertThat(decoded.currentOffset()).isEqualTo(BigInteger.valueOf(100));
+            assertThat(decoded.storedOffset()).isEqualTo(BigInteger.valueOf(95));
+        }
+    }
+
+    @Nested
+    class MessageDeserialization {
+
+        @Test
+        void shouldDeserializePolledMessageWithoutUserHeaders() {
+            // given: a message header announcing zero user-header bytes, then a five-byte payload
+            ByteBuf buffer = Unpooled.buffer();
+            writeU64(buffer, BigInteger.valueOf(123)); // checksum
+            buffer.writeBytes(new byte[16]); // message ID
+            writeU64(buffer, BigInteger.ZERO); // offset
+            writeU64(buffer, BigInteger.valueOf(1000)); // timestamp
+            writeU64(buffer, BigInteger.valueOf(1000)); // origin timestamp
+            buffer.writeIntLE(0); // user headers length
+            buffer.writeIntLE(5); // payload length
+            buffer.writeBytes("hello".getBytes(StandardCharsets.UTF_8)); // payload
+
+            // when
+            var message = readPolledMessage(buffer);
+
+            // then
+            assertThat(message.header().checksum()).isEqualTo(BigInteger.valueOf(123));
+            assertThat(message.header().payloadLength()).isEqualTo(5L);
+            assertThat(message.payload()).isEqualTo("hello".getBytes(StandardCharsets.UTF_8));
+            assertThat(message.userHeaders()).isEmpty();
+        }
+
+        @Test
+        void shouldDeserializePolledMessageWithUserHeaders() {
+            // given: same header field order as in the test above
+            ByteBuf buffer = Unpooled.buffer();
+            writeU64(buffer, BigInteger.ZERO); // checksum
+            buffer.writeBytes(new byte[16]); // message ID
+            writeU64(buffer, BigInteger.ZERO); // offset
+            writeU64(buffer, BigInteger.valueOf(1000)); // timestamp
+            writeU64(buffer, BigInteger.valueOf(1000)); // origin timestamp
+
+            // Build the user headers separately so their encoded size can be written ahead of the payload
+            ByteBuf headersBuffer = Unpooled.buffer();
+            headersBuffer.writeIntLE(3); // key length
+            headersBuffer.writeBytes("key".getBytes(StandardCharsets.UTF_8));
+            headersBuffer.writeByte(HeaderKind.Raw.asCode());
+            headersBuffer.writeIntLE(3); // value length
+            headersBuffer.writeBytes("val".getBytes(StandardCharsets.UTF_8));
+
+            buffer.writeIntLE(headersBuffer.readableBytes()); // user headers length
+            buffer.writeIntLE(3); // payload length
+            buffer.writeBytes("abc".getBytes(StandardCharsets.UTF_8)); // payload
+            buffer.writeBytes(headersBuffer); // user headers
+
+            // when
+            var message = readPolledMessage(buffer);
+
+            // then
+            assertThat(message.userHeaders()).hasSize(1);
+            assertThat(message.userHeaders().get("key").value()).isEqualTo("val");
+        }
+
+        @Test
+        void shouldDeserializePolledMessages() {
+            // given: a batch preamble followed by exactly one message
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // partition ID
+            writeU64(buffer, BigInteger.valueOf(10)); // current offset
+            buffer.writeIntLE(1); // messages count
+            // Write one message (same field order as in the single-message tests above)
+            writeU64(buffer, BigInteger.ZERO); // checksum
+            buffer.writeBytes(new byte[16]); // message ID
+            writeU64(buffer, BigInteger.ZERO); // offset
+            writeU64(buffer, BigInteger.valueOf(1000)); // timestamp
+            writeU64(buffer, BigInteger.valueOf(1000)); // origin timestamp
+            buffer.writeIntLE(0); // user headers length
+            buffer.writeIntLE(2); // payload length
+            buffer.writeBytes("hi".getBytes(StandardCharsets.UTF_8)); // payload
+
+            // when
+            var polledMessages = readPolledMessages(buffer);
+
+            // then
+            assertThat(polledMessages.partitionId()).isEqualTo(1L);
+            assertThat(polledMessages.currentOffset()).isEqualTo(BigInteger.valueOf(10));
+            assertThat(polledMessages.count()).isEqualTo(1L);
+            assertThat(polledMessages.messages()).hasSize(1);
+        }
+    }
+
+    @Nested
+    class StatsDeserialization {
+
+        @Test
+        void shouldDeserializeStats() {
+            // given: a complete stats record; each of the four trailing strings has a little-endian int length prefix
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1234); // process ID
+            buffer.writeFloatLE(12.5f); // CPU usage
+            buffer.writeFloatLE(50.0f); // total CPU usage
+            writeU64(buffer, BigInteger.valueOf(1000000)); // memory usage
+            writeU64(buffer, BigInteger.valueOf(8000000)); // total memory
+            writeU64(buffer, BigInteger.valueOf(7000000)); // available memory
+            writeU64(buffer, BigInteger.valueOf(3600)); // run time
+            writeU64(buffer, BigInteger.valueOf(1000000)); // start time
+            writeU64(buffer, BigInteger.valueOf(500)); // read bytes
+            writeU64(buffer, BigInteger.valueOf(600)); // written bytes
+            writeU64(buffer, BigInteger.valueOf(1000)); // messages size bytes
+            buffer.writeIntLE(5); // streams count
+            buffer.writeIntLE(10); // topics count
+            buffer.writeIntLE(20); // partitions count
+            buffer.writeIntLE(100); // segments count
+            writeU64(buffer, BigInteger.valueOf(5000)); // messages count
+            buffer.writeIntLE(3); // clients count
+            buffer.writeIntLE(2); // consumer groups count
+            buffer.writeIntLE(9); // hostname length
+            buffer.writeBytes("localhost".getBytes(StandardCharsets.UTF_8)); // charset pinned for all four strings
+            buffer.writeIntLE(5); // OS name length
+            buffer.writeBytes("Linux".getBytes(StandardCharsets.UTF_8));
+            buffer.writeIntLE(5); // OS version length
+            buffer.writeBytes("5.4.0".getBytes(StandardCharsets.UTF_8));
+            buffer.writeIntLE(7); // kernel version length
+            buffer.writeBytes("5.4.0-1".getBytes(StandardCharsets.UTF_8));
+
+            // when
+            var stats = readStats(buffer);
+
+            // then
+            assertThat(stats.processId()).isEqualTo(1234L);
+            assertThat(stats.cpuUsage()).isEqualTo(12.5f);
+            assertThat(stats.streamsCount()).isEqualTo(5L);
+            assertThat(stats.hostname()).isEqualTo("localhost");
+            assertThat(stats.osName()).isEqualTo("Linux");
+        }
+    }
+
+    @Nested
+    class ClientInfoDeserialization {
+
+        @Test
+        void shouldDeserializeClientInfo() {
+            // given: one client record with an int-length-prefixed address and no consumer groups
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(100); // client ID
+            buffer.writeIntLE(5); // user ID
+            buffer.writeByte(1); // transport (TCP)
+            buffer.writeIntLE(9); // address length
+            buffer.writeBytes("127.0.0.1".getBytes(StandardCharsets.UTF_8)); // charset pinned: no JVM-default dependency
+            buffer.writeIntLE(0); // consumer groups count
+
+            // when
+            var clientInfo = readClientInfo(buffer);
+
+            // then
+            assertThat(clientInfo.clientId()).isEqualTo(100L);
+            assertThat(clientInfo.userId()).isPresent().hasValue(5L);
+            assertThat(clientInfo.address()).isEqualTo("127.0.0.1");
+            assertThat(clientInfo.transport()).isEqualTo("Tcp");
+        }
+
+        @Test
+        void shouldDeserializeConsumerGroupInfo() {
+            // given: three consecutive little-endian ints
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // stream ID
+            buffer.writeIntLE(2); // topic ID
+            buffer.writeIntLE(3); // group ID
+
+            // when
+            var groupInfo = readConsumerGroupInfo(buffer);
+
+            // then
+            assertThat(groupInfo.streamId()).isEqualTo(1L);
+            assertThat(groupInfo.topicId()).isEqualTo(2L);
+            assertThat(groupInfo.consumerGroupId()).isEqualTo(3L);
+        }
+    }
+
+    @Nested
+    class UserInfoDeserialization {
+
+        @Test
+        void shouldDeserializeUserInfo() {
+            // given: one user record (ID, created at, status, length-prefixed username)
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(42); // user ID
+            writeU64(buffer, BigInteger.valueOf(2000)); // created at
+            buffer.writeByte(UserStatus.Active.asCode()); // status
+            buffer.writeByte(4); // username length
+            buffer.writeBytes("user".getBytes(StandardCharsets.UTF_8)); // charset pinned: no JVM-default dependency
+
+            // when
+            var userInfo = readUserInfo(buffer);
+
+            // then
+            assertThat(userInfo.id()).isEqualTo(42L);
+            assertThat(userInfo.createdAt()).isEqualTo(BigInteger.valueOf(2000));
+            assertThat(userInfo.status()).isEqualTo(UserStatus.Active);
+            assertThat(userInfo.username()).isEqualTo("user");
+        }
+
+        @Test
+        void shouldDeserializeUserInfoDetailsWithoutPermissions() {
+            // given: a user record (same field order as in shouldDeserializeUserInfo) plus a "no permissions" flag
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // user ID
+            writeU64(buffer, BigInteger.valueOf(1000)); // created at
+            buffer.writeByte(UserStatus.Active.asCode()); // status
+            buffer.writeByte(5); // username length
+            buffer.writeBytes("admin".getBytes(StandardCharsets.UTF_8));
+            buffer.writeBoolean(false); // no permissions
+
+            // when
+            var userInfoDetails = readUserInfoDetails(buffer);
+
+            // then
+            assertThat(userInfoDetails.id()).isEqualTo(1L);
+            assertThat(userInfoDetails.permissions()).isEmpty();
+        }
+
+        @Test
+        void shouldDeserializeUserInfoDetailsWithPermissions() {
+            // given: a user record followed by a permissions section
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // user ID
+            writeU64(buffer, BigInteger.valueOf(1000)); // created at
+            buffer.writeByte(UserStatus.Active.asCode()); // status
+            buffer.writeByte(5); // username length
+            buffer.writeBytes("admin".getBytes(StandardCharsets.UTF_8));
+            buffer.writeBoolean(true); // has permissions
+            buffer.writeIntLE(10); // permissions length (ignored but required)
+            // Write global permissions (10 booleans)
+            for (int i = 0; i < 10; i++) {
+                buffer.writeBoolean(true);
+            }
+            buffer.writeBoolean(false); // no stream permissions
+
+            // when
+            var userInfoDetails = readUserInfoDetails(buffer);
+
+            // then
+            assertThat(userInfoDetails.permissions()).isPresent();
+        }
+    }
+
+    @Nested
+    class PermissionsDeserialization {
+
+        @Test
+        void shouldDeserializeGlobalPermissions() {
+            // given: ten flags, alternating granted / denied
+            ByteBuf encodedFlags = Unpooled.buffer();
+            encodedFlags.writeBoolean(true); // manageServers
+            encodedFlags.writeBoolean(false); // readServers
+            encodedFlags.writeBoolean(true); // manageUsers
+            encodedFlags.writeBoolean(false); // readUsers
+            encodedFlags.writeBoolean(true); // manageStreams
+            encodedFlags.writeBoolean(false); // readStreams
+            encodedFlags.writeBoolean(true); // manageTopics
+            encodedFlags.writeBoolean(false); // readTopics
+            encodedFlags.writeBoolean(true); // pollMessages
+            encodedFlags.writeBoolean(false); // sendMessages
+
+            // when
+            var decoded = readGlobalPermissions(encodedFlags);
+
+            // then: spot-check the first, second and ninth flag
+            assertThat(decoded.manageServers()).isTrue();
+            assertThat(decoded.readServers()).isFalse();
+            assertThat(decoded.pollMessages()).isTrue();
+        }
+
+        @Test
+        void shouldDeserializeTopicPermissions() {
+            // given: four flags, alternating granted / denied
+            ByteBuf encodedFlags = Unpooled.buffer();
+            encodedFlags.writeBoolean(true);
+            encodedFlags.writeBoolean(false);
+            encodedFlags.writeBoolean(true);
+            encodedFlags.writeBoolean(false);
+
+            // when
+            var decoded = readTopicPermissions(encodedFlags);
+
+            // then
+            assertThat(decoded.manageTopic()).isTrue();
+            assertThat(decoded.readTopic()).isFalse();
+            assertThat(decoded.pollMessages()).isTrue();
+            assertThat(decoded.sendMessages()).isFalse();
+        }
+
+        @Test
+        void shouldDeserializeStreamPermissionsWithoutTopics() {
+            // given: six granted stream-level flags and no topic entries
+            ByteBuf encodedStream = Unpooled.buffer();
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(false); // no topics
+
+            // when
+            var decoded = readStreamPermissions(encodedStream);
+
+            // then
+            assertThat(decoded.manageStream()).isTrue();
+            assertThat(decoded.topics()).isEmpty();
+        }
+
+        @Test
+        void shouldDeserializeStreamPermissionsWithTopics() {
+            // given: six granted stream-level flags and a single topic entry
+            ByteBuf encodedStream = Unpooled.buffer();
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true); // has topic
+            encodedStream.writeIntLE(1); // topic ID
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(true);
+            encodedStream.writeBoolean(false); // end of topics
+
+            // when
+            var decoded = readStreamPermissions(encodedStream);
+
+            // then: exactly the one topic written above, keyed by its ID
+            assertThat(decoded.topics()).hasSize(1);
+            assertThat(decoded.topics()).containsKey(1L);
+        }
+
+        @Test
+        void shouldDeserializeFullPermissionsWithoutStreams() {
+            // given: length prefix, ten denied global flags, no stream entries
+            ByteBuf encodedPermissions = Unpooled.buffer();
+            encodedPermissions.writeIntLE(10); // permissions length
+            for (int flag = 0; flag < 10; flag++) {
+                encodedPermissions.writeBoolean(false);
+            }
+            encodedPermissions.writeBoolean(false); // no streams
+
+            // when
+            var decoded = readPermissions(encodedPermissions);
+
+            // then
+            assertThat(decoded.streams()).isEmpty();
+        }
+
+        @Test
+        void shouldDeserializeFullPermissionsWithStreams() {
+            // given: length prefix, ten denied global flags, then one stream entry
+            ByteBuf encodedPermissions = Unpooled.buffer();
+            encodedPermissions.writeIntLE(10);
+            for (int flag = 0; flag < 10; flag++) {
+                encodedPermissions.writeBoolean(false);
+            }
+            encodedPermissions.writeBoolean(true); // has stream
+            encodedPermissions.writeIntLE(1); // stream ID
+            for (int flag = 0; flag < 6; flag++) {
+                encodedPermissions.writeBoolean(true);
+            }
+            encodedPermissions.writeBoolean(false); // no topics in stream
+            encodedPermissions.writeBoolean(false); // end of streams
+
+            // when
+            var decoded = readPermissions(encodedPermissions);
+
+            // then: exactly the one stream written above, keyed by its ID
+            assertThat(decoded.streams()).hasSize(1);
+            assertThat(decoded.streams()).containsKey(1L);
+        }
+    }
+
+    @Nested
+    class PersonalAccessTokenDeserialization {
+
+        @Test
+        void shouldDeserializeRawPersonalAccessToken() {
+            // given: a one-byte length prefix followed by the token text
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeByte(10); // token length
+            buffer.writeBytes("token12345".getBytes(StandardCharsets.UTF_8)); // charset pinned: no JVM-default dependency
+
+            // when
+            var token = readRawPersonalAccessToken(buffer);
+
+            // then
+            assertThat(token.token()).isEqualTo("token12345");
+        }
+
+        @Test
+        void shouldDeserializePersonalAccessTokenInfoWithExpiry() {
+            // given: a length-prefixed name followed by a non-zero expiry
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeByte(7); // name length
+            buffer.writeBytes("mytoken".getBytes(StandardCharsets.UTF_8));
+            writeU64(buffer, BigInteger.valueOf(3000)); // expiry
+
+            // when
+            var tokenInfo = readPersonalAccessTokenInfo(buffer);
+
+            // then
+            assertThat(tokenInfo.name()).isEqualTo("mytoken");
+            assertThat(tokenInfo.expiryAt()).isPresent().hasValue(BigInteger.valueOf(3000));
+        }
+
+        @Test
+        void shouldDeserializePersonalAccessTokenInfoWithoutExpiry() {
+            // given: a length-prefixed name followed by a zero expiry
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeByte(7); // name length
+            buffer.writeBytes("mytoken".getBytes(StandardCharsets.UTF_8));
+            writeU64(buffer, BigInteger.ZERO); // no expiry
+
+            // when
+            var tokenInfo = readPersonalAccessTokenInfo(buffer);
+
+            // then: a zero expiry comes back as an empty Optional
+            assertThat(tokenInfo.name()).isEqualTo("mytoken");
+            assertThat(tokenInfo.expiryAt()).isEmpty();
+        }
+    }
}