This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b1098db3 test(java): add unit tests for serialization utilities (#2607)
5b1098db3 is described below

commit 5b1098db3628b4d90b76d3629dd9a47b1c78d2ea
Author: Qichao Chu <[email protected]>
AuthorDate: Tue Jan 27 01:16:03 2026 -0800

    test(java): add unit tests for serialization utilities (#2607)
---
 .../org/apache/iggy/serde/BytesSerializer.java     |   5 +-
 .../client/async/tcp/IggyFrameDecoderTest.java     | 455 ++++++++++++++
 .../client/blocking/tcp/BytesSerializerTest.java   | 555 +++++++++++++++++
 .../apache/iggy/serde/BytesDeserializerTest.java   | 671 +++++++++++++++++++++
 4 files changed, 1684 insertions(+), 2 deletions(-)

diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
index 936dd55d8..4014f3914 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
@@ -204,8 +204,9 @@ public final class BytesSerializer {
 
     public static ByteBuf toBytes(String value) {
         ByteBuf buffer = Unpooled.buffer(1 + value.length());
-        buffer.writeByte(value.length());
-        buffer.writeBytes(value.getBytes(StandardCharsets.UTF_8));
+        byte[] stringBytes = value.getBytes(StandardCharsets.UTF_8);
+        buffer.writeByte(stringBytes.length);
+        buffer.writeBytes(stringBytes);
         return buffer;
     }
 
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/IggyFrameDecoderTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/IggyFrameDecoderTest.java
new file mode 100644
index 000000000..448f07de7
--- /dev/null
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/tcp/IggyFrameDecoderTest.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async.tcp;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class IggyFrameDecoderTest {
+
+    private EmbeddedChannel channel;
+
+    @AfterEach
+    void tearDown() {
+        if (channel != null) {
+            // Release any remaining messages
+            Object msg;
+            while ((msg = channel.readInbound()) != null) {
+                if (msg instanceof ByteBuf) {
+                    ((ByteBuf) msg).release();
+                }
+            }
+            channel.finishAndReleaseAll();
+        }
+    }
+
+    @Nested
+    class CompleteFrames {
+
+        @Test
+        void shouldDecodeCompleteFrameWithSmallPayload() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0); // status = success
+            input.writeIntLE(5); // length = 5 bytes
+            input.writeBytes("hello".getBytes()); // payload
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8 + 5); // header + payload
+            assertThat(decoded.readIntLE()).isEqualTo(0); // status
+            assertThat(decoded.readIntLE()).isEqualTo(5); // length
+            byte[] payload = new byte[5];
+            decoded.readBytes(payload);
+            assertThat(new String(payload)).isEqualTo("hello");
+            decoded.release();
+        }
+
+        @Test
+        void shouldDecodeCompleteFrameWithZeroLengthPayload() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0); // status
+            input.writeIntLE(0); // length = 0
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8); // header only
+            assertThat(decoded.readIntLE()).isEqualTo(0);
+            assertThat(decoded.readIntLE()).isEqualTo(0);
+            decoded.release();
+        }
+
+        @Test
+        void shouldDecodeCompleteFrameWithLargePayload() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            byte[] largePayload = new byte[10000];
+            for (int i = 0; i < largePayload.length; i++) {
+                largePayload[i] = (byte) (i % 256);
+            }
+
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(200); // error status
+            input.writeIntLE(10000); // large length
+            input.writeBytes(largePayload);
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8 + 10000);
+            assertThat(decoded.readIntLE()).isEqualTo(200);
+            assertThat(decoded.readIntLE()).isEqualTo(10000);
+            decoded.release();
+        }
+
+        @Test
+        void shouldDecodeFrameWithVariousStatusCodes() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+
+            for (int status = 0; status <= 5; status++) {
+                ByteBuf input = Unpooled.buffer();
+                input.writeIntLE(status);
+                input.writeIntLE(1);
+                input.writeByte(42);
+
+                // when
+                channel.writeInbound(input);
+
+                // then
+                ByteBuf decoded = channel.readInbound();
+                assertThat(decoded).isNotNull();
+                assertThat(decoded.readIntLE()).isEqualTo(status);
+                decoded.skipBytes(4); // length
+                decoded.skipBytes(1); // payload
+                decoded.release();
+            }
+        }
+    }
+
+    @Nested
+    class IncompleteFrames {
+
+        @Test
+        void shouldWaitForCompleteHeaderWhenOnlyPartialHeaderAvailable() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0); // only status, missing length (4 bytes total, need 8)
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse(); // No complete frame yet
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNull(); // Nothing to read
+        }
+
+        @Test
+        void shouldWaitForCompletePayloadWhenOnlyHeaderAvailable() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0); // status
+            input.writeIntLE(100); // expects 100 bytes payload
+            // But no payload written
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse();
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNull();
+        }
+
+        @Test
+        void shouldWaitForCompletePayloadWhenPartialPayloadAvailable() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(100); // expects 100 bytes
+            input.writeBytes(new byte[50]); // only 50 bytes available
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse();
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldEventuallyDecodeWhenMoreDataArrives() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf firstChunk = Unpooled.buffer();
+            firstChunk.writeIntLE(0);
+            firstChunk.writeIntLE(10);
+            firstChunk.writeBytes(new byte[5]); // only 5 of 10 bytes
+
+            // when - first chunk
+            boolean hasMessage1 = channel.writeInbound(firstChunk);
+            assertThat(hasMessage1).isFalse();
+
+            // when - second chunk completes the frame
+            ByteBuf secondChunk = Unpooled.buffer();
+            secondChunk.writeBytes(new byte[5]); // remaining 5 bytes
+            boolean hasMessage2 = channel.writeInbound(secondChunk);
+
+            // then
+            assertThat(hasMessage2).isTrue();
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8 + 10);
+            decoded.release();
+        }
+    }
+
+    @Nested
+    class MultipleFrames {
+
+        @Test
+        void shouldDecodeMultipleFramesInSequence() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+
+            // Frame 1
+            input.writeIntLE(0);
+            input.writeIntLE(3);
+            input.writeBytes("abc".getBytes());
+
+            // Frame 2
+            input.writeIntLE(1);
+            input.writeIntLE(2);
+            input.writeBytes("de".getBytes());
+
+            // when
+            channel.writeInbound(input);
+
+            // then - frame 1
+            ByteBuf decoded1 = channel.readInbound();
+            assertThat(decoded1).isNotNull();
+            assertThat(decoded1.readableBytes()).isEqualTo(8 + 3);
+            decoded1.release();
+
+            // then - frame 2
+            ByteBuf decoded2 = channel.readInbound();
+            assertThat(decoded2).isNotNull();
+            assertThat(decoded2.readableBytes()).isEqualTo(8 + 2);
+            decoded2.release();
+
+            // No more frames
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldDecodeThreeFramesCorrectly() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+
+            for (int i = 0; i < 3; i++) {
+                input.writeIntLE(i);
+                input.writeIntLE(1);
+                input.writeByte(i);
+            }
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            for (int i = 0; i < 3; i++) {
+                ByteBuf decoded = channel.readInbound();
+                assertThat(decoded).isNotNull();
+                assertThat(decoded.readIntLE()).isEqualTo(i);
+                decoded.skipBytes(4 + 1); // skip length and payload
+                decoded.release();
+            }
+
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+    }
+
+    @Nested
+    class EdgeCases {
+
+        @Test
+        void shouldNotAdvanceReaderIndexWhenPayloadIncomplete() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(100); // expects 100 bytes
+            input.writeBytes(new byte[50]); // only 50 bytes
+            int readerIndexBefore = input.readerIndex();
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then - decoder should wait for complete payload
+            assertThat(hasMessage).isFalse();
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNull();
+            assertThat(input.readerIndex()).isEqualTo(readerIndexBefore);
+        }
+
+        @Test
+        void shouldHandleEmptyInput() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer(); // empty buffer
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse();
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldHandleSingleByteInput() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeByte(0); // only 1 byte
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse();
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldHandleExactlyHeaderSizeWithoutPayload() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(10); // expects payload, but none provided
+            // Exactly 8 bytes (header size)
+
+            // when
+            boolean hasMessage = channel.writeInbound(input);
+
+            // then
+            assertThat(hasMessage).isFalse(); // Waiting for payload
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldDecodeFrameFollowedByPartialNextFrame() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+
+            // Complete frame 1
+            input.writeIntLE(0);
+            input.writeIntLE(5);
+            input.writeBytes("hello".getBytes());
+
+            // Partial frame 2 (only header)
+            input.writeIntLE(1);
+            input.writeIntLE(10);
+            // No payload for frame 2
+
+            // when
+            channel.writeInbound(input);
+
+            // then - should get frame 1
+            ByteBuf decoded1 = channel.readInbound();
+            assertThat(decoded1).isNotNull();
+            assertThat(decoded1.readableBytes()).isEqualTo(8 + 5);
+            decoded1.release();
+
+            // Frame 2 should not be available yet
+            assertThat((ByteBuf) channel.readInbound()).isNull();
+        }
+
+        @Test
+        void shouldHandleMaxIntPayloadLength() {
+            // given - test with a reasonably large payload (not actual MAX_INT to avoid OOM)
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            int largeSize = 1000000; // 1MB
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(largeSize);
+            input.writeBytes(new byte[largeSize]);
+
+            // when
+            channel.writeInbound(input);
+
+            // then
+            ByteBuf decoded = channel.readInbound();
+            assertThat(decoded).isNotNull();
+            assertThat(decoded.readableBytes()).isEqualTo(8 + largeSize);
+            decoded.release();
+        }
+    }
+
+    @Nested
+    class BufferManagement {
+
+        @Test
+        void shouldCreateNewBufferForEachDecodedFrame() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(5);
+            input.writeBytes("hello".getBytes());
+
+            // when
+            channel.writeInbound(input);
+            ByteBuf decoded = channel.readInbound();
+
+            // then - decoded buffer should be independent
+            assertThat(decoded).isNotNull();
+            assertThat(decoded).isNotSameAs(input);
+            assertThat(decoded.readableBytes()).isEqualTo(13);
+            decoded.release();
+        }
+
+        @Test
+        void shouldAllowManualReleaseOfDecodedBuffers() {
+            // given
+            channel = new EmbeddedChannel(new IggyFrameDecoder());
+            ByteBuf input = Unpooled.buffer();
+            input.writeIntLE(0);
+            input.writeIntLE(3);
+            input.writeBytes("foo".getBytes());
+
+            // when
+            channel.writeInbound(input);
+            ByteBuf decoded = channel.readInbound();
+
+            // then - should be releasable
+            assertThat(decoded.refCnt()).isEqualTo(1);
+            decoded.release();
+            assertThat(decoded.refCnt()).isEqualTo(0);
+        }
+    }
+}
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
index 2e035979e..224d5e7ec 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
@@ -21,12 +21,30 @@ package org.apache.iggy.client.blocking.tcp;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import org.apache.iggy.consumergroup.Consumer;
+import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.message.BytesMessageId;
+import org.apache.iggy.message.HeaderKind;
+import org.apache.iggy.message.HeaderValue;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.MessageHeader;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.message.PollingStrategy;
 import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.user.GlobalPermissions;
