This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f1588b33d refactor(java): unify deserialization and serialization code
for tcp clients (#2477)
f1588b33d is described below
commit f1588b33d3ca08adff47edb5278f8d4e4f953b29
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Fri Dec 12 13:35:24 2025 +0100
refactor(java): unify deserialization and serialization code for tcp
clients (#2477)
resolves #2224
---
.../client/async/tcp/AsyncBytesDeserializer.java | 210 ------------------
.../client/async/tcp/AsyncBytesSerializer.java | 237 ---------------------
.../client/async/tcp/ConsumerGroupsTcpClient.java | 15 +-
.../iggy/client/async/tcp/MessagesTcpClient.java | 8 +-
.../iggy/client/async/tcp/StreamsTcpClient.java | 19 +-
.../iggy/client/async/tcp/TopicsTcpClient.java | 19 +-
.../iggy/client/async/tcp/UsersTcpClient.java | 7 +-
.../blocking/tcp/ConsumerGroupsTcpClient.java | 8 +-
.../blocking/tcp/ConsumerOffsetTcpClient.java | 6 +-
.../client/blocking/tcp/InternalTcpClient.java | 1 +
.../client/blocking/tcp/MessagesTcpClient.java | 4 +-
.../client/blocking/tcp/PartitionsTcpClient.java | 3 +-
.../tcp/PersonalAccessTokensTcpClient.java | 12 +-
.../iggy/client/blocking/tcp/StreamsTcpClient.java | 10 +-
.../iggy/client/blocking/tcp/SystemTcpClient.java | 2 +
.../iggy/client/blocking/tcp/TopicsTcpClient.java | 12 +-
.../iggy/client/blocking/tcp/UsersTcpClient.java | 19 +-
.../apache/iggy/message/BigIntegerMessageId.java | 2 +-
.../blocking/tcp => serde}/BytesDeserializer.java | 48 +++--
.../blocking/tcp => serde}/BytesSerializer.java | 49 +++--
.../blocking/tcp => serde}/CommandCode.java | 6 +-
.../iggy/client/async/AsyncPollMessageTest.java | 25 ---
.../client/blocking/tcp/BytesSerializerTest.java | 24 +++
.../apache/iggy/serde/BytesDeserializerTest.java | 81 +++++++
24 files changed, 247 insertions(+), 580 deletions(-)
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java
deleted file mode 100644
index ae3037f18..000000000
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.client.async.tcp;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.iggy.message.BytesMessageId;
-import org.apache.iggy.message.Message;
-import org.apache.iggy.message.MessageHeader;
-import org.apache.iggy.message.PolledMessages;
-import org.apache.iggy.partition.Partition;
-import org.apache.iggy.stream.StreamBase;
-import org.apache.iggy.stream.StreamDetails;
-import org.apache.iggy.topic.CompressionAlgorithm;
-import org.apache.iggy.topic.Topic;
-import org.apache.iggy.topic.TopicDetails;
-
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Async version of BytesDeserializer for the async package.
- * Provides the same wire protocol deserialization as the blocking version.
- */
-public final class AsyncBytesDeserializer {
-
- private AsyncBytesDeserializer() {}
-
- /**
- * Reads PolledMessages from the response buffer.
- * @param response The ByteBuf containing the response data
- * @return The deserialized PolledMessages
- */
- public static PolledMessages readPolledMessages(ByteBuf response) {
- var partitionId = response.readUnsignedIntLE();
- var currentOffset = readU64AsBigInteger(response);
- var messagesCount = response.readUnsignedIntLE();
- var messages = new ArrayList<Message>();
- while (response.isReadable()) {
- messages.add(readPolledMessage(response));
- }
- return new PolledMessages(partitionId, currentOffset, messagesCount, messages);
- }
-
- /**
- * Reads a single Message from the buffer.
- */
- private static Message readPolledMessage(ByteBuf response) {
- var checksum = readU64AsBigInteger(response);
- var id = readBytesMessageId(response);
- var offset = readU64AsBigInteger(response);
- var timestamp = readU64AsBigInteger(response);
- var originTimestamp = readU64AsBigInteger(response);
- var userHeadersLength = response.readUnsignedIntLE();
- var payloadLength = response.readUnsignedIntLE();
- var header =
- new MessageHeader(checksum, id, offset, timestamp, originTimestamp, userHeadersLength, payloadLength);
- var payload = new byte[toInt(payloadLength)];
- response.readBytes(payload);
- // TODO: Add support for user headers.
- return new Message(header, payload, Optional.empty());
- }
-
- /**
- * Reads an unsigned 64-bit integer as BigInteger.
- */
- private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
- var bytesArray = new byte[8];
- buffer.readBytes(bytesArray, 0, 8);
- ArrayUtils.reverse(bytesArray);
- // Ensure it's treated as unsigned
- byte[] unsigned = new byte[9];
- System.arraycopy(bytesArray, 0, unsigned, 1, 8);
- return new BigInteger(unsigned);
- }
-
- /**
- * Reads a 16-byte message ID.
- */
- private static BytesMessageId readBytesMessageId(ByteBuf buffer) {
- var bytesArray = new byte[16];
- buffer.readBytes(bytesArray);
- ArrayUtils.reverse(bytesArray);
- return new BytesMessageId(bytesArray);
- }
-
- /**
- * Converts a long to int safely.
- */
- private static int toInt(long value) {
- if (value > Integer.MAX_VALUE) {
- throw new IllegalArgumentException("Value too large for int: " + value);
- }
- return (int) value;
- }
-
- /**
- * Reads StreamBase from the buffer.
- * @param response The ByteBuf containing the response data
- * @return The deserialized StreamBase
- */
- public static StreamBase readStreamBase(ByteBuf response) {
- var streamId = response.readUnsignedIntLE();
- var createdAt = readU64AsBigInteger(response);
- var topicsCount = response.readUnsignedIntLE();
- var size = readU64AsBigInteger(response);
- var messagesCount = readU64AsBigInteger(response);
- var nameLength = response.readByte();
- var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();
-
- return new StreamBase(streamId, createdAt, name, size.toString(), messagesCount, topicsCount);
- }
-
- /**
- * Reads StreamDetails from the buffer.
- * @param response The ByteBuf containing the response data
- * @return The deserialized StreamDetails
- */
- public static StreamDetails readStreamDetails(ByteBuf response) {
- var streamBase = readStreamBase(response);
-
- List<Topic> topics = new ArrayList<>();
- while (response.isReadable()) {
- topics.add(readTopic(response));
- }
-
- return new StreamDetails(streamBase, topics);
- }
-
- /**
- * Reads Topic from the buffer.
- * @param response The ByteBuf containing the response data
- * @return The deserialized Topic
- */
- public static Topic readTopic(ByteBuf response) {
- var topicId = response.readUnsignedIntLE();
- var createdAt = readU64AsBigInteger(response);
- var partitionsCount = response.readUnsignedIntLE();
- var messageExpiry = readU64AsBigInteger(response);
- var compressionAlgorithmCode = response.readByte();
- var maxTopicSize = readU64AsBigInteger(response);
- var replicationFactor = response.readByte();
- var size = readU64AsBigInteger(response);
- var messagesCount = readU64AsBigInteger(response);
- var nameLength = response.readByte();
- var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();
-
- return new Topic(
- topicId,
- createdAt,
- name,
- size.toString(),
- messageExpiry,
- CompressionAlgorithm.fromCode(compressionAlgorithmCode),
- maxTopicSize,
- (short) replicationFactor,
- messagesCount,
- partitionsCount);
- }
-
- /**
- * Reads TopicDetails from the buffer.
- * @param response The ByteBuf containing the response data
- * @return The deserialized TopicDetails
- */
- public static TopicDetails readTopicDetails(ByteBuf response) {
- var topic = readTopic(response);
-
- List<Partition> partitions = new ArrayList<>();
- while (response.isReadable()) {
- partitions.add(readPartition(response));
- }
-
- return new TopicDetails(topic, partitions);
- }
-
- /**
- * Reads Partition from the buffer.
- */
- private static Partition readPartition(ByteBuf response) {
- var partitionId = response.readUnsignedIntLE();
- var createdAt = readU64AsBigInteger(response);
- var segmentsCount = response.readUnsignedIntLE();
- var currentOffset = readU64AsBigInteger(response);
- var size = readU64AsBigInteger(response);
- var messagesCount = readU64AsBigInteger(response);
-
- return new Partition(partitionId, createdAt, segmentsCount, currentOffset, size.toString(), messagesCount);
- }
-}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java
deleted file mode 100644
index 7b1c87bf2..000000000
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iggy.client.async.tcp;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.iggy.consumergroup.Consumer;
-import org.apache.iggy.identifier.Identifier;
-import org.apache.iggy.message.Message;
-import org.apache.iggy.message.MessageHeader;
-import org.apache.iggy.message.Partitioning;
-import org.apache.iggy.message.PollingStrategy;
-
-import java.math.BigInteger;
-import java.nio.charset.StandardCharsets;
-import java.util.Optional;
-
-/**
- * Async version of BytesSerializer for the async package.
- * Provides the same wire protocol serialization as the blocking version.
- */
-public final class AsyncBytesSerializer {
-
- private AsyncBytesSerializer() {}
-
- /**
- * Serializes a Consumer to bytes.
- *
- * @param consumer The consumer to serialize (can be null)
- * @return ByteBuf containing the serialized consumer
- */
- public static ByteBuf toBytes(Consumer consumer) {
- ByteBuf buffer = Unpooled.buffer();
- if (consumer == null) {
- // No consumer - use code 0 (1 byte) and empty identifier (4 bytes)
- buffer.writeByte(0);
- buffer.writeIntLE(0);
- } else {
- buffer.writeByte(consumer.kind().asCode());
- buffer.writeBytes(toBytes(consumer.id()));
- }
- return buffer;
- }
-
- /**
- * Serializes an Identifier to bytes.
- *
- * @param identifier The identifier to serialize
- * @return ByteBuf containing the serialized identifier
- */
- public static ByteBuf toBytes(Identifier identifier) {
- if (identifier.getKind() == 1) {
- ByteBuf buffer = Unpooled.buffer(6);
- buffer.writeByte(1);
- buffer.writeByte(4);
- buffer.writeIntLE(identifier.getId().intValue());
- return buffer;
- } else if (identifier.getKind() == 2) {
- ByteBuf buffer = Unpooled.buffer(2 + identifier.getName().length());
- buffer.writeByte(2);
- buffer.writeByte(identifier.getName().length());
- buffer.writeBytes(identifier.getName().getBytes());
- return buffer;
- } else {
- throw new IllegalArgumentException("Unknown identifier kind: " + identifier.getKind());
- }
- }
-
- /**
- * Serializes a String to bytes with length prefix.
- *
- * @param str The string to serialize
- * @return ByteBuf containing the serialized string
- */
- public static ByteBuf toBytes(String str) {
- ByteBuf buffer = Unpooled.buffer(1 + str.length());
- buffer.writeByte(str.length());
- buffer.writeBytes(str.getBytes());
- return buffer;
- }
-
- /**
- * Serializes a Partitioning to bytes.
- *
- * @param partitioning The partitioning to serialize
- * @return ByteBuf containing the serialized partitioning
- */
- public static ByteBuf toBytes(Partitioning partitioning) {
- ByteBuf buffer = Unpooled.buffer(2 + partitioning.value().length);
- buffer.writeByte(partitioning.kind().asCode());
- buffer.writeByte(partitioning.value().length);
- buffer.writeBytes(partitioning.value());
- return buffer;
- }
-
- /**
- * Serializes a Message to bytes.
- *
- * @param message The message to serialize
- * @return ByteBuf containing the serialized message
- */
- public static ByteBuf toBytes(Message message) {
- var buffer = Unpooled.buffer(MessageHeader.SIZE + message.payload().length);
- buffer.writeBytes(toBytes(message.header()));
- buffer.writeBytes(message.payload());
- return buffer;
- }
-
- /**
- * Serializes a MessageHeader to bytes.
- *
- * @param header The message header to serialize
- * @return ByteBuf containing the serialized header
- */
- public static ByteBuf toBytes(MessageHeader header) {
- var buffer = Unpooled.buffer(MessageHeader.SIZE);
- buffer.writeBytes(toBytesAsU64(header.checksum()));
- // Convert MessageId to BigInteger and serialize as U128
- buffer.writeBytes(toBytesAsU128(header.id().toBigInteger()));
- buffer.writeBytes(toBytesAsU64(header.offset()));
- buffer.writeBytes(toBytesAsU64(header.timestamp()));
- buffer.writeBytes(toBytesAsU64(header.originTimestamp()));
- buffer.writeIntLE(header.userHeadersLength().intValue());
- buffer.writeIntLE(header.payloadLength().intValue());
- return buffer;
- }
-
- /**
- * Serializes a PollingStrategy to bytes.
- *
- * @param strategy The polling strategy to serialize
- * @return ByteBuf containing the serialized strategy
- */
- public static ByteBuf toBytes(PollingStrategy strategy) {
- var buffer = Unpooled.buffer(9);
- buffer.writeByte(strategy.kind().asCode());
- buffer.writeBytes(toBytesAsU64(strategy.value()));
- return buffer;
- }
-
- static ByteBuf toBytes(Optional<Long> optionalLong) {
- var buffer = Unpooled.buffer(5);
- if (optionalLong.isPresent()) {
- buffer.writeByte(1);
- buffer.writeIntLE(optionalLong.get().intValue());
- } else {
- buffer.writeByte(0);
- buffer.writeIntLE(0);
- }
- return buffer;
- }
-
- /**
- * Converts a BigInteger to bytes as unsigned 64-bit integer.
- *
- * @param value The BigInteger value to convert
- * @return ByteBuf containing the value as 8 bytes in little-endian format
- */
- public static ByteBuf toBytesAsU64(BigInteger value) {
- if (value.signum() == -1) {
- throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 64: " + value);
- }
- ByteBuf buffer = Unpooled.buffer(8);
- byte[] valueAsBytes = value.toByteArray();
- if (valueAsBytes.length > 9) {
- throw new IllegalArgumentException("Value too large for U64: " + value);
- }
- // Handle sign byte if present
- if (valueAsBytes.length == 9 && valueAsBytes[0] == 0) {
- valueAsBytes = ArrayUtils.subarray(valueAsBytes, 1, 9);
- }
- ArrayUtils.reverse(valueAsBytes);
- buffer.writeBytes(valueAsBytes, 0, Math.min(8, valueAsBytes.length));
- if (valueAsBytes.length < 8) {
- buffer.writeZero(8 - valueAsBytes.length);
- }
- return buffer;
- }
-
- /**
- * Converts a name string to bytes with length prefix.
- *
- * @param name The name string to convert
- * @return ByteBuf containing the serialized name
- */
- public static ByteBuf nameToBytes(String name) {
- var buffer = Unpooled.buffer(1 + name.length());
- buffer.writeByte(name.length());
- buffer.writeBytes(name.getBytes(StandardCharsets.UTF_8));
- return buffer;
- }
-
- /**
- * Converts a BigInteger to bytes as unsigned 128-bit integer.
- *
- * @param value The BigInteger value to convert
- * @return ByteBuf containing the value as 16 bytes in little-endian format
- */
- public static ByteBuf toBytesAsU128(BigInteger value) {
- if (value.signum() == -1) {
- throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 128: " + value);
- }
- ByteBuf buffer = Unpooled.buffer(16, 16);
- byte[] valueAsBytes = value.toByteArray();
- if (valueAsBytes.length > 17) {
- throw new IllegalArgumentException("Value too large for U128: " + value);
- }
- // Remove leading zero byte if present (from positive sign bit)
- if (valueAsBytes.length == 17 && valueAsBytes[0] == 0) {
- valueAsBytes = ArrayUtils.subarray(valueAsBytes, 1, 17);
- }
- ArrayUtils.reverse(valueAsBytes);
- buffer.writeBytes(valueAsBytes, 0, Math.min(16, valueAsBytes.length));
- if (valueAsBytes.length < 16) {
- buffer.writeZero(16 - valueAsBytes.length);
- }
- return buffer;
- }
-}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
index 06e778871..e671539f8 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java
@@ -21,10 +21,11 @@ package org.apache.iggy.client.async.tcp;
import io.netty.buffer.Unpooled;
import org.apache.iggy.client.async.ConsumerGroupsClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
import org.apache.iggy.identifier.ConsumerId;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,13 +48,13 @@ public class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
var payload = Unpooled.buffer();
// Serialize stream ID
- payload.writeBytes(AsyncBytesSerializer.toBytes(streamId));
+ payload.writeBytes(BytesSerializer.toBytes(streamId));
// Serialize topic ID
- payload.writeBytes(AsyncBytesSerializer.toBytes(topicId));
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
// Serialize consumer group ID
- payload.writeBytes(AsyncBytesSerializer.toBytes(groupId));
+ payload.writeBytes(BytesSerializer.toBytes(groupId));
log.debug("Joining consumer group - Stream: {}, Topic: {}, Group: {}",
streamId, topicId, groupId);
@@ -70,13 +71,13 @@ public class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
var payload = Unpooled.buffer();
// Serialize stream ID
- payload.writeBytes(AsyncBytesSerializer.toBytes(streamId));
+ payload.writeBytes(BytesSerializer.toBytes(streamId));
// Serialize topic ID
- payload.writeBytes(AsyncBytesSerializer.toBytes(topicId));
+ payload.writeBytes(BytesSerializer.toBytes(topicId));
// Serialize consumer group ID
- payload.writeBytes(AsyncBytesSerializer.toBytes(groupId));
+ payload.writeBytes(BytesSerializer.toBytes(groupId));
log.debug("Leaving consumer group - Stream: {}, Topic: {}, Group: {}",
streamId, topicId, groupId);
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java
index 9e2abec50..74fdc49c2 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java
@@ -21,7 +21,6 @@ package org.apache.iggy.client.async.tcp;
import io.netty.buffer.Unpooled;
import org.apache.iggy.client.async.MessagesClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
import org.apache.iggy.consumergroup.Consumer;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
@@ -29,12 +28,14 @@ import org.apache.iggy.message.Message;
import org.apache.iggy.message.Partitioning;
import org.apache.iggy.message.PolledMessages;
import org.apache.iggy.message.PollingStrategy;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
/**
* Async TCP implementation of MessagesClient using Netty for non-blocking I/O.
@@ -57,7 +58,6 @@ public class MessagesTcpClient implements MessagesClient {
Long count,
boolean autoCommit) {
- // Build the request payload
var payload = Unpooled.buffer();
var consumerBytes = toBytes(consumer);
@@ -83,7 +83,7 @@ public class MessagesTcpClient implements MessagesClient {
.sendAsync(CommandCode.Messages.POLL.getValue(), payload)
.thenApply(response -> {
try {
- return AsyncBytesDeserializer.readPolledMessages(response);
+ return BytesDeserializer.readPolledMessages(response);
} finally {
response.release();
}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java
index fdd8ba01b..c9e10ee26 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java
@@ -20,9 +20,11 @@
package org.apache.iggy.client.async.tcp;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
import org.apache.iggy.client.async.StreamsClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
import org.apache.iggy.stream.StreamBase;
import org.apache.iggy.stream.StreamDetails;
@@ -31,10 +33,9 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import static org.apache.iggy.client.async.tcp.AsyncBytesDeserializer.readStreamBase;
-import static org.apache.iggy.client.async.tcp.AsyncBytesDeserializer.readStreamDetails;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.nameToBytes;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamBase;
+import static org.apache.iggy.serde.BytesDeserializer.readStreamDetails;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
/**
* Async TCP implementation of StreamsClient using Netty for non-blocking I/O.
@@ -52,7 +53,7 @@ public class StreamsTcpClient implements StreamsClient {
var payloadSize = 1 + name.length();
var payload = Unpooled.buffer(payloadSize);
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
return connection
.sendAsync(CommandCode.Stream.CREATE.getValue(), payload)
@@ -100,11 +101,11 @@ public class StreamsTcpClient implements StreamsClient {
var payload = Unpooled.buffer(payloadSize + idBytes.capacity());
payload.writeBytes(idBytes);
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
return connection
.sendAsync(CommandCode.Stream.UPDATE.getValue(), payload)
- .thenAccept(response -> response.release());
+ .thenAccept(ReferenceCounted::release);
}
@Override
@@ -113,6 +114,6 @@ public class StreamsTcpClient implements StreamsClient {
return connection
.sendAsync(CommandCode.Stream.DELETE.getValue(), payload)
- .thenAccept(response -> response.release());
+ .thenAccept(ReferenceCounted::release);
}
}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java
index 5128039d2..80e73b0e4 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java
@@ -21,9 +21,11 @@ package org.apache.iggy.client.async.tcp;
import io.netty.buffer.Unpooled;
import org.apache.iggy.client.async.TopicsClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
import org.apache.iggy.topic.CompressionAlgorithm;
import org.apache.iggy.topic.Topic;
import org.apache.iggy.topic.TopicDetails;
@@ -34,9 +36,8 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.nameToBytes;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes;
-import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytesAsU64;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64;
/**
* Async TCP implementation of TopicsClient using Netty for non-blocking I/O.
@@ -58,7 +59,7 @@ public class TopicsTcpClient implements TopicsClient {
return connection.sendAsync(CommandCode.Topic.GET.getValue(),
payload).thenApply(response -> {
try {
if (response.isReadable()) {
- return Optional.of(AsyncBytesDeserializer.readTopicDetails(response));
+ return Optional.of(BytesDeserializer.readTopicDetails(response));
}
return Optional.<TopicDetails>empty();
} finally {
@@ -77,7 +78,7 @@ public class TopicsTcpClient implements TopicsClient {
try {
List<Topic> topics = new ArrayList<>();
while (response.isReadable()) {
- topics.add(AsyncBytesDeserializer.readTopic(response));
+ topics.add(BytesDeserializer.readTopic(response));
}
return topics;
} finally {
@@ -105,13 +106,13 @@ public class TopicsTcpClient implements TopicsClient {
payload.writeBytes(toBytesAsU64(messageExpiry));
payload.writeBytes(toBytesAsU64(maxTopicSize));
payload.writeByte(replicationFactor.orElse((short) 0));
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
return connection
.sendAsync(CommandCode.Topic.CREATE.getValue(), payload)
.thenApply(response -> {
try {
- return AsyncBytesDeserializer.readTopicDetails(response);
+ return BytesDeserializer.readTopicDetails(response);
} finally {
response.release();
}
@@ -135,7 +136,7 @@ public class TopicsTcpClient implements TopicsClient {
payload.writeBytes(toBytesAsU64(messageExpiry));
payload.writeBytes(toBytesAsU64(maxTopicSize));
payload.writeByte(replicationFactor.orElse((short) 0));
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
return connection
.sendAsync(CommandCode.Topic.UPDATE.getValue(), payload)
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java
index 00d8f31db..8f7ded490 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java
@@ -21,7 +21,8 @@ package org.apache.iggy.client.async.tcp;
import io.netty.buffer.Unpooled;
import org.apache.iggy.client.async.UsersClient;
-import org.apache.iggy.client.blocking.tcp.CommandCode;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
import org.apache.iggy.user.IdentityInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,8 +48,8 @@ public class UsersTcpClient implements UsersClient {
String context = "java-sdk";
var payload = Unpooled.buffer();
- var usernameBytes = AsyncBytesSerializer.toBytes(username);
- var passwordBytes = AsyncBytesSerializer.toBytes(password);
+ var usernameBytes = BytesSerializer.toBytes(username);
+ var passwordBytes = BytesSerializer.toBytes(password);
payload.writeBytes(usernameBytes);
payload.writeBytes(passwordBytes);
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
index bf2fb9366..7868d1dfb 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java
@@ -26,12 +26,14 @@ import org.apache.iggy.consumergroup.ConsumerGroupDetails;
import org.apache.iggy.identifier.ConsumerId;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
import java.util.List;
import java.util.Optional;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
@@ -66,7 +68,7 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
payload.writeBytes(streamIdBytes);
payload.writeBytes(topicIdBytes);
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
return tcpClient.exchangeForEntity(
CommandCode.ConsumerGroup.CREATE, payload,
BytesDeserializer::readConsumerGroupDetails);
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
index 2fe86ba6b..08ddd5f3b 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java
@@ -24,12 +24,14 @@ import org.apache.iggy.consumergroup.Consumer;
import org.apache.iggy.consumeroffset.ConsumerOffsetInfo;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
import java.math.BigInteger;
import java.util.Optional;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64;
class ConsumerOffsetTcpClient implements ConsumerOffsetsClient {
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
index ab4bf684e..234a29455 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
+import org.apache.iggy.serde.CommandCode;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
index 172d0c9e6..84e251d26 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
@@ -28,11 +28,13 @@ import org.apache.iggy.message.Message;
import org.apache.iggy.message.Partitioning;
import org.apache.iggy.message.PolledMessages;
import org.apache.iggy.message.PollingStrategy;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
import java.util.List;
import java.util.Optional;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
class MessagesTcpClient implements MessagesClient {
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
index 1fa634ad6..7678d7c6b 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java
@@ -22,8 +22,9 @@ package org.apache.iggy.client.blocking.tcp;
import org.apache.iggy.client.blocking.PartitionsClient;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.CommandCode;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
class PartitionsTcpClient implements PartitionsClient {
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
index f0d0b22f3..b3535b888 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java
@@ -23,14 +23,16 @@ import io.netty.buffer.Unpooled;
import org.apache.iggy.client.blocking.PersonalAccessTokensClient;
import org.apache.iggy.personalaccesstoken.PersonalAccessTokenInfo;
import org.apache.iggy.personalaccesstoken.RawPersonalAccessToken;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
import org.apache.iggy.user.IdentityInfo;
import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64;
class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient {
@@ -43,7 +45,7 @@ class PersonalAccessTokensTcpClient implements
PersonalAccessTokensClient {
@Override
public RawPersonalAccessToken createPersonalAccessToken(String name,
BigInteger expiry) {
var payload = Unpooled.buffer();
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(toBytes(name));
payload.writeBytes(toBytesAsU64(expiry));
return tcpClient.exchangeForEntity(
CommandCode.PersonalAccessToken.CREATE, payload,
BytesDeserializer::readRawPersonalAccessToken);
@@ -57,13 +59,13 @@ class PersonalAccessTokensTcpClient implements
PersonalAccessTokensClient {
@Override
public void deletePersonalAccessToken(String name) {
- var payload = nameToBytes(name);
+ var payload = toBytes(name);
tcpClient.send(CommandCode.PersonalAccessToken.DELETE, payload);
}
@Override
public IdentityInfo loginWithPersonalAccessToken(String token) {
- var payload = nameToBytes(token);
+ var payload = toBytes(token);
return
tcpClient.exchangeForEntity(CommandCode.PersonalAccessToken.LOGIN, payload, buf
-> {
var userId = buf.readUnsignedIntLE();
return new IdentityInfo(userId, Optional.empty());
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
index e7152ddcb..7605f164e 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java
@@ -22,14 +22,16 @@ package org.apache.iggy.client.blocking.tcp;
import io.netty.buffer.Unpooled;
import org.apache.iggy.client.blocking.StreamsClient;
import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
import org.apache.iggy.stream.StreamBase;
import org.apache.iggy.stream.StreamDetails;
import java.util.List;
import java.util.Optional;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
class StreamsTcpClient implements StreamsClient {
@@ -44,7 +46,7 @@ class StreamsTcpClient implements StreamsClient {
var payloadSize = 1 + name.length();
var payload = Unpooled.buffer(payloadSize);
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
return tcpClient.exchangeForEntity(CommandCode.Stream.CREATE, payload,
BytesDeserializer::readStreamDetails);
}
@@ -66,7 +68,7 @@ class StreamsTcpClient implements StreamsClient {
var payload = Unpooled.buffer(payloadSize + idBytes.capacity());
payload.writeBytes(idBytes);
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
tcpClient.send(CommandCode.Stream.UPDATE, payload);
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
index 447f48b3f..70277b742 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java
@@ -21,6 +21,8 @@ package org.apache.iggy.client.blocking.tcp;
import io.netty.buffer.Unpooled;
import org.apache.iggy.client.blocking.SystemClient;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
import org.apache.iggy.system.ClientInfo;
import org.apache.iggy.system.ClientInfoDetails;
import org.apache.iggy.system.Stats;
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
index 1ccbd512a..01392dc55 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java
@@ -23,6 +23,9 @@ import io.netty.buffer.Unpooled;
import org.apache.iggy.client.blocking.TopicsClient;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.BytesSerializer;
+import org.apache.iggy.serde.CommandCode;
import org.apache.iggy.topic.CompressionAlgorithm;
import org.apache.iggy.topic.Topic;
import org.apache.iggy.topic.TopicDetails;
@@ -31,9 +34,8 @@ import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64;
class TopicsTcpClient implements TopicsClient {
@@ -74,7 +76,7 @@ class TopicsTcpClient implements TopicsClient {
payload.writeBytes(toBytesAsU64(messageExpiry));
payload.writeBytes(toBytesAsU64(maxTopicSize));
payload.writeByte(replicationFactor.orElse((short) 0));
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
return tcpClient.exchangeForEntity(CommandCode.Topic.CREATE, payload,
BytesDeserializer::readTopicDetails);
}
@@ -95,7 +97,7 @@ class TopicsTcpClient implements TopicsClient {
payload.writeBytes(toBytesAsU64(messageExpiry));
payload.writeBytes(toBytesAsU64(maxTopicSize));
payload.writeByte(replicationFactor.orElse((short) 0));
- payload.writeBytes(nameToBytes(name));
+ payload.writeBytes(BytesSerializer.toBytes(name));
tcpClient.send(CommandCode.Topic.UPDATE, payload);
}
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
index 006de6586..6fb201526 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java
@@ -23,6 +23,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.iggy.client.blocking.UsersClient;
import org.apache.iggy.identifier.UserId;
+import org.apache.iggy.serde.BytesDeserializer;
+import org.apache.iggy.serde.CommandCode;
import org.apache.iggy.user.IdentityInfo;
import org.apache.iggy.user.Permissions;
import org.apache.iggy.user.UserInfo;
@@ -32,8 +34,7 @@ import org.apache.iggy.user.UserStatus;
import java.util.List;
import java.util.Optional;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes;
-import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes;
+import static org.apache.iggy.serde.BytesSerializer.toBytes;
class UsersTcpClient implements UsersClient {
@@ -58,8 +59,8 @@ class UsersTcpClient implements UsersClient {
public UserInfoDetails createUser(
String username, String password, UserStatus status,
Optional<Permissions> permissions) {
var payload = Unpooled.buffer();
- payload.writeBytes(nameToBytes(username));
- payload.writeBytes(nameToBytes(password));
+ payload.writeBytes(toBytes(username));
+ payload.writeBytes(toBytes(password));
payload.writeByte(status.asCode());
permissions.ifPresentOrElse(
perms -> {
@@ -85,7 +86,7 @@ class UsersTcpClient implements UsersClient {
usernameOptional.ifPresentOrElse(
(username) -> {
payload.writeByte(1);
- payload.writeBytes(nameToBytes(username));
+ payload.writeBytes(toBytes(username));
},
() -> payload.writeByte(0));
statusOptional.ifPresentOrElse(
@@ -117,8 +118,8 @@ class UsersTcpClient implements UsersClient {
@Override
public void changePassword(UserId userId, String currentPassword, String
newPassword) {
var payload = toBytes(userId);
- payload.writeBytes(nameToBytes(currentPassword));
- payload.writeBytes(nameToBytes(newPassword));
+ payload.writeBytes(toBytes(currentPassword));
+ payload.writeBytes(toBytes(newPassword));
tcpClient.send(CommandCode.User.CHANGE_PASSWORD, payload);
}
@@ -130,8 +131,8 @@ class UsersTcpClient implements UsersClient {
var payloadSize = 2 + username.length() + password.length() + 4 +
version.length() + 4 + context.length();
var payload = Unpooled.buffer(payloadSize);
- payload.writeBytes(nameToBytes(username));
- payload.writeBytes(nameToBytes(password));
+ payload.writeBytes(toBytes(username));
+ payload.writeBytes(toBytes(password));
payload.writeIntLE(version.length());
payload.writeBytes(version.getBytes());
payload.writeIntLE(context.length());
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java
index 8e1b16691..d8b04e27d 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java
@@ -20,7 +20,7 @@
package org.apache.iggy.message;
import io.netty.buffer.ByteBuf;
-import org.apache.iggy.client.blocking.tcp.BytesSerializer;
+import org.apache.iggy.serde.BytesSerializer;
import java.math.BigInteger;
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
similarity index 90%
rename from
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java
rename to
foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
index b89f6fce9..58e384b1c 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iggy.client.blocking.tcp;
+package org.apache.iggy.serde;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.ArrayUtils;
@@ -57,11 +57,15 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-final class BytesDeserializer {
+/**
+ * Unified deserializer for both blocking and async clients.
+ * Provides deserialization of ByteBuf to domain objects according to Iggy
wire protocol.
+ */
+public final class BytesDeserializer {
private BytesDeserializer() {}
- static StreamBase readStreamBase(ByteBuf response) {
+ public static StreamBase readStreamBase(ByteBuf response) {
var streamId = response.readUnsignedIntLE();
var createdAt = readU64AsBigInteger(response);
var topicsCount = response.readUnsignedIntLE();
@@ -73,11 +77,11 @@ final class BytesDeserializer {
return new StreamBase(streamId, createdAt, name, size.toString(),
messagesCount, topicsCount);
}
- static StreamDetails readStreamDetails(ByteBuf response) {
+ public static StreamDetails readStreamDetails(ByteBuf response) {
var streamBase = readStreamBase(response);
List<Topic> topics = new ArrayList<>();
- if (response.isReadable()) {
+ while (response.isReadable()) {
topics.add(readTopic(response));
}
@@ -95,7 +99,7 @@ final class BytesDeserializer {
return new TopicDetails(topic, partitions);
}
- static Partition readPartition(ByteBuf response) {
+ public static Partition readPartition(ByteBuf response) {
var partitionId = response.readUnsignedIntLE();
var createdAt = readU64AsBigInteger(response);
var segmentsCount = response.readUnsignedIntLE();
@@ -141,7 +145,7 @@ final class BytesDeserializer {
return new ConsumerGroupDetails(consumerGroup, members);
}
- static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) {
+ public static ConsumerGroupMember readConsumerGroupMember(ByteBuf
response) {
var memberId = response.readUnsignedIntLE();
var partitionsCount = response.readUnsignedIntLE();
List<Long> partitionIds = new ArrayList<>();
@@ -178,7 +182,7 @@ final class BytesDeserializer {
return new PolledMessages(partitionId, currentOffset, messagesCount,
messages);
}
- static Message readPolledMessage(ByteBuf response) {
+ public static Message readPolledMessage(ByteBuf response) {
var checksum = readU64AsBigInteger(response);
var id = readBytesMessageId(response);
var offset = readU64AsBigInteger(response);
@@ -194,7 +198,7 @@ final class BytesDeserializer {
return new Message(header, payload, Optional.empty());
}
- static Stats readStats(ByteBuf response) {
+ public static Stats readStats(ByteBuf response) {
var processId = response.readUnsignedIntLE();
var cpuUsage = response.readFloatLE();
var totalCpuUsage = response.readFloatLE();
@@ -251,7 +255,7 @@ final class BytesDeserializer {
kernelVersion);
}
- static ClientInfoDetails readClientInfoDetails(ByteBuf response) {
+ public static ClientInfoDetails readClientInfoDetails(ByteBuf response) {
var clientInfo = readClientInfo(response);
var consumerGroups = new ArrayList<ConsumerGroupInfo>();
for (int i = 0; i < clientInfo.consumerGroupsCount(); i++) {
@@ -261,7 +265,7 @@ final class BytesDeserializer {
return new ClientInfoDetails(clientInfo, consumerGroups);
}
- static ClientInfo readClientInfo(ByteBuf response) {
+ public static ClientInfo readClientInfo(ByteBuf response) {
var clientId = response.readUnsignedIntLE();
var userId = response.readUnsignedIntLE();
var userIdOptional = Optional.<Long>empty();
@@ -280,7 +284,7 @@ final class BytesDeserializer {
return new ClientInfo(clientId, userIdOptional, address,
transportString, consumerGroupsCount);
}
- static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) {
+ public static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) {
var streamId = response.readUnsignedIntLE();
var topicId = response.readUnsignedIntLE();
var groupId = response.readUnsignedIntLE();
@@ -288,7 +292,7 @@ final class BytesDeserializer {
return new ConsumerGroupInfo(streamId, topicId, groupId);
}
- static UserInfoDetails readUserInfoDetails(ByteBuf response) {
+ public static UserInfoDetails readUserInfoDetails(ByteBuf response) {
var userInfo = readUserInfo(response);
Optional<Permissions> permissionsOptional = Optional.empty();
@@ -300,7 +304,7 @@ final class BytesDeserializer {
return new UserInfoDetails(userInfo, permissionsOptional);
}
- static Permissions readPermissions(ByteBuf response) {
+ public static Permissions readPermissions(ByteBuf response) {
var _permissionsLength = response.readUnsignedIntLE();
var globalPermissions = readGlobalPermissions(response);
Map<Long, StreamPermissions> streamPermissionsMap = new HashMap<>();
@@ -312,7 +316,7 @@ final class BytesDeserializer {
return new Permissions(globalPermissions, streamPermissionsMap);
}
- static StreamPermissions readStreamPermissions(ByteBuf response) {
+ public static StreamPermissions readStreamPermissions(ByteBuf response) {
var manageStream = response.readBoolean();
var readStream = response.readBoolean();
var manageTopics = response.readBoolean();
@@ -329,7 +333,7 @@ final class BytesDeserializer {
manageStream, readStream, manageTopics, readTopics,
pollMessages, sendMessages, topicPermissionsMap);
}
- static TopicPermissions readTopicPermissions(ByteBuf response) {
+ public static TopicPermissions readTopicPermissions(ByteBuf response) {
var manageTopic = response.readBoolean();
var readTopic = response.readBoolean();
var pollMessages = response.readBoolean();
@@ -337,7 +341,7 @@ final class BytesDeserializer {
return new TopicPermissions(manageTopic, readTopic, pollMessages,
sendMessages);
}
- static GlobalPermissions readGlobalPermissions(ByteBuf response) {
+ public static GlobalPermissions readGlobalPermissions(ByteBuf response) {
var manageServers = response.readBoolean();
var readServers = response.readBoolean();
var manageUsers = response.readBoolean();
@@ -361,7 +365,7 @@ final class BytesDeserializer {
sendMessages);
}
- static UserInfo readUserInfo(ByteBuf response) {
+ public static UserInfo readUserInfo(ByteBuf response) {
var userId = response.readUnsignedIntLE();
var createdAt = readU64AsBigInteger(response);
var statusCode = response.readByte();
@@ -372,14 +376,14 @@ final class BytesDeserializer {
return new UserInfo(userId, createdAt, status, username);
}
- static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf response)
{
+ public static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf
response) {
var tokenLength = response.readByte();
var token =
response.readCharSequence(tokenLength,
StandardCharsets.UTF_8).toString();
return new RawPersonalAccessToken(token);
}
- static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf
response) {
+ public static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf
response) {
var nameLength = response.readByte();
var name = response.readCharSequence(nameLength,
StandardCharsets.UTF_8).toString();
var expiry = readU64AsBigInteger(response);
@@ -387,11 +391,11 @@ final class BytesDeserializer {
return new PersonalAccessTokenInfo(name, expiryOptional);
}
- private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
+ static BigInteger readU64AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[8];
buffer.readBytes(bytesArray, 0, 8);
ArrayUtils.reverse(bytesArray);
- return new BigInteger(bytesArray);
+ return new BigInteger(1, bytesArray);
}
private static BytesMessageId readBytesMessageId(ByteBuf buffer) {
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
similarity index 83%
rename from
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java
rename to
foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
index f95c0a622..267de8491 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java
+++
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iggy.client.blocking.tcp;
+package org.apache.iggy.serde;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -35,21 +35,26 @@ import org.apache.iggy.user.StreamPermissions;
import org.apache.iggy.user.TopicPermissions;
import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
+/**
+ * Unified serializer for both blocking and async clients.
+ * Provides serialization of domain objects to ByteBuf according to Iggy wire
protocol.
+ */
public final class BytesSerializer {
private BytesSerializer() {}
- static ByteBuf toBytes(Consumer consumer) {
+ public static ByteBuf toBytes(Consumer consumer) {
ByteBuf buffer = Unpooled.buffer();
buffer.writeByte(consumer.kind().asCode());
buffer.writeBytes(toBytes(consumer.id()));
return buffer;
}
- static ByteBuf toBytes(Identifier identifier) {
+ public static ByteBuf toBytes(Identifier identifier) {
if (identifier.getKind() == 1) {
ByteBuf buffer = Unpooled.buffer(6);
buffer.writeByte(1);
@@ -67,7 +72,7 @@ public final class BytesSerializer {
}
}
- static ByteBuf toBytes(Partitioning partitioning) {
+ public static ByteBuf toBytes(Partitioning partitioning) {
ByteBuf buffer = Unpooled.buffer(2 + partitioning.value().length);
buffer.writeByte(partitioning.kind().asCode());
buffer.writeByte(partitioning.value().length);
@@ -75,14 +80,14 @@ public final class BytesSerializer {
return buffer;
}
- static ByteBuf toBytes(Message message) {
+ public static ByteBuf toBytes(Message message) {
var buffer = Unpooled.buffer(MessageHeader.SIZE +
message.payload().length);
buffer.writeBytes(toBytes(message.header()));
buffer.writeBytes(message.payload());
return buffer;
}
- static ByteBuf toBytes(MessageHeader header) {
+ public static ByteBuf toBytes(MessageHeader header) {
var buffer = Unpooled.buffer(MessageHeader.SIZE);
buffer.writeBytes(toBytesAsU64(header.checksum()));
buffer.writeBytes(header.id().toBytes());
@@ -94,14 +99,14 @@ public final class BytesSerializer {
return buffer;
}
- static ByteBuf toBytes(PollingStrategy strategy) {
+ public static ByteBuf toBytes(PollingStrategy strategy) {
var buffer = Unpooled.buffer(9);
buffer.writeByte(strategy.kind().asCode());
buffer.writeBytes(toBytesAsU64(strategy.value()));
return buffer;
}
- static ByteBuf toBytes(Optional<Long> optionalLong) {
+ public static ByteBuf toBytes(Optional<Long> optionalLong) {
var buffer = Unpooled.buffer(5);
if (optionalLong.isPresent()) {
buffer.writeByte(1);
@@ -113,7 +118,7 @@ public final class BytesSerializer {
return buffer;
}
- static ByteBuf toBytes(Map<String, HeaderValue> headers) {
+ public static ByteBuf toBytes(Map<String, HeaderValue> headers) {
if (headers.isEmpty()) {
return Unpooled.EMPTY_BUFFER;
}
@@ -131,7 +136,7 @@ public final class BytesSerializer {
return buffer;
}
- static ByteBuf toBytes(Permissions permissions) {
+ public static ByteBuf toBytes(Permissions permissions) {
var buffer = Unpooled.buffer();
buffer.writeBytes(toBytes(permissions.global()));
if (permissions.streams().isEmpty()) {
@@ -149,7 +154,7 @@ public final class BytesSerializer {
return buffer;
}
- static ByteBuf toBytes(GlobalPermissions permissions) {
+ public static ByteBuf toBytes(GlobalPermissions permissions) {
var buffer = Unpooled.buffer();
buffer.writeBoolean(permissions.manageServers());
buffer.writeBoolean(permissions.readServers());
@@ -164,7 +169,7 @@ public final class BytesSerializer {
return buffer;
}
- static ByteBuf toBytes(StreamPermissions permissions) {
+ public static ByteBuf toBytes(StreamPermissions permissions) {
var buffer = Unpooled.buffer();
buffer.writeBoolean(permissions.manageStream());
buffer.writeBoolean(permissions.readStream());
@@ -187,7 +192,7 @@ public final class BytesSerializer {
return buffer;
}
- static ByteBuf toBytes(TopicPermissions permissions) {
+ public static ByteBuf toBytes(TopicPermissions permissions) {
var buffer = Unpooled.buffer();
buffer.writeBoolean(permissions.manageTopic());
buffer.writeBoolean(permissions.readTopic());
@@ -196,21 +201,21 @@ public final class BytesSerializer {
return buffer;
}
- static ByteBuf nameToBytes(String name) {
- ByteBuf buffer = Unpooled.buffer(1 + name.length());
- buffer.writeByte(name.length());
- buffer.writeBytes(name.getBytes());
+ public static ByteBuf toBytes(String value) {
+ ByteBuf buffer = Unpooled.buffer(1 + value.length());
+ buffer.writeByte(value.length());
+ buffer.writeBytes(value.getBytes(StandardCharsets.UTF_8));
return buffer;
}
- static ByteBuf toBytesAsU64(BigInteger value) {
+ public static ByteBuf toBytesAsU64(BigInteger value) {
if (value.signum() == -1) {
throw new IllegalArgumentException("Negative value cannot be
serialized to unsigned 64: " + value);
}
ByteBuf buffer = Unpooled.buffer(8, 8);
byte[] valueAsBytes = value.toByteArray();
- if (valueAsBytes.length > 9) {
- throw new IllegalArgumentException();
+ if (valueAsBytes.length > 9 || valueAsBytes.length == 9 &&
valueAsBytes[0] != 0) {
+ throw new IllegalArgumentException("Value too large for U64: " +
value);
}
ArrayUtils.reverse(valueAsBytes);
buffer.writeBytes(valueAsBytes, 0, Math.min(8, valueAsBytes.length));
@@ -226,8 +231,8 @@ public final class BytesSerializer {
}
ByteBuf buffer = Unpooled.buffer(16, 16);
byte[] valueAsBytes = value.toByteArray();
- if (valueAsBytes.length > 17) {
- throw new IllegalArgumentException();
+ if (valueAsBytes.length > 17 || valueAsBytes.length == 17 &&
valueAsBytes[0] != 0) {
+ throw new IllegalArgumentException("Value too large for U128: " +
value);
}
ArrayUtils.reverse(valueAsBytes);
buffer.writeBytes(valueAsBytes, 0, Math.min(16, valueAsBytes.length));
diff --git
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java
b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java
similarity index 96%
rename from
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java
rename to
foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java
index 5f7d6b525..e9f1f7905 100644
---
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java
@@ -17,8 +17,12 @@
* under the License.
*/
-package org.apache.iggy.client.blocking.tcp;
+package org.apache.iggy.serde;
+/**
+ * TCP command codes for Iggy wire protocol.
+ * Used by both blocking and async TCP clients.
+ */
public interface CommandCode {
int getValue();
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
index d2d905ea8..e2e6b979f 100644
---
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java
@@ -47,7 +47,6 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Test class specifically for testing poll message functionality with
different consumer scenarios.
@@ -163,30 +162,6 @@ public class AsyncPollMessageTest {
}
}
- @Test
- @Order(1)
- @DisplayName("Poll with NULL consumer - Expected to timeout (server
doesn't respond)")
- void testPollWithNullConsumer() {
- log.info("TEST 1: Polling with NULL consumer");
-
- // This test demonstrates the issue: server doesn't respond to null
consumer
- assertThatThrownBy(() -> {
- client.messages()
- .pollMessagesAsync(
- StreamId.of(testStream),
- TopicId.of(TEST_TOPIC),
- Optional.of(PARTITION_ID),
- null, // NULL consumer causes server to
not respond
- PollingStrategy.offset(BigInteger.ZERO),
- 10L,
- false)
- .get(3, TimeUnit.SECONDS);
- })
- .isInstanceOf(TimeoutException.class);
-
- log.info("CONFIRMED: Null consumer causes timeout (server doesn't
respond)");
- }
-
@Test
@Order(2)
@DisplayName("Poll with various consumer IDs")
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
index b864e1cc6..2e035979e 100644
---
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java
@@ -21,6 +21,7 @@ package org.apache.iggy.client.blocking.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.iggy.serde.BytesSerializer;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@@ -71,6 +72,17 @@ class BytesSerializerTest {
// when & then
assertThatThrownBy(() ->
BytesSerializer.toBytesAsU64(value)).isInstanceOf(IllegalArgumentException.class);
}
+
+ @Test
+ void shouldFailForValueLargerThanU64() {
+ // given
+ long maxLong = 0xFFFF_FFFF_FFFF_FFFFL;
+ var maxU64 = new BigInteger(Long.toUnsignedString(maxLong));
+ var value = maxU64.add(BigInteger.ONE);
+
+ // when & then
+ assertThatThrownBy(() ->
BytesSerializer.toBytesAsU64(value)).isInstanceOf(IllegalArgumentException.class);
+ }
}
@Nested
@@ -113,5 +125,17 @@ class BytesSerializerTest {
// when & then
assertThatThrownBy(() ->
BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class);
}
+
+ @Test
+ void shouldFailForValueLargerThanU128() { // 2^128 is one past the u128 range, so toBytesAsU128 must reject it
+ // given
+ byte[] maxU128 = new byte[17]; // index 0 stays 0x00 so the two's-complement value below is positive
+ Arrays.fill(maxU128, 1, 17, (byte) 0xFF); // indices 1..16 become 0xFF (toIndex 17 is exclusive)
+ var maxU128Value = new BigInteger(maxU128); // 16 bytes of 0xFF after a zero byte: 2^128 - 1
+ var value = maxU128Value.add(BigInteger.ONE); // 2^128, the smallest value that no longer fits in 128 bits
+
+ // when & then
+ assertThatThrownBy(() ->
BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class);
+ }
}
}
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
new file mode 100644
index 000000000..42e8b0158
--- /dev/null
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/serde/BytesDeserializerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.serde;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.commons.lang3.ArrayUtils;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigInteger;
+import java.util.HexFormat;
+
+import static org.apache.iggy.serde.BytesDeserializer.readU64AsBigInteger;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class BytesDeserializerTest { // unit tests for BytesDeserializer.readU64AsBigInteger
+
+ @Nested
+ class U64 { // cases for reading an 8-byte value into a BigInteger
+
+ @Test
+ void shouldDeserializeMaxValue() {
+ // given
+ long maxLong = 0xFFFF_FFFF_FFFF_FFFFL; // all 64 bits set; as a signed long this is -1
+ ByteBuf buffer = Unpooled.copyLong(maxLong); // eight 0xFF bytes, so byte order is irrelevant here
+ var expectedMaxU64 = new
BigInteger(Long.toUnsignedString(maxLong));
+
+ // when
+ BigInteger result = readU64AsBigInteger(buffer);
+
+ // then
+ assertThat(result).isEqualTo(expectedMaxU64); // must be 2^64 - 1, not the signed reading -1
+ }
+
+ @Test
+ void shouldDeserializeZero() {
+ // given
+ ByteBuf buffer = Unpooled.buffer(8);
+ buffer.writeZero(8); // makes eight 0x00 bytes readable
+
+ // when
+ BigInteger result = readU64AsBigInteger(buffer);
+
+ // then
+ assertThat(result).isEqualTo(BigInteger.ZERO);
+ }
+
+ @Test
+ void shouldDeserializeArbitraryValue() {
+ // given
+ byte[] bytes = HexFormat.of().parseHex("8000000000000000"); // big-endian form: only the top bit set
+ var expected = new BigInteger(1, bytes); // signum 1 forces a positive magnitude: 2^63
+ ArrayUtils.reverse(bytes); // reversed in place, so the buffer receives the little-endian byte order
+ ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
+
+ // when
+ BigInteger result = readU64AsBigInteger(buffer);
+
+ // then
+ assertThat(result).isEqualTo(expected); // holds only if the bytes are decoded as little-endian and unsigned
+ }
+ }
+}