This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch java-serde in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 156191450a252994197de454b4216706cc9006a5 Author: Maciej Modzelewski <[email protected]> AuthorDate: Fri Dec 12 10:30:17 2025 +0100 refactor(java): unify deserialization and serialization code for tcp clients --- .../client/async/tcp/AsyncBytesDeserializer.java | 210 ------------------ .../client/async/tcp/AsyncBytesSerializer.java | 237 --------------------- .../client/async/tcp/ConsumerGroupsTcpClient.java | 15 +- .../iggy/client/async/tcp/MessagesTcpClient.java | 8 +- .../iggy/client/async/tcp/StreamsTcpClient.java | 19 +- .../iggy/client/async/tcp/TopicsTcpClient.java | 19 +- .../iggy/client/async/tcp/UsersTcpClient.java | 7 +- .../blocking/tcp/ConsumerGroupsTcpClient.java | 8 +- .../blocking/tcp/ConsumerOffsetTcpClient.java | 6 +- .../client/blocking/tcp/InternalTcpClient.java | 1 + .../client/blocking/tcp/MessagesTcpClient.java | 4 +- .../client/blocking/tcp/PartitionsTcpClient.java | 3 +- .../tcp/PersonalAccessTokensTcpClient.java | 12 +- .../iggy/client/blocking/tcp/StreamsTcpClient.java | 10 +- .../iggy/client/blocking/tcp/SystemTcpClient.java | 2 + .../iggy/client/blocking/tcp/TopicsTcpClient.java | 12 +- .../iggy/client/blocking/tcp/UsersTcpClient.java | 19 +- .../apache/iggy/message/BigIntegerMessageId.java | 2 +- .../blocking/tcp => serde}/BytesDeserializer.java | 44 ++-- .../blocking/tcp => serde}/BytesSerializer.java | 45 ++-- .../blocking/tcp => serde}/CommandCode.java | 6 +- .../iggy/client/async/AsyncPollMessageTest.java | 25 --- .../client/blocking/tcp/BytesSerializerTest.java | 24 +++ 23 files changed, 162 insertions(+), 576 deletions(-) diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java deleted file mode 100644 index ae3037f18..000000000 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesDeserializer.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iggy.client.async.tcp; - -import io.netty.buffer.ByteBuf; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.iggy.message.BytesMessageId; -import org.apache.iggy.message.Message; -import org.apache.iggy.message.MessageHeader; -import org.apache.iggy.message.PolledMessages; -import org.apache.iggy.partition.Partition; -import org.apache.iggy.stream.StreamBase; -import org.apache.iggy.stream.StreamDetails; -import org.apache.iggy.topic.CompressionAlgorithm; -import org.apache.iggy.topic.Topic; -import org.apache.iggy.topic.TopicDetails; - -import java.math.BigInteger; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -/** - * Async version of BytesDeserializer for the async package. - * Provides the same wire protocol deserialization as the blocking version. - */ -public final class AsyncBytesDeserializer { - - private AsyncBytesDeserializer() {} - - /** - * Reads PolledMessages from the response buffer. - * @param response The ByteBuf containing the response data - * @return The deserialized PolledMessages - */ - public static PolledMessages readPolledMessages(ByteBuf response) { - var partitionId = response.readUnsignedIntLE(); - var currentOffset = readU64AsBigInteger(response); - var messagesCount = response.readUnsignedIntLE(); - var messages = new ArrayList<Message>(); - while (response.isReadable()) { - messages.add(readPolledMessage(response)); - } - return new PolledMessages(partitionId, currentOffset, messagesCount, messages); - } - - /** - * Reads a single Message from the buffer. - */ - private static Message readPolledMessage(ByteBuf response) { - var checksum = readU64AsBigInteger(response); - var id = readBytesMessageId(response); - var offset = readU64AsBigInteger(response); - var timestamp = readU64AsBigInteger(response); - var originTimestamp = readU64AsBigInteger(response); - var userHeadersLength = response.readUnsignedIntLE(); - var payloadLength = response.readUnsignedIntLE(); - var header = - new MessageHeader(checksum, id, offset, timestamp, originTimestamp, userHeadersLength, payloadLength); - var payload = new byte[toInt(payloadLength)]; - response.readBytes(payload); - // TODO: Add support for user headers. - return new Message(header, payload, Optional.empty()); - } - - /** - * Reads an unsigned 64-bit integer as BigInteger. - */ - private static BigInteger readU64AsBigInteger(ByteBuf buffer) { - var bytesArray = new byte[8]; - buffer.readBytes(bytesArray, 0, 8); - ArrayUtils.reverse(bytesArray); - // Ensure it's treated as unsigned - byte[] unsigned = new byte[9]; - System.arraycopy(bytesArray, 0, unsigned, 1, 8); - return new BigInteger(unsigned); - } - - /** - * Reads a 16-byte message ID. - */ - private static BytesMessageId readBytesMessageId(ByteBuf buffer) { - var bytesArray = new byte[16]; - buffer.readBytes(bytesArray); - ArrayUtils.reverse(bytesArray); - return new BytesMessageId(bytesArray); - } - - /** - * Converts a long to int safely. - */ - private static int toInt(long value) { - if (value > Integer.MAX_VALUE) { - throw new IllegalArgumentException("Value too large for int: " + value); - } - return (int) value; - } - - /** - * Reads StreamBase from the buffer. - * @param response The ByteBuf containing the response data - * @return The deserialized StreamBase - */ - public static StreamBase readStreamBase(ByteBuf response) { - var streamId = response.readUnsignedIntLE(); - var createdAt = readU64AsBigInteger(response); - var topicsCount = response.readUnsignedIntLE(); - var size = readU64AsBigInteger(response); - var messagesCount = readU64AsBigInteger(response); - var nameLength = response.readByte(); - var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString(); - - return new StreamBase(streamId, createdAt, name, size.toString(), messagesCount, topicsCount); - } - - /** - * Reads StreamDetails from the buffer. - * @param response The ByteBuf containing the response data - * @return The deserialized StreamDetails - */ - public static StreamDetails readStreamDetails(ByteBuf response) { - var streamBase = readStreamBase(response); - - List<Topic> topics = new ArrayList<>(); - while (response.isReadable()) { - topics.add(readTopic(response)); - } - - return new StreamDetails(streamBase, topics); - } - - /** - * Reads Topic from the buffer. - * @param response The ByteBuf containing the response data - * @return The deserialized Topic - */ - public static Topic readTopic(ByteBuf response) { - var topicId = response.readUnsignedIntLE(); - var createdAt = readU64AsBigInteger(response); - var partitionsCount = response.readUnsignedIntLE(); - var messageExpiry = readU64AsBigInteger(response); - var compressionAlgorithmCode = response.readByte(); - var maxTopicSize = readU64AsBigInteger(response); - var replicationFactor = response.readByte(); - var size = readU64AsBigInteger(response); - var messagesCount = readU64AsBigInteger(response); - var nameLength = response.readByte(); - var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString(); - - return new Topic( - topicId, - createdAt, - name, - size.toString(), - messageExpiry, - CompressionAlgorithm.fromCode(compressionAlgorithmCode), - maxTopicSize, - (short) replicationFactor, - messagesCount, - partitionsCount); - } - - /** - * Reads TopicDetails from the buffer. - * @param response The ByteBuf containing the response data - * @return The deserialized TopicDetails - */ - public static TopicDetails readTopicDetails(ByteBuf response) { - var topic = readTopic(response); - - List<Partition> partitions = new ArrayList<>(); - while (response.isReadable()) { - partitions.add(readPartition(response)); - } - - return new TopicDetails(topic, partitions); - } - - /** - * Reads Partition from the buffer. - */ - private static Partition readPartition(ByteBuf response) { - var partitionId = response.readUnsignedIntLE(); - var createdAt = readU64AsBigInteger(response); - var segmentsCount = response.readUnsignedIntLE(); - var currentOffset = readU64AsBigInteger(response); - var size = readU64AsBigInteger(response); - var messagesCount = readU64AsBigInteger(response); - - return new Partition(partitionId, createdAt, segmentsCount, currentOffset, size.toString(), messagesCount); - } -} diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java deleted file mode 100644 index 7b1c87bf2..000000000 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncBytesSerializer.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iggy.client.async.tcp; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.iggy.consumergroup.Consumer; -import org.apache.iggy.identifier.Identifier; -import org.apache.iggy.message.Message; -import org.apache.iggy.message.MessageHeader; -import org.apache.iggy.message.Partitioning; -import org.apache.iggy.message.PollingStrategy; - -import java.math.BigInteger; -import java.nio.charset.StandardCharsets; -import java.util.Optional; - -/** - * Async version of BytesSerializer for the async package. - * Provides the same wire protocol serialization as the blocking version. - */ -public final class AsyncBytesSerializer { - - private AsyncBytesSerializer() {} - - /** - * Serializes a Consumer to bytes. - * - * @param consumer The consumer to serialize (can be null) - * @return ByteBuf containing the serialized consumer - */ - public static ByteBuf toBytes(Consumer consumer) { - ByteBuf buffer = Unpooled.buffer(); - if (consumer == null) { - // No consumer - use code 0 (1 byte) and empty identifier (4 bytes) - buffer.writeByte(0); - buffer.writeIntLE(0); - } else { - buffer.writeByte(consumer.kind().asCode()); - buffer.writeBytes(toBytes(consumer.id())); - } - return buffer; - } - - /** - * Serializes an Identifier to bytes. - * - * @param identifier The identifier to serialize - * @return ByteBuf containing the serialized identifier - */ - public static ByteBuf toBytes(Identifier identifier) { - if (identifier.getKind() == 1) { - ByteBuf buffer = Unpooled.buffer(6); - buffer.writeByte(1); - buffer.writeByte(4); - buffer.writeIntLE(identifier.getId().intValue()); - return buffer; - } else if (identifier.getKind() == 2) { - ByteBuf buffer = Unpooled.buffer(2 + identifier.getName().length()); - buffer.writeByte(2); - buffer.writeByte(identifier.getName().length()); - buffer.writeBytes(identifier.getName().getBytes()); - return buffer; - } else { - throw new IllegalArgumentException("Unknown identifier kind: " + identifier.getKind()); - } - } - - /** - * Serializes a String to bytes with length prefix. - * - * @param str The string to serialize - * @return ByteBuf containing the serialized string - */ - public static ByteBuf toBytes(String str) { - ByteBuf buffer = Unpooled.buffer(1 + str.length()); - buffer.writeByte(str.length()); - buffer.writeBytes(str.getBytes()); - return buffer; - } - - /** - * Serializes a Partitioning to bytes. - * - * @param partitioning The partitioning to serialize - * @return ByteBuf containing the serialized partitioning - */ - public static ByteBuf toBytes(Partitioning partitioning) { - ByteBuf buffer = Unpooled.buffer(2 + partitioning.value().length); - buffer.writeByte(partitioning.kind().asCode()); - buffer.writeByte(partitioning.value().length); - buffer.writeBytes(partitioning.value()); - return buffer; - } - - /** - * Serializes a Message to bytes. - * - * @param message The message to serialize - * @return ByteBuf containing the serialized message - */ - public static ByteBuf toBytes(Message message) { - var buffer = Unpooled.buffer(MessageHeader.SIZE + message.payload().length); - buffer.writeBytes(toBytes(message.header())); - buffer.writeBytes(message.payload()); - return buffer; - } - - /** - * Serializes a MessageHeader to bytes. - * - * @param header The message header to serialize - * @return ByteBuf containing the serialized header - */ - public static ByteBuf toBytes(MessageHeader header) { - var buffer = Unpooled.buffer(MessageHeader.SIZE); - buffer.writeBytes(toBytesAsU64(header.checksum())); - // Convert MessageId to BigInteger and serialize as U128 - buffer.writeBytes(toBytesAsU128(header.id().toBigInteger())); - buffer.writeBytes(toBytesAsU64(header.offset())); - buffer.writeBytes(toBytesAsU64(header.timestamp())); - buffer.writeBytes(toBytesAsU64(header.originTimestamp())); - buffer.writeIntLE(header.userHeadersLength().intValue()); - buffer.writeIntLE(header.payloadLength().intValue()); - return buffer; - } - - /** - * Serializes a PollingStrategy to bytes. - * - * @param strategy The polling strategy to serialize - * @return ByteBuf containing the serialized strategy - */ - public static ByteBuf toBytes(PollingStrategy strategy) { - var buffer = Unpooled.buffer(9); - buffer.writeByte(strategy.kind().asCode()); - buffer.writeBytes(toBytesAsU64(strategy.value())); - return buffer; - } - - static ByteBuf toBytes(Optional<Long> optionalLong) { - var buffer = Unpooled.buffer(5); - if (optionalLong.isPresent()) { - buffer.writeByte(1); - buffer.writeIntLE(optionalLong.get().intValue()); - } else { - buffer.writeByte(0); - buffer.writeIntLE(0); - } - return buffer; - } - - /** - * Converts a BigInteger to bytes as unsigned 64-bit integer. - * - * @param value The BigInteger value to convert - * @return ByteBuf containing the value as 8 bytes in little-endian format - */ - public static ByteBuf toBytesAsU64(BigInteger value) { - if (value.signum() == -1) { - throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 64: " + value); - } - ByteBuf buffer = Unpooled.buffer(8); - byte[] valueAsBytes = value.toByteArray(); - if (valueAsBytes.length > 9) { - throw new IllegalArgumentException("Value too large for U64: " + value); - } - // Handle sign byte if present - if (valueAsBytes.length == 9 && valueAsBytes[0] == 0) { - valueAsBytes = ArrayUtils.subarray(valueAsBytes, 1, 9); - } - ArrayUtils.reverse(valueAsBytes); - buffer.writeBytes(valueAsBytes, 0, Math.min(8, valueAsBytes.length)); - if (valueAsBytes.length < 8) { - buffer.writeZero(8 - valueAsBytes.length); - } - return buffer; - } - - /** - * Converts a name string to bytes with length prefix. - * - * @param name The name string to convert - * @return ByteBuf containing the serialized name - */ - public static ByteBuf nameToBytes(String name) { - var buffer = Unpooled.buffer(1 + name.length()); - buffer.writeByte(name.length()); - buffer.writeBytes(name.getBytes(StandardCharsets.UTF_8)); - return buffer; - } - - /** - * Converts a BigInteger to bytes as unsigned 128-bit integer. - * - * @param value The BigInteger value to convert - * @return ByteBuf containing the value as 16 bytes in little-endian format - */ - public static ByteBuf toBytesAsU128(BigInteger value) { - if (value.signum() == -1) { - throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 128: " + value); - } - ByteBuf buffer = Unpooled.buffer(16, 16); - byte[] valueAsBytes = value.toByteArray(); - if (valueAsBytes.length > 17) { - throw new IllegalArgumentException("Value too large for U128: " + value); - } - // Remove leading zero byte if present (from positive sign bit) - if (valueAsBytes.length == 17 && valueAsBytes[0] == 0) { - valueAsBytes = ArrayUtils.subarray(valueAsBytes, 1, 17); - } - ArrayUtils.reverse(valueAsBytes); - buffer.writeBytes(valueAsBytes, 0, Math.min(16, valueAsBytes.length)); - if (valueAsBytes.length < 16) { - buffer.writeZero(16 - valueAsBytes.length); - } - return buffer; - } -} diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java index 06e778871..e671539f8 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/ConsumerGroupsTcpClient.java @@ -21,10 +21,11 @@ package org.apache.iggy.client.async.tcp; import io.netty.buffer.Unpooled; import org.apache.iggy.client.async.ConsumerGroupsClient; -import org.apache.iggy.client.blocking.tcp.CommandCode; import org.apache.iggy.identifier.ConsumerId; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.serde.BytesSerializer; +import org.apache.iggy.serde.CommandCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,13 +48,13 @@ public class ConsumerGroupsTcpClient implements ConsumerGroupsClient { var payload = Unpooled.buffer(); // Serialize stream ID - payload.writeBytes(AsyncBytesSerializer.toBytes(streamId)); + payload.writeBytes(BytesSerializer.toBytes(streamId)); // Serialize topic ID - payload.writeBytes(AsyncBytesSerializer.toBytes(topicId)); + payload.writeBytes(BytesSerializer.toBytes(topicId)); // Serialize consumer group ID - payload.writeBytes(AsyncBytesSerializer.toBytes(groupId)); + payload.writeBytes(BytesSerializer.toBytes(groupId)); log.debug("Joining consumer group - Stream: {}, Topic: {}, Group: {}", streamId, topicId, groupId); @@ -70,13 +71,13 @@ public class ConsumerGroupsTcpClient implements ConsumerGroupsClient { var payload = Unpooled.buffer(); // Serialize stream ID - payload.writeBytes(AsyncBytesSerializer.toBytes(streamId)); + payload.writeBytes(BytesSerializer.toBytes(streamId)); // Serialize topic ID - payload.writeBytes(AsyncBytesSerializer.toBytes(topicId)); + payload.writeBytes(BytesSerializer.toBytes(topicId)); // Serialize consumer group ID - payload.writeBytes(AsyncBytesSerializer.toBytes(groupId)); + payload.writeBytes(BytesSerializer.toBytes(groupId)); log.debug("Leaving consumer group - Stream: {}, Topic: {}, Group: {}", streamId, topicId, groupId); diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java index 9e2abec50..74fdc49c2 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/MessagesTcpClient.java @@ -21,7 +21,6 @@ package org.apache.iggy.client.async.tcp; import io.netty.buffer.Unpooled; import org.apache.iggy.client.async.MessagesClient; -import org.apache.iggy.client.blocking.tcp.CommandCode; import org.apache.iggy.consumergroup.Consumer; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; @@ -29,12 +28,14 @@ import org.apache.iggy.message.Message; import org.apache.iggy.message.Partitioning; import org.apache.iggy.message.PolledMessages; import org.apache.iggy.message.PollingStrategy; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.CommandCode; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytes; /** * Async TCP implementation of MessagesClient using Netty for non-blocking I/O. @@ -57,7 +58,6 @@ public class MessagesTcpClient implements MessagesClient { Long count, boolean autoCommit) { - // Build the request payload var payload = Unpooled.buffer(); var consumerBytes = toBytes(consumer); @@ -83,7 +83,7 @@ public class MessagesTcpClient implements MessagesClient { .sendAsync(CommandCode.Messages.POLL.getValue(), payload) .thenApply(response -> { try { - return AsyncBytesDeserializer.readPolledMessages(response); + return BytesDeserializer.readPolledMessages(response); } finally { response.release(); } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java index fdd8ba01b..c9e10ee26 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/StreamsTcpClient.java @@ -20,9 +20,11 @@ package org.apache.iggy.client.async.tcp; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; import org.apache.iggy.client.async.StreamsClient; -import org.apache.iggy.client.blocking.tcp.CommandCode; import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.serde.BytesSerializer; +import org.apache.iggy.serde.CommandCode; import org.apache.iggy.stream.StreamBase; import org.apache.iggy.stream.StreamDetails; @@ -31,10 +33,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import static org.apache.iggy.client.async.tcp.AsyncBytesDeserializer.readStreamBase; -import static org.apache.iggy.client.async.tcp.AsyncBytesDeserializer.readStreamDetails; -import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.nameToBytes; -import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesDeserializer.readStreamBase; +import static org.apache.iggy.serde.BytesDeserializer.readStreamDetails; +import static org.apache.iggy.serde.BytesSerializer.toBytes; /** * Async TCP implementation of StreamsClient using Netty for non-blocking I/O. @@ -52,7 +53,7 @@ public class StreamsTcpClient implements StreamsClient { var payloadSize = 1 + name.length(); var payload = Unpooled.buffer(payloadSize); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); return connection .sendAsync(CommandCode.Stream.CREATE.getValue(), payload) @@ -100,11 +101,11 @@ public class StreamsTcpClient implements StreamsClient { var payload = Unpooled.buffer(payloadSize + idBytes.capacity()); payload.writeBytes(idBytes); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); return connection .sendAsync(CommandCode.Stream.UPDATE.getValue(), payload) - .thenAccept(response -> response.release()); + .thenAccept(ReferenceCounted::release); } @Override @@ -113,6 +114,6 @@ public class StreamsTcpClient implements StreamsClient { return connection .sendAsync(CommandCode.Stream.DELETE.getValue(), payload) - .thenAccept(response -> response.release()); + .thenAccept(ReferenceCounted::release); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java index 5128039d2..80e73b0e4 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/TopicsTcpClient.java @@ -21,9 +21,11 @@ package org.apache.iggy.client.async.tcp; import io.netty.buffer.Unpooled; import org.apache.iggy.client.async.TopicsClient; -import org.apache.iggy.client.blocking.tcp.CommandCode; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.BytesSerializer; +import org.apache.iggy.serde.CommandCode; import org.apache.iggy.topic.CompressionAlgorithm; import org.apache.iggy.topic.Topic; import org.apache.iggy.topic.TopicDetails; @@ -34,9 +36,8 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.nameToBytes; -import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytes; -import static org.apache.iggy.client.async.tcp.AsyncBytesSerializer.toBytesAsU64; +import static org.apache.iggy.serde.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64; /** * Async TCP implementation of TopicsClient using Netty for non-blocking I/O. @@ -58,7 +59,7 @@ public class TopicsTcpClient implements TopicsClient { return connection.sendAsync(CommandCode.Topic.GET.getValue(), payload).thenApply(response -> { try { if (response.isReadable()) { - return Optional.of(AsyncBytesDeserializer.readTopicDetails(response)); + return Optional.of(BytesDeserializer.readTopicDetails(response)); } return Optional.<TopicDetails>empty(); } finally { @@ -77,7 +78,7 @@ public class TopicsTcpClient implements TopicsClient { try { List<Topic> topics = new ArrayList<>(); while (response.isReadable()) { - topics.add(AsyncBytesDeserializer.readTopic(response)); + topics.add(BytesDeserializer.readTopic(response)); } return topics; } finally { @@ -105,13 +106,13 @@ public class TopicsTcpClient implements TopicsClient { payload.writeBytes(toBytesAsU64(messageExpiry)); payload.writeBytes(toBytesAsU64(maxTopicSize)); payload.writeByte(replicationFactor.orElse((short) 0)); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); return connection .sendAsync(CommandCode.Topic.CREATE.getValue(), payload) .thenApply(response -> { try { - return AsyncBytesDeserializer.readTopicDetails(response); + return BytesDeserializer.readTopicDetails(response); } finally { response.release(); } @@ -135,7 +136,7 @@ public class TopicsTcpClient implements TopicsClient { payload.writeBytes(toBytesAsU64(messageExpiry)); payload.writeBytes(toBytesAsU64(maxTopicSize)); payload.writeByte(replicationFactor.orElse((short) 0)); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); return connection .sendAsync(CommandCode.Topic.UPDATE.getValue(), payload) diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java index 00d8f31db..8f7ded490 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/UsersTcpClient.java @@ -21,7 +21,8 @@ package org.apache.iggy.client.async.tcp; import io.netty.buffer.Unpooled; import org.apache.iggy.client.async.UsersClient; -import org.apache.iggy.client.blocking.tcp.CommandCode; +import org.apache.iggy.serde.BytesSerializer; +import org.apache.iggy.serde.CommandCode; import org.apache.iggy.user.IdentityInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,8 +48,8 @@ public class UsersTcpClient implements UsersClient { String context = "java-sdk"; var payload = Unpooled.buffer(); - var usernameBytes = AsyncBytesSerializer.toBytes(username); - var passwordBytes = AsyncBytesSerializer.toBytes(password); + var usernameBytes = BytesSerializer.toBytes(username); + var passwordBytes = BytesSerializer.toBytes(password); payload.writeBytes(usernameBytes); payload.writeBytes(passwordBytes); diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java index bf2fb9366..7868d1dfb 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerGroupsTcpClient.java @@ -26,12 +26,14 @@ import org.apache.iggy.consumergroup.ConsumerGroupDetails; import org.apache.iggy.identifier.ConsumerId; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.BytesSerializer; +import org.apache.iggy.serde.CommandCode; import java.util.List; import java.util.Optional; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytes; class ConsumerGroupsTcpClient implements ConsumerGroupsClient { @@ -66,7 +68,7 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient { payload.writeBytes(streamIdBytes); payload.writeBytes(topicIdBytes); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); return tcpClient.exchangeForEntity( CommandCode.ConsumerGroup.CREATE, payload, BytesDeserializer::readConsumerGroupDetails); diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java index 2fe86ba6b..08ddd5f3b 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/ConsumerOffsetTcpClient.java @@ -24,12 +24,14 @@ import org.apache.iggy.consumergroup.Consumer; import org.apache.iggy.consumeroffset.ConsumerOffsetInfo; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.CommandCode; import java.math.BigInteger; import java.util.Optional; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64; +import static org.apache.iggy.serde.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64; class ConsumerOffsetTcpClient implements ConsumerOffsetsClient { diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java index ab4bf684e..234a29455 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/InternalTcpClient.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import org.apache.iggy.serde.CommandCode; import reactor.core.publisher.Mono; import reactor.netty.Connection; import reactor.netty.tcp.TcpClient; diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java index 172d0c9e6..84e251d26 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java @@ -28,11 +28,13 @@ import org.apache.iggy.message.Message; import org.apache.iggy.message.Partitioning; import org.apache.iggy.message.PolledMessages; import org.apache.iggy.message.PollingStrategy; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.CommandCode; import java.util.List; import java.util.Optional; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytes; class MessagesTcpClient implements MessagesClient { diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java index 1fa634ad6..7678d7c6b 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PartitionsTcpClient.java @@ -22,8 +22,9 @@ package org.apache.iggy.client.blocking.tcp; import org.apache.iggy.client.blocking.PartitionsClient; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.serde.CommandCode; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytes; class PartitionsTcpClient implements PartitionsClient { diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java index f0d0b22f3..b3535b888 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/PersonalAccessTokensTcpClient.java @@ -23,14 +23,16 @@ import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.PersonalAccessTokensClient; import org.apache.iggy.personalaccesstoken.PersonalAccessTokenInfo; import org.apache.iggy.personalaccesstoken.RawPersonalAccessToken; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.CommandCode; import org.apache.iggy.user.IdentityInfo; import java.math.BigInteger; import java.util.List; import java.util.Optional; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64; +import static org.apache.iggy.serde.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64; class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient { @@ -43,7 +45,7 @@ class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient { @Override public RawPersonalAccessToken createPersonalAccessToken(String name, BigInteger expiry) { var payload = Unpooled.buffer(); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(toBytes(name)); payload.writeBytes(toBytesAsU64(expiry)); return tcpClient.exchangeForEntity( CommandCode.PersonalAccessToken.CREATE, payload, BytesDeserializer::readRawPersonalAccessToken); @@ -57,13 +59,13 @@ class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient { @Override public void deletePersonalAccessToken(String name) { - var payload = nameToBytes(name); + var payload = toBytes(name); tcpClient.send(CommandCode.PersonalAccessToken.DELETE, payload); } @Override public IdentityInfo loginWithPersonalAccessToken(String token) { - var payload = nameToBytes(token); + var payload = toBytes(token); return tcpClient.exchangeForEntity(CommandCode.PersonalAccessToken.LOGIN, payload, buf -> { var userId = buf.readUnsignedIntLE(); return new IdentityInfo(userId, Optional.empty()); diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java index e7152ddcb..7605f164e 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/StreamsTcpClient.java @@ -22,14 +22,16 @@ package org.apache.iggy.client.blocking.tcp; import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.StreamsClient; import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.BytesSerializer; +import org.apache.iggy.serde.CommandCode; import org.apache.iggy.stream.StreamBase; import org.apache.iggy.stream.StreamDetails; import java.util.List; import java.util.Optional; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytes; class StreamsTcpClient implements StreamsClient { @@ -44,7 +46,7 @@ class StreamsTcpClient implements StreamsClient { var payloadSize = 1 + name.length(); var payload = Unpooled.buffer(payloadSize); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); return tcpClient.exchangeForEntity(CommandCode.Stream.CREATE, payload, BytesDeserializer::readStreamDetails); } @@ -66,7 +68,7 @@ class StreamsTcpClient implements StreamsClient { var payload = Unpooled.buffer(payloadSize + idBytes.capacity()); payload.writeBytes(idBytes); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); tcpClient.send(CommandCode.Stream.UPDATE, payload); } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java index 447f48b3f..70277b742 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/SystemTcpClient.java @@ -21,6 +21,8 @@ package org.apache.iggy.client.blocking.tcp; import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.SystemClient; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.CommandCode; import org.apache.iggy.system.ClientInfo; import org.apache.iggy.system.ClientInfoDetails; import org.apache.iggy.system.Stats; diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java index 1ccbd512a..01392dc55 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/TopicsTcpClient.java @@ -23,6 +23,9 @@ import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.TopicsClient; import org.apache.iggy.identifier.StreamId; import org.apache.iggy.identifier.TopicId; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.BytesSerializer; +import org.apache.iggy.serde.CommandCode; import org.apache.iggy.topic.CompressionAlgorithm; import org.apache.iggy.topic.Topic; import org.apache.iggy.topic.TopicDetails; @@ -31,9 +34,8 @@ import java.math.BigInteger; import java.util.List; import java.util.Optional; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytesAsU64; +import static org.apache.iggy.serde.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytesAsU64; class TopicsTcpClient implements TopicsClient { @@ -74,7 +76,7 @@ class TopicsTcpClient implements TopicsClient { payload.writeBytes(toBytesAsU64(messageExpiry)); payload.writeBytes(toBytesAsU64(maxTopicSize)); payload.writeByte(replicationFactor.orElse((short) 0)); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); return tcpClient.exchangeForEntity(CommandCode.Topic.CREATE, payload, BytesDeserializer::readTopicDetails); } @@ -95,7 +97,7 @@ class TopicsTcpClient implements TopicsClient { payload.writeBytes(toBytesAsU64(messageExpiry)); payload.writeBytes(toBytesAsU64(maxTopicSize)); payload.writeByte(replicationFactor.orElse((short) 0)); - payload.writeBytes(nameToBytes(name)); + payload.writeBytes(BytesSerializer.toBytes(name)); tcpClient.send(CommandCode.Topic.UPDATE, payload); } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java index 006de6586..6fb201526 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/UsersTcpClient.java @@ -23,6 +23,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.apache.iggy.client.blocking.UsersClient; import org.apache.iggy.identifier.UserId; +import org.apache.iggy.serde.BytesDeserializer; +import org.apache.iggy.serde.CommandCode; import org.apache.iggy.user.IdentityInfo; import org.apache.iggy.user.Permissions; import org.apache.iggy.user.UserInfo; @@ -32,8 +34,7 @@ import org.apache.iggy.user.UserStatus; import java.util.List; import java.util.Optional; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.nameToBytes; -import static org.apache.iggy.client.blocking.tcp.BytesSerializer.toBytes; +import static org.apache.iggy.serde.BytesSerializer.toBytes; class UsersTcpClient implements UsersClient { @@ -58,8 +59,8 @@ class UsersTcpClient implements UsersClient { public UserInfoDetails createUser( String username, String password, UserStatus status, Optional<Permissions> permissions) { var payload = Unpooled.buffer(); - payload.writeBytes(nameToBytes(username)); - payload.writeBytes(nameToBytes(password)); + payload.writeBytes(toBytes(username)); + payload.writeBytes(toBytes(password)); payload.writeByte(status.asCode()); permissions.ifPresentOrElse( perms -> { @@ -85,7 +86,7 @@ class UsersTcpClient implements UsersClient { usernameOptional.ifPresentOrElse( (username) -> { payload.writeByte(1); - payload.writeBytes(nameToBytes(username)); + payload.writeBytes(toBytes(username)); }, () -> payload.writeByte(0)); statusOptional.ifPresentOrElse( @@ -117,8 +118,8 @@ class UsersTcpClient implements UsersClient { @Override public void changePassword(UserId userId, String currentPassword, String newPassword) { var payload = toBytes(userId); - payload.writeBytes(nameToBytes(currentPassword)); - payload.writeBytes(nameToBytes(newPassword)); + payload.writeBytes(toBytes(currentPassword)); + payload.writeBytes(toBytes(newPassword)); tcpClient.send(CommandCode.User.CHANGE_PASSWORD, payload); } @@ -130,8 +131,8 @@ class UsersTcpClient implements UsersClient { var payloadSize = 2 + username.length() + password.length() + 4 + version.length() + 4 + context.length(); var payload = Unpooled.buffer(payloadSize); - payload.writeBytes(nameToBytes(username)); - payload.writeBytes(nameToBytes(password)); + payload.writeBytes(toBytes(username)); + payload.writeBytes(toBytes(password)); payload.writeIntLE(version.length()); payload.writeBytes(version.getBytes()); payload.writeIntLE(context.length()); diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java index 8e1b16691..d8b04e27d 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/BigIntegerMessageId.java @@ -20,7 +20,7 @@ package org.apache.iggy.message; import io.netty.buffer.ByteBuf; -import org.apache.iggy.client.blocking.tcp.BytesSerializer; +import org.apache.iggy.serde.BytesSerializer; import java.math.BigInteger; diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java similarity index 91% rename from foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java rename to foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java index b89f6fce9..67b36f170 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesDeserializer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iggy.client.blocking.tcp; +package org.apache.iggy.serde; import io.netty.buffer.ByteBuf; import org.apache.commons.lang3.ArrayUtils; @@ -57,11 +57,15 @@ import java.util.List; import java.util.Map; import java.util.Optional; -final class BytesDeserializer { +/** + * Unified deserializer for both blocking and async clients. + * Provides deserialization of ByteBuf to domain objects according to Iggy wire protocol. + */ +public final class BytesDeserializer { private BytesDeserializer() {} - static StreamBase readStreamBase(ByteBuf response) { + public static StreamBase readStreamBase(ByteBuf response) { var streamId = response.readUnsignedIntLE(); var createdAt = readU64AsBigInteger(response); var topicsCount = response.readUnsignedIntLE(); @@ -73,11 +77,11 @@ final class BytesDeserializer { return new StreamBase(streamId, createdAt, name, size.toString(), messagesCount, topicsCount); } - static StreamDetails readStreamDetails(ByteBuf response) { + public static StreamDetails readStreamDetails(ByteBuf response) { var streamBase = readStreamBase(response); List<Topic> topics = new ArrayList<>(); - if (response.isReadable()) { + while (response.isReadable()) { topics.add(readTopic(response)); } @@ -95,7 +99,7 @@ final class BytesDeserializer { return new TopicDetails(topic, partitions); } - static Partition readPartition(ByteBuf response) { + public static Partition readPartition(ByteBuf response) { var partitionId = response.readUnsignedIntLE(); var createdAt = readU64AsBigInteger(response); var segmentsCount = response.readUnsignedIntLE(); @@ -141,7 +145,7 @@ final class BytesDeserializer { return new ConsumerGroupDetails(consumerGroup, members); } - static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) { + public static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) { var memberId = response.readUnsignedIntLE(); var partitionsCount = response.readUnsignedIntLE(); List<Long> partitionIds = new ArrayList<>(); @@ -178,7 +182,7 @@ final class BytesDeserializer { return new PolledMessages(partitionId, currentOffset, messagesCount, messages); } - static Message readPolledMessage(ByteBuf response) { + public static Message readPolledMessage(ByteBuf response) { var checksum = readU64AsBigInteger(response); var id = readBytesMessageId(response); var offset = readU64AsBigInteger(response); @@ -194,7 +198,7 @@ final class BytesDeserializer { return new Message(header, payload, Optional.empty()); } - static Stats readStats(ByteBuf response) { + public static Stats readStats(ByteBuf response) { var processId = response.readUnsignedIntLE(); var cpuUsage = response.readFloatLE(); var totalCpuUsage = response.readFloatLE(); @@ -251,7 +255,7 @@ final class BytesDeserializer { kernelVersion); } - static ClientInfoDetails readClientInfoDetails(ByteBuf response) { + public static ClientInfoDetails readClientInfoDetails(ByteBuf response) { var clientInfo = readClientInfo(response); var consumerGroups = new ArrayList<ConsumerGroupInfo>(); for (int i = 0; i < clientInfo.consumerGroupsCount(); i++) { @@ -261,7 +265,7 @@ final class BytesDeserializer { return new ClientInfoDetails(clientInfo, consumerGroups); } - static ClientInfo readClientInfo(ByteBuf response) { + public static ClientInfo readClientInfo(ByteBuf response) { var clientId = response.readUnsignedIntLE(); var userId = response.readUnsignedIntLE(); var userIdOptional = Optional.<Long>empty(); @@ -280,7 +284,7 @@ final class BytesDeserializer { return new ClientInfo(clientId, userIdOptional, address, transportString, consumerGroupsCount); } - static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) { + public static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) { var streamId = response.readUnsignedIntLE(); var topicId = response.readUnsignedIntLE(); var groupId = response.readUnsignedIntLE(); @@ -288,7 +292,7 @@ final class BytesDeserializer { return new ConsumerGroupInfo(streamId, topicId, groupId); } - static UserInfoDetails readUserInfoDetails(ByteBuf response) { + public static UserInfoDetails readUserInfoDetails(ByteBuf response) { var userInfo = readUserInfo(response); Optional<Permissions> permissionsOptional = Optional.empty(); @@ -300,7 +304,7 @@ final class BytesDeserializer { return new UserInfoDetails(userInfo, permissionsOptional); } - static Permissions readPermissions(ByteBuf response) { + public static Permissions readPermissions(ByteBuf response) { var _permissionsLength = response.readUnsignedIntLE(); var globalPermissions = readGlobalPermissions(response); Map<Long, StreamPermissions> streamPermissionsMap = new HashMap<>(); @@ -312,7 +316,7 @@ final class BytesDeserializer { return new Permissions(globalPermissions, streamPermissionsMap); } - static StreamPermissions readStreamPermissions(ByteBuf response) { + public static StreamPermissions readStreamPermissions(ByteBuf response) { var manageStream = response.readBoolean(); var readStream = response.readBoolean(); var manageTopics = response.readBoolean(); @@ -329,7 +333,7 @@ final class BytesDeserializer { manageStream, readStream, manageTopics, readTopics, pollMessages, sendMessages, topicPermissionsMap); } - static TopicPermissions readTopicPermissions(ByteBuf response) { + public static TopicPermissions readTopicPermissions(ByteBuf response) { var manageTopic = response.readBoolean(); var readTopic = response.readBoolean(); var pollMessages = response.readBoolean(); @@ -337,7 +341,7 @@ final class BytesDeserializer { return new TopicPermissions(manageTopic, readTopic, pollMessages, sendMessages); } - static GlobalPermissions readGlobalPermissions(ByteBuf response) { + public static GlobalPermissions readGlobalPermissions(ByteBuf response) { var manageServers = response.readBoolean(); var readServers = response.readBoolean(); var manageUsers = response.readBoolean(); @@ -361,7 +365,7 @@ final class BytesDeserializer { sendMessages); } - static UserInfo readUserInfo(ByteBuf response) { + public static UserInfo readUserInfo(ByteBuf response) { var userId = response.readUnsignedIntLE(); var createdAt = readU64AsBigInteger(response); var statusCode = response.readByte(); @@ -372,14 +376,14 @@ final class BytesDeserializer { return new UserInfo(userId, createdAt, status, username); } - static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf response) { + public static RawPersonalAccessToken readRawPersonalAccessToken(ByteBuf response) { var tokenLength = response.readByte(); var token = response.readCharSequence(tokenLength, StandardCharsets.UTF_8).toString(); return new RawPersonalAccessToken(token); } - static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf response) { + public static PersonalAccessTokenInfo readPersonalAccessTokenInfo(ByteBuf response) { var nameLength = response.readByte(); var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString(); var expiry = readU64AsBigInteger(response); diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java similarity index 84% rename from foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java rename to foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java index f95c0a622..511ac7ad8 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/BytesSerializer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iggy.client.blocking.tcp; +package org.apache.iggy.serde; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -35,21 +35,26 @@ import org.apache.iggy.user.StreamPermissions; import org.apache.iggy.user.TopicPermissions; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Optional; +/** + * Unified serializer for both blocking and async clients. + * Provides serialization of domain objects to ByteBuf according to Iggy wire protocol. + */ public final class BytesSerializer { private BytesSerializer() {} - static ByteBuf toBytes(Consumer consumer) { + public static ByteBuf toBytes(Consumer consumer) { ByteBuf buffer = Unpooled.buffer(); buffer.writeByte(consumer.kind().asCode()); buffer.writeBytes(toBytes(consumer.id())); return buffer; } - static ByteBuf toBytes(Identifier identifier) { + public static ByteBuf toBytes(Identifier identifier) { if (identifier.getKind() == 1) { ByteBuf buffer = Unpooled.buffer(6); buffer.writeByte(1); @@ -67,7 +72,7 @@ public final class BytesSerializer { } } - static ByteBuf toBytes(Partitioning partitioning) { + public static ByteBuf toBytes(Partitioning partitioning) { ByteBuf buffer = Unpooled.buffer(2 + partitioning.value().length); buffer.writeByte(partitioning.kind().asCode()); buffer.writeByte(partitioning.value().length); @@ -75,14 +80,14 @@ public final class BytesSerializer { return buffer; } - static ByteBuf toBytes(Message message) { + public static ByteBuf toBytes(Message message) { var buffer = Unpooled.buffer(MessageHeader.SIZE + message.payload().length); buffer.writeBytes(toBytes(message.header())); buffer.writeBytes(message.payload()); return buffer; } - static ByteBuf toBytes(MessageHeader header) { + public static ByteBuf toBytes(MessageHeader header) { var buffer = Unpooled.buffer(MessageHeader.SIZE); buffer.writeBytes(toBytesAsU64(header.checksum())); buffer.writeBytes(header.id().toBytes()); @@ -94,14 +99,14 @@ public final class BytesSerializer { return buffer; } - static ByteBuf toBytes(PollingStrategy strategy) { + public static ByteBuf toBytes(PollingStrategy strategy) { var buffer = Unpooled.buffer(9); buffer.writeByte(strategy.kind().asCode()); buffer.writeBytes(toBytesAsU64(strategy.value())); return buffer; } - static ByteBuf toBytes(Optional<Long> optionalLong) { + public static ByteBuf toBytes(Optional<Long> optionalLong) { var buffer = Unpooled.buffer(5); if (optionalLong.isPresent()) { buffer.writeByte(1); @@ -113,7 +118,7 @@ public final class BytesSerializer { return buffer; } - static ByteBuf toBytes(Map<String, HeaderValue> headers) { + public static ByteBuf toBytes(Map<String, HeaderValue> headers) { if (headers.isEmpty()) { return Unpooled.EMPTY_BUFFER; } @@ -131,7 +136,7 @@ public final class BytesSerializer { return buffer; } - static ByteBuf toBytes(Permissions permissions) { + public static ByteBuf toBytes(Permissions permissions) { var buffer = Unpooled.buffer(); buffer.writeBytes(toBytes(permissions.global())); if (permissions.streams().isEmpty()) { @@ -149,7 +154,7 @@ public final class BytesSerializer { return buffer; } - static ByteBuf toBytes(GlobalPermissions permissions) { + public static ByteBuf toBytes(GlobalPermissions permissions) { var buffer = Unpooled.buffer(); buffer.writeBoolean(permissions.manageServers()); buffer.writeBoolean(permissions.readServers()); @@ -164,7 +169,7 @@ public final class BytesSerializer { return buffer; } - static ByteBuf toBytes(StreamPermissions permissions) { + public static ByteBuf toBytes(StreamPermissions permissions) { var buffer = Unpooled.buffer(); buffer.writeBoolean(permissions.manageStream()); buffer.writeBoolean(permissions.readStream()); @@ -187,7 +192,7 @@ public final class BytesSerializer { return buffer; } - static ByteBuf toBytes(TopicPermissions permissions) { + public static ByteBuf toBytes(TopicPermissions permissions) { var buffer = Unpooled.buffer(); buffer.writeBoolean(permissions.manageTopic()); buffer.writeBoolean(permissions.readTopic()); @@ -196,21 +201,21 @@ public final class BytesSerializer { return buffer; } - static ByteBuf nameToBytes(String name) { - ByteBuf buffer = Unpooled.buffer(1 + name.length()); - buffer.writeByte(name.length()); - buffer.writeBytes(name.getBytes()); + public static ByteBuf toBytes(String value) { + ByteBuf buffer = Unpooled.buffer(1 + value.length()); + buffer.writeByte(value.length()); + buffer.writeBytes(value.getBytes(StandardCharsets.UTF_8)); return buffer; } - static ByteBuf toBytesAsU64(BigInteger value) { + public static ByteBuf toBytesAsU64(BigInteger value) { if (value.signum() == -1) { throw new IllegalArgumentException("Negative value cannot be serialized to unsigned 64: " + value); } ByteBuf buffer = Unpooled.buffer(8, 8); byte[] valueAsBytes = value.toByteArray(); if (valueAsBytes.length > 9) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Value too large for U64: " + value); } ArrayUtils.reverse(valueAsBytes); buffer.writeBytes(valueAsBytes, 0, Math.min(8, valueAsBytes.length)); @@ -227,7 +232,7 @@ public final class BytesSerializer { ByteBuf buffer = Unpooled.buffer(16, 16); byte[] valueAsBytes = value.toByteArray(); if (valueAsBytes.length > 17) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Value too large for U128: " + value); } ArrayUtils.reverse(valueAsBytes); buffer.writeBytes(valueAsBytes, 0, Math.min(16, valueAsBytes.length)); diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java similarity index 96% rename from foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java rename to foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java index 5f7d6b525..e9f1f7905 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/CommandCode.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/serde/CommandCode.java @@ -17,8 +17,12 @@ * under the License. */ -package org.apache.iggy.client.blocking.tcp; +package org.apache.iggy.serde; +/** + * TCP command codes for Iggy wire protocol. + * Used by both blocking and async TCP clients. + */ public interface CommandCode { int getValue(); diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java index d2d905ea8..e2e6b979f 100644 --- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java +++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncPollMessageTest.java @@ -47,7 +47,6 @@ import java.util.concurrent.TimeoutException; import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Test class specifically for testing poll message functionality with different consumer scenarios. @@ -163,30 +162,6 @@ public class AsyncPollMessageTest { } } - @Test - @Order(1) - @DisplayName("Poll with NULL consumer - Expected to timeout (server doesn't respond)") - void testPollWithNullConsumer() { - log.info("TEST 1: Polling with NULL consumer"); - - // This test demonstrates the issue: server doesn't respond to null consumer - assertThatThrownBy(() -> { - client.messages() - .pollMessagesAsync( - StreamId.of(testStream), - TopicId.of(TEST_TOPIC), - Optional.of(PARTITION_ID), - null, // NULL consumer causes server to not respond - PollingStrategy.offset(BigInteger.ZERO), - 10L, - false) - .get(3, TimeUnit.SECONDS); - }) - .isInstanceOf(TimeoutException.class); - - log.info("CONFIRMED: Null consumer causes timeout (server doesn't respond)"); - } - @Test @Order(2) @DisplayName("Poll with various consumer IDs") diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java index b864e1cc6..2e035979e 100644 --- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java +++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/tcp/BytesSerializerTest.java @@ -21,6 +21,7 @@ package org.apache.iggy.client.blocking.tcp; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import org.apache.iggy.serde.BytesSerializer; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -71,6 +72,17 @@ class BytesSerializerTest { // when & then assertThatThrownBy(() -> BytesSerializer.toBytesAsU64(value)).isInstanceOf(IllegalArgumentException.class); } + + @Test + void shouldFailForValueLargerThanU64() { + // given + long maxLong = 0xFFFF_FFFF_FFFF_FFFFL; + var maxU64 = new BigInteger(Long.toUnsignedString(maxLong)); + var value = maxU64.add(BigInteger.ONE); + + // when & then + assertThatThrownBy(() -> BytesSerializer.toBytesAsU64(value)).isInstanceOf(IllegalArgumentException.class); + } } @Nested @@ -113,5 +125,17 @@ class BytesSerializerTest { // when & then assertThatThrownBy(() -> BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class); } + + @Test + void shouldFailForValueLargerThanU128() { + // given + byte[] maxU128 = new byte[17]; + Arrays.fill(maxU128, 1, 17, (byte) 0xFF); + var maxU128Value = new BigInteger(maxU128); + var value = maxU128Value.add(BigInteger.ONE); + + // when & then + assertThatThrownBy(() -> BytesSerializer.toBytesAsU128(value)).isInstanceOf(IllegalArgumentException.class); + } } }