+import org.apache.iggy.user.Permissions;
+import org.apache.iggy.user.StreamPermissions;
+import org.apache.iggy.user.TopicPermissions;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -138,4 +156,541 @@ class BytesSerializerTest {
             assertThatThrownBy(() -> BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class);
         }
     }
+
+    @Nested
+    class StringSerialization {
+
+        @Test
+        void shouldSerializeSimpleString() {
+            // given
+            String input = "test";
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(input);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 4); // length
+            byte[] stringBytes = new byte[4];
+            result.readBytes(stringBytes);
+            assertThat(new String(stringBytes, StandardCharsets.UTF_8)).isEqualTo("test");
+        }
+
+        @Test
+        void shouldSerializeEmptyString() {
+            // given
+            String input = "";
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(input);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 0); // length = 0
+            assertThat(result.readableBytes()).isEqualTo(0);
+        }
+
+        @Test
+        void shouldSerializeUtf8Characters() {
+            // given
+            String input = "Hello世界";
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(input);
+
+            // then
+            byte[] expectedBytes = input.getBytes(StandardCharsets.UTF_8);
+            assertThat(result.readByte()).isEqualTo((byte) expectedBytes.length);
+            byte[] stringBytes = new byte[expectedBytes.length];
+            result.readBytes(stringBytes);
+            assertThat(stringBytes).isEqualTo(expectedBytes);
+        }
+    }
+
+    @Nested
+    class IdentifierSerialization {
+
+        @Test
+        void shouldSerializeNumericIdentifier() {
+            // given
+            var identifier = StreamId.of(123L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(identifier);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // kind = 1 (numeric)
+            assertThat(result.readByte()).isEqualTo((byte) 4); // length = 4
+            assertThat(result.readIntLE()).isEqualTo(123); // id value
+        }
+
+        @Test
+        void shouldSerializeStringIdentifier() {
+            // given
+            var identifier = StreamId.of("test-stream");
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(identifier);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 2); // kind = 2 (string)
+            assertThat(result.readByte()).isEqualTo((byte) 11); // length = "test-stream".length()
+            byte[] nameBytes = new byte[11];
+            result.readBytes(nameBytes);
+            assertThat(new String(nameBytes)).isEqualTo("test-stream");
+        }
+    }
+
+    @Nested
+    class ConsumerSerialization {
+
+        @Test
+        void shouldSerializeConsumerWithNumericId() {
+            // given
+            var consumer = Consumer.of(42L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(consumer);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Consumer kind
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Identifier kind (numeric)
+            assertThat(result.readByte()).isEqualTo((byte) 4); // Identifier length
+            assertThat(result.readIntLE()).isEqualTo(42); // ID value
+        }
+
+        @Test
+        void shouldSerializeConsumerGroupWithNumericId() {
+            // given
+            var consumer = Consumer.group(99L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(consumer);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 2); // ConsumerGroup kind
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Identifier kind (numeric)
+            assertThat(result.readByte()).isEqualTo((byte) 4); // Identifier length
+            assertThat(result.readIntLE()).isEqualTo(99); // ID value
+        }
+
+        @Test
+        void shouldSerializeConsumerWithStringId() {
+            // given
+            var consumer = Consumer.of(ConsumerId.of("my-consumer"));
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(consumer);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Consumer kind
+            assertThat(result.readByte()).isEqualTo((byte) 2); // Identifier kind (string)
+            assertThat(result.readByte()).isEqualTo((byte) 11); // "my-consumer".length()
+        }
+    }
+
+    @Nested
+    class PartitioningSerialization {
+
+        @Test
+        void shouldSerializeBalancedPartitioning() {
+            // given
+            var partitioning = Partitioning.balanced();
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(partitioning);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Balanced kind
+            assertThat(result.readByte()).isEqualTo((byte) 0); // Empty value
+        }
+
+        @Test
+        void shouldSerializePartitionIdPartitioning() {
+            // given
+            var partitioning = Partitioning.partitionId(5L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(partitioning);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 2); // PartitionId kind
+            assertThat(result.readByte()).isEqualTo((byte) 4); // 4 bytes for int
+            assertThat(result.readIntLE()).isEqualTo(5); // partition ID
+        }
+
+        @Test
+        void shouldSerializeMessagesKeyPartitioning() {
+            // given
+            var partitioning = Partitioning.messagesKey("user-123");
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(partitioning);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 3); // MessagesKey kind
+            assertThat(result.readByte()).isEqualTo((byte) 8); // "user-123".length()
+            byte[] keyBytes = new byte[8];
+            result.readBytes(keyBytes);
+            assertThat(new String(keyBytes)).isEqualTo("user-123");
+        }
+    }
+
+    @Nested
+    class PollingStrategySerialization {
+
+        @Test
+        void shouldSerializeFirstStrategy() {
+            // given
+            var strategy = PollingStrategy.first();
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(strategy);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 3); // First kind = 3
+            // Followed by 8 bytes of U64 (zero value)
+            assertThat(result.readableBytes()).isEqualTo(8);
+        }
+
+        @Test
+        void shouldSerializeLastStrategy() {
+            // given
+            var strategy = PollingStrategy.last();
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(strategy);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 4); // Last kind = 4
+            assertThat(result.readableBytes()).isEqualTo(8);
+        }
+
+        @Test
+        void shouldSerializeOffsetStrategy() {
+            // given
+            var strategy = PollingStrategy.offset(BigInteger.valueOf(100));
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(strategy);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Offset kind = 1
+            assertThat(result.readableBytes()).isEqualTo(8);
+        }
+
+        @Test
+        void shouldSerializeNextStrategy() {
+            // given
+            var strategy = PollingStrategy.next();
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(strategy);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 5); // Next kind = 5
+            assertThat(result.readableBytes()).isEqualTo(8);
+        }
+
+        @Test
+        void shouldSerializeTimestampStrategy() {
+            // given
+            var strategy = PollingStrategy.timestamp(BigInteger.valueOf(1234567890));
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(strategy);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 2); // Timestamp kind = 2
+            assertThat(result.readableBytes()).isEqualTo(8);
+        }
+    }
+
+    @Nested
+    class OptionalValueSerialization {
+
+        @Test
+        void shouldSerializePresentOptional() {
+            // given
+            Optional<Long> optional = Optional.of(42L);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(optional);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 1); // present flag
+            assertThat(result.readIntLE()).isEqualTo(42); // value
+        }
+
+        @Test
+        void shouldSerializeEmptyOptional() {
+            // given
+            Optional<Long> optional = Optional.empty();
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(optional);
+
+            // then
+            assertThat(result.readByte()).isEqualTo((byte) 0); // not present flag
+            assertThat(result.readIntLE()).isEqualTo(0); // zero value
+        }
+    }
+
+    @Nested
+    class HeadersSerialization {
+
+        @Test
+        void shouldSerializeEmptyHeaders() {
+            // given
+            Map<String, HeaderValue> headers = new HashMap<>();
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(headers);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(0); // Empty buffer
+        }
+
+        @Test
+        void shouldSerializeSingleHeader() {
+            // given
+            Map<String, HeaderValue> headers = new HashMap<>();
+            headers.put("key1", new HeaderValue(HeaderKind.Raw, "value1"));
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(headers);
+
+            // then
+            assertThat(result.readIntLE()).isEqualTo(4); // "key1".length()
+            byte[] keyBytes = new byte[4];
+            result.readBytes(keyBytes);
+            assertThat(new String(keyBytes)).isEqualTo("key1");
+            assertThat(result.readByte()).isEqualTo((byte) 1); // Raw kind
+            assertThat(result.readIntLE()).isEqualTo(6); // "value1".length()
+            byte[] valueBytes = new byte[6];
+            result.readBytes(valueBytes);
+            assertThat(new String(valueBytes)).isEqualTo("value1");
+        }
+
+        @Test
+        void shouldSerializeMultipleHeaders() {
+            // given
+            Map<String, HeaderValue> headers = new HashMap<>();
+            headers.put("k1", new HeaderValue(HeaderKind.Raw, "v1")); // 13 bytes
+            headers.put("k2", new HeaderValue(HeaderKind.String, "v2")); // 13 bytes
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(headers);
+
+            // then - verify buffer contains data for both headers
+            assertThat(result.readableBytes()).isEqualTo(26);
+        }
+    }
+
+    @Nested
+    class MessageSerialization {
+
+        @Test
+        void shouldSerializeMessageWithoutUserHeaders() {
+            // given
+            var messageId = new BytesMessageId(new byte[16]);
+            var header = new MessageHeader(
+                    BigInteger.valueOf(123), // checksum
+                    messageId,
+                    BigInteger.valueOf(0), // offset
+                    BigInteger.valueOf(1000), // timestamp
+                    BigInteger.valueOf(1000), // originTimestamp
+                    0L, // userHeadersLength
+                    5L // payloadLength
+                    );
+            byte[] payload = "hello".getBytes();
+            var message = new Message(header, payload, new HashMap<>());
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(message);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(MessageHeader.SIZE + 5); // header + payload, no user headers
+        }
+
+        @Test
+        void shouldSerializeMessageWithUserHeaders() {
+            // given
+            var messageId = new BytesMessageId(new byte[16]);
+            Map<String, HeaderValue> userHeaders = new HashMap<>();
+            userHeaders.put("key", new HeaderValue(HeaderKind.Raw, "val"));
+
+            // Calculate user headers size
+            ByteBuf headersBuf = BytesSerializer.toBytes(userHeaders);
+            int userHeadersLength = headersBuf.readableBytes();
+
+            var header = new MessageHeader(
+                    BigInteger.ZERO,
+                    messageId,
+                    BigInteger.ZERO,
+                    BigInteger.valueOf(1000),
+                    BigInteger.valueOf(1000),
+                    (long) userHeadersLength,
+                    3L // "abc".length()
+                    );
+            byte[] payload = "abc".getBytes();
+            var message = new Message(header, payload, userHeaders);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(message);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(MessageHeader.SIZE + 3 + userHeadersLength);
+        }
+    }
+
+    @Nested
+    class MessageHeaderSerialization {
+
+        @Test
+        void shouldSerializeMessageHeader() {
+            // given
+            var messageId = new BytesMessageId(new byte[16]);
+            var header = new MessageHeader(
+                    BigInteger.valueOf(999), // checksum
+                    messageId,
+                    BigInteger.valueOf(42), // offset
+                    BigInteger.valueOf(2000), // timestamp
+                    BigInteger.valueOf(1999), // originTimestamp
+                    10L, // userHeadersLength
+                    100L // payloadLength
+                    );
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(header);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(MessageHeader.SIZE);
+            // Read checksum (8 bytes)
+            result.skipBytes(8);
+            // Read message ID (16 bytes)
+            result.skipBytes(16);
+            // Read offset (8 bytes)
+            result.skipBytes(8);
+            // Read timestamp (8 bytes)
+            result.skipBytes(8);
+            // Read origin timestamp (8 bytes)
+            result.skipBytes(8);
+            // Read user headers length (4 bytes)
+            assertThat(result.readIntLE()).isEqualTo(10);
+            // Read payload length (4 bytes)
+            assertThat(result.readIntLE()).isEqualTo(100);
+        }
+    }
+
+    @Nested
+    class PermissionsSerialization {
+
+        @Test
+        void shouldSerializeGlobalPermissions() {
+            // given
+            var permissions = new GlobalPermissions(true, false, true, false, true, false, true, false, true, false);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(permissions);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(10); // 10 booleans
+            assertThat(result.readBoolean()).isTrue(); // manageServers
+            assertThat(result.readBoolean()).isFalse(); // readServers
+            assertThat(result.readBoolean()).isTrue(); // manageUsers
+            assertThat(result.readBoolean()).isFalse(); // readUsers
+            assertThat(result.readBoolean()).isTrue(); // manageStreams
+            assertThat(result.readBoolean()).isFalse(); // readStreams
+            assertThat(result.readBoolean()).isTrue(); // manageTopics
+            assertThat(result.readBoolean()).isFalse(); // readTopics
+            assertThat(result.readBoolean()).isTrue(); // pollMessages
+            assertThat(result.readBoolean()).isFalse(); // sendMessages
+        }
+
+        @Test
+        void shouldSerializeTopicPermissions() {
+            // given
+            var permissions = new TopicPermissions(true, false, true, false);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(permissions);
+
+            // then
+            assertThat(result.readableBytes()).isEqualTo(4); // 4 booleans
+            assertThat(result.readBoolean()).isTrue(); // manageTopic
+            assertThat(result.readBoolean()).isFalse(); // readTopic
+            assertThat(result.readBoolean()).isTrue(); // pollMessages
+            assertThat(result.readBoolean()).isFalse(); // sendMessages
+        }
+
+        @Test
+        void shouldSerializeStreamPermissionsWithoutTopics() {
+            // given
+            var permissions = new StreamPermissions(true, false, true, false, true, false, new HashMap<>());
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(permissions);
+
+            // then
+            assertThat(result.readBoolean()).isTrue(); // manageStream
+            assertThat(result.readBoolean()).isFalse(); // readStream
+            assertThat(result.readBoolean()).isTrue(); // manageTopics
+            assertThat(result.readBoolean()).isFalse(); // readTopics
+            assertThat(result.readBoolean()).isTrue(); // pollMessages
+            assertThat(result.readBoolean()).isFalse(); // sendMessages
+            assertThat(result.readByte()).isEqualTo((byte) 0); // no topics marker
+        }
+
+        @Test
+        void shouldSerializeStreamPermissionsWithTopics() {
+            // given
+            Map<Long, TopicPermissions> topicPerms = new HashMap<>();
+            topicPerms.put(1L, new TopicPermissions(true, true, true, true));
+            var permissions = new StreamPermissions(true, true, true, true, true, true, topicPerms);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(permissions);
+
+            // then
+            result.skipBytes(6); // Skip 6 stream-level booleans
+            assertThat(result.readByte()).isEqualTo((byte) 1); // has topic marker
+            assertThat(result.readIntLE()).isEqualTo(1); // topic ID
+            result.skipBytes(4); // Skip topic permissions
+            assertThat(result.readByte()).isEqualTo((byte) 0); // end marker
+        }
+
+        @Test
+        void shouldSerializeFullPermissionsWithoutStreams() {
+            // given
+            var globalPerms = new GlobalPermissions(true, true, true, true, true, true, true, true, true, true);
+            var permissions = new Permissions(globalPerms, new HashMap<>());
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(permissions);
+
+            // then
+            result.skipBytes(10); // Skip global permissions
+            assertThat(result.readByte()).isEqualTo((byte) 0); // no streams marker
+        }
+
+        @Test
+        void shouldSerializeFullPermissionsWithStreams() {
+            // given
+            var globalPerms =
+                    new GlobalPermissions(false, false, false, false, false, false, false, false, false, false);
+            Map<Long, StreamPermissions> streamPerms = new HashMap<>();
+            streamPerms.put(1L, new StreamPermissions(true, true, true, true, true, true, new HashMap<>()));
+            var permissions = new Permissions(globalPerms, streamPerms);
+
+            // when
+            ByteBuf result = BytesSerializer.toBytes(permissions);
+
+            // then
+            result.skipBytes(10); // Skip global permissions
+            assertThat(result.readByte()).isEqualTo((byte) 1); // has stream marker
+            assertThat(result.readIntLE()).isEqualTo(1); // stream ID
+            result.skipBytes(6); // Skip stream permissions
+            assertThat(result.readByte()).isEqualTo((byte) 0); // no topics in stream
+            assertThat(result.readByte()).isEqualTo((byte) 0); // end of streams marker
+        }
+    }
 }
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
index 42e8b0158..4558d30fa 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
@@ -22,17 +22,76 @@ package org.apache.iggy.serde;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.iggy.message.HeaderKind;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.apache.iggy.user.UserStatus;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.util.HexFormat;
 
+import static org.apache.iggy.serde.BytesDeserializer.readClientInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerGroup;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerGroupDetails;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerGroupInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerGroupMember;
+import static org.apache.iggy.serde.BytesDeserializer.readConsumerOffsetInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readGlobalPermissions;
+import static org.apache.iggy.serde.BytesDeserializer.readPartition;
+import static org.apache.iggy.serde.BytesDeserializer.readPermissions;
+import static org.apache.iggy.serde.BytesDeserializer.readPersonalAccessTokenInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readPolledMessage;
+import static org.apache.iggy.serde.BytesDeserializer.readPolledMessages;
+import static org.apache.iggy.serde.BytesDeserializer.readRawPersonalAccessToken;
+import static org.apache.iggy.serde.BytesDeserializer.readStats;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamBase;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamDetails;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamPermissions;
+import static org.apache.iggy.serde.BytesDeserializer.readTopic;
+import static org.apache.iggy.serde.BytesDeserializer.readTopicDetails;
+import static org.apache.iggy.serde.BytesDeserializer.readTopicPermissions;
 import static org.apache.iggy.serde.BytesDeserializer.readU64AsBigInteger;
+import static org.apache.iggy.serde.BytesDeserializer.readUserInfo;
+import static org.apache.iggy.serde.BytesDeserializer.readUserInfoDetails;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class BytesDeserializerTest {
 
+    // Helper methods for writing test data
+    private static void writeU64(ByteBuf buffer, BigInteger value) {
+        byte[] bytes = value.toByteArray();
+        ArrayUtils.reverse(bytes);
+        buffer.writeBytes(bytes, 0, Math.min(8, bytes.length));
+        if (bytes.length < 8) {
+            buffer.writeZero(8 - bytes.length);
+        }
+    }
+
+    private static void writeTopicData(ByteBuf buffer) {
+        buffer.writeIntLE(10); // topic ID
+        writeU64(buffer, BigInteger.valueOf(1000)); // created at
+        buffer.writeIntLE(4); // partitions count
+        writeU64(buffer, BigInteger.ZERO); // message expiry
+        buffer.writeByte(CompressionAlgorithm.None.asCode()); // compression
+        writeU64(buffer, BigInteger.valueOf(10000)); // max topic size
+        buffer.writeByte(1); // replication factor
+        writeU64(buffer, BigInteger.valueOf(500)); // size
+        writeU64(buffer, BigInteger.valueOf(50)); // messages count
+        buffer.writeByte(4); // name length
+        buffer.writeBytes("test".getBytes());
+    }
+
+    private static void writePartitionData(ByteBuf buffer) {
+        buffer.writeIntLE(1); // partition ID
+        writeU64(buffer, BigInteger.valueOf(1000)); // created at
+        buffer.writeIntLE(5); // segments count
+        writeU64(buffer, BigInteger.valueOf(99)); // current offset
+        writeU64(buffer, BigInteger.valueOf(200)); // size
+        writeU64(buffer, BigInteger.valueOf(20)); // messages count
+    }
+
     @Nested
     class U64 {
 
@@ -78,4 +137,616 @@ class BytesDeserializerTest {
             assertThat(result).isEqualTo(expected);
         }
     }
+
+    @Nested
+    class StreamDeserialization {
+
+        @Test
+        void shouldDeserializeStreamBase() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // stream ID
+            writeU64(buffer, BigInteger.valueOf(1000)); // createdAt
+            buffer.writeIntLE(2); // topics count
+            writeU64(buffer, BigInteger.valueOf(5000)); // size
+            writeU64(buffer, BigInteger.valueOf(100)); // messages count
+            buffer.writeByte(11); // name length
+            buffer.writeBytes("test-stream".getBytes(StandardCharsets.UTF_8));
+
+            // when
+            var stream = readStreamBase(buffer);
+
+            // then
+            assertThat(stream.id()).isEqualTo(1L);
+            assertThat(stream.createdAt()).isEqualTo(BigInteger.valueOf(1000));
+            assertThat(stream.topicsCount()).isEqualTo(2L);
+            assertThat(stream.name()).isEqualTo("test-stream");
+        }
+
+        @Test
+        void shouldDeserializeStreamDetails() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            // Write stream base
+            buffer.writeIntLE(1); // stream ID
+            writeU64(buffer, BigInteger.valueOf(1000));
+            buffer.writeIntLE(1); // topics count
+            writeU64(buffer, BigInteger.valueOf(5000));
+            writeU64(buffer, BigInteger.valueOf(100));
+            buffer.writeByte(6);
+            buffer.writeBytes("stream".getBytes());
+            // Write one topic
+            writeTopicData(buffer);
+
+            // when
+            var streamDetails = readStreamDetails(buffer);
+
+            // then
+            assertThat(streamDetails.id()).isEqualTo(1L);
+            assertThat(streamDetails.topics()).hasSize(1);
+        }
+    }
+
+    @Nested
+    class TopicDeserialization {
+
+        @Test
+        void shouldDeserializeTopic() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            writeTopicData(buffer);
+
+            // when
+            var topic = readTopic(buffer);
+
+            // then
+            assertThat(topic.id()).isEqualTo(10L);
+            assertThat(topic.name()).isEqualTo("test");
+            assertThat(topic.partitionsCount()).isEqualTo(4L);
+        }
+
+        @Test
+        void shouldDeserializeTopicDetails() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            writeTopicData(buffer);
+            // Write one partition
+            writePartitionData(buffer);
+
+            // when
+            var topicDetails = readTopicDetails(buffer);
+
+            // then
+            assertThat(topicDetails.id()).isEqualTo(10L);
+            assertThat(topicDetails.partitions()).hasSize(1);
+        }
+    }
+
+    @Nested
+    class PartitionDeserialization {
+
+        @Test
+        void shouldDeserializePartition() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            writePartitionData(buffer);
+
+            // when
+            var partition = readPartition(buffer);
+
+            // then
+            assertThat(partition.id()).isEqualTo(1L);
+            assertThat(partition.segmentsCount()).isEqualTo(5L);
+            assertThat(partition.currentOffset()).isEqualTo(BigInteger.valueOf(99));
+        }
+    }
+
+    @Nested
+    class ConsumerGroupDeserialization {
+
+        @Test
+        void shouldDeserializeConsumerGroup() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // group ID
+            buffer.writeIntLE(3); // partitions count
+            buffer.writeIntLE(2); // members count
+            buffer.writeByte(5); // name length
+            buffer.writeBytes("group".getBytes());
+
+            // when
+            var group = readConsumerGroup(buffer);
+
+            // then
+            assertThat(group.id()).isEqualTo(1L);
+            assertThat(group.name()).isEqualTo("group");
+            assertThat(group.partitionsCount()).isEqualTo(3L);
+            assertThat(group.membersCount()).isEqualTo(2L);
+        }
+
+        @Test
+        void shouldDeserializeConsumerGroupMember() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(42); // member ID
+            buffer.writeIntLE(2); // partitions count
+            buffer.writeIntLE(1); // partition ID 1
+            buffer.writeIntLE(2); // partition ID 2
+
+            // when
+            var member = readConsumerGroupMember(buffer);
+
+            // then
+            assertThat(member.id()).isEqualTo(42L);
+            assertThat(member.partitionsCount()).isEqualTo(2L);
+            assertThat(member.partitions()).containsExactly(1L, 2L);
+        }
+
+        @Test
+        void shouldDeserializeConsumerGroupDetails() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            // Write consumer group
+            buffer.writeIntLE(1);
+            buffer.writeIntLE(1);
+            buffer.writeIntLE(1);
+            buffer.writeByte(2);
+            buffer.writeBytes("cg".getBytes());
+            // Write one member
+            buffer.writeIntLE(10);
+            buffer.writeIntLE(0); // no partitions
+
+            // when
+            var details = readConsumerGroupDetails(buffer);
+
+            // then
+            assertThat(details.id()).isEqualTo(1L);
+            assertThat(details.members()).hasSize(1);
+        }
+    }
+
+    @Nested
+    class ConsumerOffsetDeserialization {
+
+        @Test
+        void shouldDeserializeConsumerOffsetInfo() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(5); // partition ID
+            writeU64(buffer, BigInteger.valueOf(100)); // current offset
+            writeU64(buffer, BigInteger.valueOf(95)); // stored offset
+
+            // when
+            var offsetInfo = readConsumerOffsetInfo(buffer);
+
+            // then
+            assertThat(offsetInfo.partitionId()).isEqualTo(5L);
+            assertThat(offsetInfo.currentOffset()).isEqualTo(BigInteger.valueOf(100));
+            assertThat(offsetInfo.storedOffset()).isEqualTo(BigInteger.valueOf(95));
+        }
+    }
+
+    @Nested
+    class MessageDeserialization {
+
+        @Test
+        void shouldDeserializePolledMessageWithoutUserHeaders() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            writeU64(buffer, BigInteger.valueOf(123)); // checksum
+            buffer.writeBytes(new byte[16]); // message ID
+            writeU64(buffer, BigInteger.ZERO); // offset
+            writeU64(buffer, BigInteger.valueOf(1000)); // timestamp
+            writeU64(buffer, BigInteger.valueOf(1000)); // origin timestamp
+            buffer.writeIntLE(0); // user headers length
+            buffer.writeIntLE(5); // payload length
+            buffer.writeBytes("hello".getBytes()); // payload
+
+            // when
+            var message = readPolledMessage(buffer);
+
+            // then
+            assertThat(message.header().checksum()).isEqualTo(BigInteger.valueOf(123));
+            assertThat(message.header().payloadLength()).isEqualTo(5L);
+            assertThat(message.payload()).isEqualTo("hello".getBytes());
+            assertThat(message.userHeaders()).isEmpty();
+        }
+
+        @Test
+        void shouldDeserializePolledMessageWithUserHeaders() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            writeU64(buffer, BigInteger.ZERO);
+            buffer.writeBytes(new byte[16]);
+            writeU64(buffer, BigInteger.ZERO);
+            writeU64(buffer, BigInteger.valueOf(1000));
+            writeU64(buffer, BigInteger.valueOf(1000));
+
+            // Calculate and write user headers
+            ByteBuf headersBuffer = Unpooled.buffer();
+            headersBuffer.writeIntLE(3); // key length
+            headersBuffer.writeBytes("key".getBytes());
+            headersBuffer.writeByte(HeaderKind.Raw.asCode());
+            headersBuffer.writeIntLE(3); // value length
+            headersBuffer.writeBytes("val".getBytes());
+
+            buffer.writeIntLE(headersBuffer.readableBytes()); // user headers length
+            buffer.writeIntLE(3); // payload length
+            buffer.writeBytes("abc".getBytes()); // payload
+            buffer.writeBytes(headersBuffer); // user headers
+
+            // when
+            var message = readPolledMessage(buffer);
+
+            // then
+            assertThat(message.userHeaders()).hasSize(1);
+            assertThat(message.userHeaders().get("key").value()).isEqualTo("val");
+        }
+
+        @Test
+        void shouldDeserializePolledMessages() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // partition ID
+            writeU64(buffer, BigInteger.valueOf(10)); // current offset
+            buffer.writeIntLE(1); // messages count
+            // Write one message
+            writeU64(buffer, BigInteger.ZERO);
+            buffer.writeBytes(new byte[16]);
+            writeU64(buffer, BigInteger.ZERO);
+            writeU64(buffer, BigInteger.valueOf(1000));
+            writeU64(buffer, BigInteger.valueOf(1000));
+            buffer.writeIntLE(0);
+            buffer.writeIntLE(2);
+            buffer.writeBytes("hi".getBytes());
+
+            // when
+            var polledMessages = readPolledMessages(buffer);
+
+            // then
+            assertThat(polledMessages.partitionId()).isEqualTo(1L);
+            assertThat(polledMessages.currentOffset()).isEqualTo(BigInteger.valueOf(10));
+            assertThat(polledMessages.count()).isEqualTo(1L);
+            assertThat(polledMessages.messages()).hasSize(1);
+        }
+    }
+
+    @Nested
+    class StatsDeserialization {
+
+        @Test
+        void shouldDeserializeStats() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1234); // process ID
+            buffer.writeFloatLE(12.5f); // CPU usage
+            buffer.writeFloatLE(50.0f); // total CPU usage
+            writeU64(buffer, BigInteger.valueOf(1000000)); // memory usage
+            writeU64(buffer, BigInteger.valueOf(8000000)); // total memory
+            writeU64(buffer, BigInteger.valueOf(7000000)); // available memory
+            writeU64(buffer, BigInteger.valueOf(3600)); // run time
+            writeU64(buffer, BigInteger.valueOf(1000000)); // start time
+            writeU64(buffer, BigInteger.valueOf(500)); // read bytes
+            writeU64(buffer, BigInteger.valueOf(600)); // written bytes
+            writeU64(buffer, BigInteger.valueOf(1000)); // messages size bytes
+            buffer.writeIntLE(5); // streams count
+            buffer.writeIntLE(10); // topics count
+            buffer.writeIntLE(20); // partitions count
+            buffer.writeIntLE(100); // segments count
+            writeU64(buffer, BigInteger.valueOf(5000)); // messages count
+            buffer.writeIntLE(3); // clients count
+            buffer.writeIntLE(2); // consumer groups count
+            buffer.writeIntLE(9); // hostname length
+            buffer.writeBytes("localhost".getBytes());
+            buffer.writeIntLE(5); // OS name length
+            buffer.writeBytes("Linux".getBytes());
+            buffer.writeIntLE(5); // OS version length
+            buffer.writeBytes("5.4.0".getBytes());
+            buffer.writeIntLE(7); // kernel version length
+            buffer.writeBytes("5.4.0-1".getBytes());
+
+            // when
+            var stats = readStats(buffer);
+
+            // then
+            assertThat(stats.processId()).isEqualTo(1234L);
+            assertThat(stats.cpuUsage()).isEqualTo(12.5f);
+            assertThat(stats.streamsCount()).isEqualTo(5L);
+            assertThat(stats.hostname()).isEqualTo("localhost");
+            assertThat(stats.osName()).isEqualTo("Linux");
+        }
+    }
+
+    @Nested
+    class ClientInfoDeserialization {
+
+        @Test
+        void shouldDeserializeClientInfo() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(100); // client ID
+            buffer.writeIntLE(5); // user ID
+            buffer.writeByte(1); // transport (TCP)
+            buffer.writeIntLE(9); // address length
+            buffer.writeBytes("127.0.0.1".getBytes());
+            buffer.writeIntLE(0); // consumer groups count
+
+            // when
+            var clientInfo = readClientInfo(buffer);
+
+            // then
+            assertThat(clientInfo.clientId()).isEqualTo(100L);
+            assertThat(clientInfo.userId()).isPresent().hasValue(5L);
+            assertThat(clientInfo.address()).isEqualTo("127.0.0.1");
+            assertThat(clientInfo.transport()).isEqualTo("Tcp");
+        }
+
+        @Test
+        void shouldDeserializeConsumerGroupInfo() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1); // stream ID
+            buffer.writeIntLE(2); // topic ID
+            buffer.writeIntLE(3); // group ID
+
+            // when
+            var groupInfo = readConsumerGroupInfo(buffer);
+
+            // then
+            assertThat(groupInfo.streamId()).isEqualTo(1L);
+            assertThat(groupInfo.topicId()).isEqualTo(2L);
+            assertThat(groupInfo.consumerGroupId()).isEqualTo(3L);
+        }
+    }
+
+    @Nested
+    class UserInfoDeserialization {
+
+        @Test
+        void shouldDeserializeUserInfo() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(42); // user ID
+            writeU64(buffer, BigInteger.valueOf(2000)); // created at
+            buffer.writeByte(UserStatus.Active.asCode()); // status
+            buffer.writeByte(4); // username length
+            buffer.writeBytes("user".getBytes());
+
+            // when
+            var userInfo = readUserInfo(buffer);
+
+            // then
+            assertThat(userInfo.id()).isEqualTo(42L);
+            assertThat(userInfo.createdAt()).isEqualTo(BigInteger.valueOf(2000));
+            assertThat(userInfo.status()).isEqualTo(UserStatus.Active);
+            assertThat(userInfo.username()).isEqualTo("user");
+        }
+
+        @Test
+        void shouldDeserializeUserInfoDetailsWithoutPermissions() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1);
+            writeU64(buffer, BigInteger.valueOf(1000));
+            buffer.writeByte(UserStatus.Active.asCode());
+            buffer.writeByte(5);
+            buffer.writeBytes("admin".getBytes());
+            buffer.writeBoolean(false); // no permissions
+
+            // when
+            var userInfoDetails = readUserInfoDetails(buffer);
+
+            // then
+            assertThat(userInfoDetails.id()).isEqualTo(1L);
+            assertThat(userInfoDetails.permissions()).isEmpty();
+        }
+
+        @Test
+        void shouldDeserializeUserInfoDetailsWithPermissions() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(1);
+            writeU64(buffer, BigInteger.valueOf(1000));
+            buffer.writeByte(UserStatus.Active.asCode());
+            buffer.writeByte(5);
+            buffer.writeBytes("admin".getBytes());
+            buffer.writeBoolean(true); // has permissions
+            buffer.writeIntLE(10); // permissions length (ignored but required)
+            // Write global permissions (10 booleans)
+            for (int i = 0; i < 10; i++) {
+                buffer.writeBoolean(true);
+            }
+            buffer.writeBoolean(false); // no stream permissions
+
+            // when
+            var userInfoDetails = readUserInfoDetails(buffer);
+
+            // then
+            assertThat(userInfoDetails.permissions()).isPresent();
+        }
+    }
+
+    @Nested
+    class PermissionsDeserialization {
+
+        @Test
+        void shouldDeserializeGlobalPermissions() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeBoolean(true); // manageServers
+            buffer.writeBoolean(false); // readServers
+            buffer.writeBoolean(true); // manageUsers
+            buffer.writeBoolean(false); // readUsers
+            buffer.writeBoolean(true); // manageStreams
+            buffer.writeBoolean(false); // readStreams
+            buffer.writeBoolean(true); // manageTopics
+            buffer.writeBoolean(false); // readTopics
+            buffer.writeBoolean(true); // pollMessages
+            buffer.writeBoolean(false); // sendMessages
+
+            // when
+            var permissions = readGlobalPermissions(buffer);
+
+            // then
+            assertThat(permissions.manageServers()).isTrue();
+            assertThat(permissions.readServers()).isFalse();
+            assertThat(permissions.pollMessages()).isTrue();
+        }
+
+        @Test
+        void shouldDeserializeTopicPermissions() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(false);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(false);
+
+            // when
+            var permissions = readTopicPermissions(buffer);
+
+            // then
+            assertThat(permissions.manageTopic()).isTrue();
+            assertThat(permissions.readTopic()).isFalse();
+            assertThat(permissions.pollMessages()).isTrue();
+            assertThat(permissions.sendMessages()).isFalse();
+        }
+
+        @Test
+        void shouldDeserializeStreamPermissionsWithoutTopics() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(false); // no topics
+
+            // when
+            var permissions = readStreamPermissions(buffer);
+
+            // then
+            assertThat(permissions.manageStream()).isTrue();
+            assertThat(permissions.topics()).isEmpty();
+        }
+
+        @Test
+        void shouldDeserializeStreamPermissionsWithTopics() {
+            // given
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true); // has topic
+            buffer.writeIntLE(1); // topic ID
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(true);
+            buffer.writeBoolean(false); // end of topics
+
+            // when
+            var permissions = readStreamPermissions(buffer);
+
+            // then
+            assertThat(permissions.topics()).hasSize(1);
+            assertThat(permissions.topics()).containsKey(1L);
+        }
+
+        @Test
+        void shouldDeserializeFullPermissionsWithoutStreams() {
+            // given: a length field, ten cleared flags and a "no streams" marker
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(10); // permissions length
+            boolean[] clearedFlags = new boolean[10]; // every element defaults to false
+            for (boolean flag : clearedFlags) {
+                buffer.writeBoolean(flag);
+            }
+            buffer.writeBoolean(false); // no streams
+
+            // when
+            var fullPermissions = readPermissions(buffer);
+
+            // then: no per-stream entries were decoded
+            assertThat(fullPermissions.streams()).isEmpty();
+        }
+
+        @Test
+        void shouldDeserializeFullPermissionsWithStreams() {
+            // given: a length field and ten cleared flags, then a single stream
+            // entry (ID 1) carrying six set flags and no topics
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeIntLE(10);
+            boolean[] clearedFlags = new boolean[10]; // every element defaults to false
+            for (boolean flag : clearedFlags) {
+                buffer.writeBoolean(flag);
+            }
+            buffer.writeBoolean(true); // has stream
+            buffer.writeIntLE(1); // stream ID
+            for (int written = 0; written < 6; written++) {
+                buffer.writeBoolean(true);
+            }
+            buffer.writeBoolean(false); // no topics in stream
+            buffer.writeBoolean(false); // end of streams
+
+            // when
+            var fullPermissions = readPermissions(buffer);
+
+            // then: exactly one stream is returned, keyed by the ID written above
+            assertThat(fullPermissions.streams()).hasSize(1).containsKey(1L);
+        }
+    }
+
+    /**
+     * Tests for decoding personal access tokens from their wire layout as built
+     * by these tests: a one-byte length prefix followed by the string bytes,
+     * and (for token info) a trailing u64 expiry value.
+     */
+    @Nested
+    class PersonalAccessTokenDeserialization {
+
+        @Test
+        void shouldDeserializeRawPersonalAccessToken() {
+            // given: encode explicitly as UTF-8 so the fixture does not depend on
+            // the platform default charset, and derive the length prefix from
+            // the encoded bytes rather than hard-coding it
+            byte[] tokenBytes =
+                    "token12345".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeByte(tokenBytes.length); // token length
+            buffer.writeBytes(tokenBytes);
+
+            // when
+            var token = readRawPersonalAccessToken(buffer);
+
+            // then
+            assertThat(token.token()).isEqualTo("token12345");
+        }
+
+        @Test
+        void shouldDeserializePersonalAccessTokenInfoWithExpiry() {
+            // given: length-prefixed name followed by a non-zero expiry
+            byte[] nameBytes =
+                    "mytoken".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeByte(nameBytes.length); // name length
+            buffer.writeBytes(nameBytes);
+            writeU64(buffer, BigInteger.valueOf(3000)); // expiry
+
+            // when
+            var tokenInfo = readPersonalAccessTokenInfo(buffer);
+
+            // then: the expiry is surfaced as a present Optional with the same value
+            assertThat(tokenInfo.name()).isEqualTo("mytoken");
+            assertThat(tokenInfo.expiryAt())
+                    .isPresent()
+                    .hasValue(BigInteger.valueOf(3000));
+        }
+
+        @Test
+        void shouldDeserializePersonalAccessTokenInfoWithoutExpiry() {
+            // given: length-prefixed name followed by a zero expiry
+            byte[] nameBytes =
+                    "mytoken".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            ByteBuf buffer = Unpooled.buffer();
+            buffer.writeByte(nameBytes.length); // name length
+            buffer.writeBytes(nameBytes);
+            writeU64(buffer, BigInteger.ZERO); // no expiry
+
+            // when
+            var tokenInfo = readPersonalAccessTokenInfo(buffer);
+
+            // then: a zero expiry is reported as an empty Optional
+            assertThat(tokenInfo.name()).isEqualTo("mytoken");
+            assertThat(tokenInfo.expiryAt()).isEmpty();
+        }
+    }
 }

Reply via email to