This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new bdc84883 chore(sdk): support zero copy schema in Java SDK (#1813)
bdc84883 is described below
commit bdc84883bce1d76957ed61883974bf785e3792e7
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Mon May 26 19:33:09 2025 +0530
chore(sdk): support zero copy schema in Java SDK (#1813)
Fixes #1812
---
foreign/java/.gitignore | 6 +++
.../java/examples/simple-consumer/build.gradle.kts | 1 +
.../main/java/org/apache/iggy/SimpleConsumer.java | 4 +-
.../java/examples/simple-producer/build.gradle.kts | 1 +
foreign/java/java-sdk/build.gradle.kts | 1 +
.../client/blocking/tcp/BytesDeserializer.java | 45 ++++++++--------------
.../iggy/client/blocking/tcp/BytesSerializer.java | 23 +++++++----
.../client/blocking/tcp/MessagesTcpClient.java | 31 ++++++++++++++-
.../org/apache/iggy/identifier/Identifier.java | 10 +++++
.../main/java/org/apache/iggy/message/Message.java | 27 ++++++++++---
.../{PolledMessage.java => MessageHeader.java} | 20 +++++-----
.../java/org/apache/iggy/message/Partitioning.java | 5 +++
.../org/apache/iggy/message/PolledMessages.java | 3 +-
.../iggy/client/blocking/IntegrationTest.java | 2 +-
.../client/blocking/MessagesClientBaseTest.java | 12 ++----
15 files changed, 124 insertions(+), 67 deletions(-)
diff --git a/foreign/java/.gitignore b/foreign/java/.gitignore
new file mode 100644
index 00000000..2bdccb0b
--- /dev/null
+++ b/foreign/java/.gitignore
@@ -0,0 +1,6 @@
+.classpath
+.project
+.settings/
+.gradle/
+build/
+out/
diff --git a/foreign/java/examples/simple-consumer/build.gradle.kts b/foreign/java/examples/simple-consumer/build.gradle.kts
index dbbecc6d..3445a078 100644
--- a/foreign/java/examples/simple-consumer/build.gradle.kts
+++ b/foreign/java/examples/simple-consumer/build.gradle.kts
@@ -32,4 +32,5 @@ dependencies {
implementation(project(":iggy-java-sdk"))
implementation("org.slf4j:slf4j-api:2.0.9")
runtimeOnly("ch.qos.logback:logback-classic:1.4.12")
+ runtimeOnly("io.netty:netty-resolver-dns-native-macos:4.2.1.Final:osx-aarch_64")
}
diff --git a/foreign/java/examples/simple-consumer/src/main/java/org/apache/iggy/SimpleConsumer.java b/foreign/java/examples/simple-consumer/src/main/java/org/apache/iggy/SimpleConsumer.java
index 20c5fec3..5fd439f7 100644
--- a/foreign/java/examples/simple-consumer/src/main/java/org/apache/iggy/SimpleConsumer.java
+++ b/foreign/java/examples/simple-consumer/src/main/java/org/apache/iggy/SimpleConsumer.java
@@ -19,6 +19,7 @@
package org.apache.iggy;
+import org.apache.iggy.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.iggy.client.blocking.IggyBaseClient;
@@ -28,7 +29,6 @@ import org.apache.iggy.consumergroup.ConsumerGroupDetails;
import org.apache.iggy.identifier.ConsumerId;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.identifier.TopicId;
-import org.apache.iggy.message.PolledMessage;
import org.apache.iggy.message.PollingStrategy;
import org.apache.iggy.stream.StreamDetails;
import org.apache.iggy.topic.CompressionAlgorithm;
@@ -57,7 +57,7 @@ public class SimpleConsumer {
createConsumerGroup(client);
client.consumerGroups().joinConsumerGroup(STREAM_ID, TOPIC_ID, GROUP_ID);
- var messages = new ArrayList<PolledMessage>();
+ var messages = new ArrayList<Message>();
while (messages.size() < 1000) {
var polledMessages = client.messages()
.pollMessages(STREAM_ID,
diff --git a/foreign/java/examples/simple-producer/build.gradle.kts b/foreign/java/examples/simple-producer/build.gradle.kts
index dbbecc6d..3445a078 100644
--- a/foreign/java/examples/simple-producer/build.gradle.kts
+++ b/foreign/java/examples/simple-producer/build.gradle.kts
@@ -32,4 +32,5 @@ dependencies {
implementation(project(":iggy-java-sdk"))
implementation("org.slf4j:slf4j-api:2.0.9")
runtimeOnly("ch.qos.logback:logback-classic:1.4.12")
+ runtimeOnly("io.netty:netty-resolver-dns-native-macos:4.2.1.Final:osx-aarch_64")
}
diff --git a/foreign/java/java-sdk/build.gradle.kts b/foreign/java/java-sdk/build.gradle.kts
index d217d355..4b9ea164 100644
--- a/foreign/java/java-sdk/build.gradle.kts
+++ b/foreign/java/java-sdk/build.gradle.kts
@@ -56,6 +56,7 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.assertj:assertj-core:3.26.3")
testRuntimeOnly("ch.qos.logback:logback-classic:1.5.11")
+ testRuntimeOnly("io.netty:netty-resolver-dns-native-macos:4.2.1.Final:osx-aarch_64")
}
tasks.withType<Test> {
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java
index ce541c85..4940dfeb 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesDeserializer.java
@@ -26,10 +26,8 @@ import org.apache.iggy.consumergroup.ConsumerGroupDetails;
import org.apache.iggy.consumergroup.ConsumerGroupMember;
import org.apache.iggy.consumeroffset.ConsumerOffsetInfo;
import org.apache.iggy.message.BytesMessageId;
-import org.apache.iggy.message.HeaderKind;
-import org.apache.iggy.message.HeaderValue;
-import org.apache.iggy.message.MessageState;
-import org.apache.iggy.message.PolledMessage;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.MessageHeader;
import org.apache.iggy.message.PolledMessages;
import org.apache.iggy.user.GlobalPermissions;
import org.apache.iggy.user.Permissions;
@@ -167,41 +165,28 @@ final class BytesDeserializer {
public static PolledMessages readPolledMessages(ByteBuf response) {
var partitionId = response.readUnsignedIntLE();
var currentOffset = readU64AsBigInteger(response);
- var _messagesCount = response.readUnsignedIntLE();
- var messages = new ArrayList<PolledMessage>();
+ var messagesCount = response.readUnsignedIntLE();
+ var messages = new ArrayList<Message>();
while (response.isReadable()) {
messages.add(readPolledMessage(response));
}
- return new PolledMessages(partitionId, currentOffset, messages);
+ return new PolledMessages(partitionId, currentOffset, messagesCount, messages);
}
- static PolledMessage readPolledMessage(ByteBuf response) {
+ static Message readPolledMessage(ByteBuf response) {
+ var checksum = readU64AsBigInteger(response);
+ var id = readBytesMessageId(response);
var offset = readU64AsBigInteger(response);
- var stateCode = response.readByte();
- var state = MessageState.fromCode(stateCode);
var timestamp = readU64AsBigInteger(response);
- var id = readBytesMessageId(response);
- var checksum = response.readUnsignedIntLE();
- var headersLength = response.readUnsignedIntLE();
- var headers = Optional.<Map<String, HeaderValue>>empty();
- if (headersLength > 0) {
- var headersMap = new HashMap<String, HeaderValue>();
- ByteBuf headersBytes = response.readBytes(toInt(headersLength));
- while (headersBytes.isReadable()) {
- var keyLength = headersBytes.readUnsignedIntLE();
- var key = headersBytes.readCharSequence(toInt(keyLength), StandardCharsets.UTF_8).toString();
- var kindCode = headersBytes.readByte();
- var kind = HeaderKind.fromCode(kindCode);
- var valueLength = headersBytes.readUnsignedIntLE();
- var value = headersBytes.readCharSequence(toInt(valueLength), StandardCharsets.UTF_8);
- headersMap.put(key, new HeaderValue(kind, String.valueOf(value)));
- }
- headers = Optional.of(headersMap);
- }
+ var originTimestamp = readU64AsBigInteger(response);
+ var userHeadersLength = response.readUnsignedIntLE();
var payloadLength = response.readUnsignedIntLE();
+ var header = new MessageHeader(checksum, id, offset, timestamp, originTimestamp,
+ userHeadersLength, payloadLength);
var payload = newByteArray(payloadLength);
response.readBytes(payload);
- return new PolledMessage(offset, state, timestamp, id, checksum, headers, payload);
+ // TODO: Add support for user headers.
+ return new Message(header, payload, Optional.empty());
}
static Stats readStats(ByteBuf response) {
@@ -394,7 +379,7 @@ final class BytesDeserializer {
}
private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
- var bytesArray = new byte[9];
+ var bytesArray = new byte[8];
buffer.readBytes(bytesArray, 0, 8);
ArrayUtils.reverse(bytesArray);
return new BigInteger(bytesArray);
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java
index 5343b6a3..bdfccc11 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/BytesSerializer.java
@@ -26,6 +26,7 @@ import org.apache.iggy.consumergroup.Consumer;
import org.apache.iggy.identifier.Identifier;
import org.apache.iggy.message.HeaderValue;
import org.apache.iggy.message.Message;
+import org.apache.iggy.message.MessageHeader;
import org.apache.iggy.message.Partitioning;
import org.apache.iggy.message.PollingStrategy;
import org.apache.iggy.user.GlobalPermissions;
@@ -81,18 +82,24 @@ public final class BytesSerializer {
}
static ByteBuf toBytes(Message message) {
- var buffer = Unpooled.buffer();
- buffer.writeBytes(message.id().toBytes());
- message.headers().ifPresentOrElse((headers) -> {
- var headersBytes = toBytes(headers);
- buffer.writeIntLE(headersBytes.readableBytes());
- buffer.writeBytes(headersBytes);
- }, () -> buffer.writeIntLE(0));
- buffer.writeIntLE(message.payload().length);
+ var buffer = Unpooled.buffer(MessageHeader.SIZE + message.payload().length);
+ buffer.writeBytes(toBytes(message.header()));
buffer.writeBytes(message.payload());
return buffer;
}
+ static ByteBuf toBytes(MessageHeader header) {
+ var buffer = Unpooled.buffer(MessageHeader.SIZE);
+ buffer.writeBytes(toBytesAsU64(header.checksum()));
+ buffer.writeBytes(header.id().toBytes());
+ buffer.writeBytes(toBytesAsU64(header.offset()));
+ buffer.writeBytes(toBytesAsU64(header.timestamp()));
+ buffer.writeBytes(toBytesAsU64(header.originTimestamp()));
+ buffer.writeIntLE(header.userHeadersLength().intValue());
+ buffer.writeIntLE(header.payloadLength().intValue());
+ return buffer;
+ }
+
static ByteBuf toBytes(PollingStrategy strategy) {
var buffer = Unpooled.buffer(9);
buffer.writeByte(strategy.kind().asCode());
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
index 1bc88b4f..82e86633 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/blocking/tcp/MessagesTcpClient.java
@@ -19,6 +19,7 @@
package org.apache.iggy.client.blocking.tcp;
+import io.netty.buffer.Unpooled;
import org.apache.iggy.client.blocking.MessagesClient;
import org.apache.iggy.consumergroup.Consumer;
import org.apache.iggy.identifier.StreamId;
@@ -56,9 +57,37 @@ class MessagesTcpClient implements MessagesClient {
@Override
public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<Message> messages) {
- var payload = toBytes(streamId);
+ // Length of streamId, topicId, partitioning and messages count (4 bytes)
+ var metadataLength = streamId.getSize() + topicId.getSize() + partitioning.getSize() + 4;
+ var payload = Unpooled.buffer(4 + metadataLength);
+ payload.writeIntLE(metadataLength);
+ payload.writeBytes(toBytes(streamId));
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(partitioning));
+ payload.writeIntLE(messages.size());
+
+ // Writing index
+ var position = 0;
+ for (var message : messages) {
+
+ // The logic in messages_batch_mut.rs#message_start_position checks the
+ // previous index to get the starting position of the message.
+ // For the first message it's always 0.
+ // This is the reason why we are setting the position to start of the next
+ // message.
+
+ // This used as both start index of next message and
+ // the end position for the current message.
+ position += message.getSize();
+
+ // offset
+ payload.writeIntLE(0);
+ // position
+ payload.writeIntLE(position);
+ // timestamp.
+ payload.writeZero(8);
+ }
+
for (var message : messages) {
payload.writeBytes(toBytes(message));
}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/identifier/Identifier.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/identifier/Identifier.java
index 27f6b929..a1b6367e 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/identifier/Identifier.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/identifier/Identifier.java
@@ -66,5 +66,15 @@ public abstract class Identifier {
return name;
}
+ public int getSize() {
+ if (id != null) {
+ // kind, 1 byte + length, 1 byte + id, 4 bytes
+ return 6;
+ } else {
+ // kind, 1 byte + length, 1 byte + name.length()
+ return 2 + name.length();
+ }
+ }
+
}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
index 89252210..29cfd38c 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Message.java
@@ -19,17 +19,34 @@
package org.apache.iggy.message;
+import java.math.BigInteger;
import java.util.Map;
import java.util.Optional;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
public record Message(
- MessageId id,
+ MessageHeader header,
byte[] payload,
- Optional<Map<String, HeaderValue>> headers
+ Optional<Map<String, HeaderValue>> userHeaders
) {
- public static Message of(String payload) {
- return new Message(MessageId.serverGenerated(), payload.getBytes(), Optional.empty());
- }
+ public static Message of(String payload) {
+ final byte[] payloadBytes = payload.getBytes();
+ final MessageHeader msgHeader = new MessageHeader(getChecksum(payloadBytes), MessageId.serverGenerated(),
+ BigInteger.ZERO, BigInteger.ZERO, BigInteger.ZERO, 0L,
+ (long)payloadBytes.length);
+ return new Message(msgHeader, payloadBytes, Optional.empty());
+ }
+ private static BigInteger getChecksum(byte[] payload) {
+ final Checksum crc32 = new CRC32();
+ crc32.update(payload, 0, payload.length);
+ return BigInteger.valueOf(crc32.getValue());
+ }
+
+ public int getSize() {
+ // userHeaders is empty for now.
+ return MessageHeader.SIZE + payload.length;
+ }
}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/PolledMessage.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/MessageHeader.java
similarity index 75%
rename from foreign/java/java-sdk/src/main/java/org/apache/iggy/message/PolledMessage.java
rename to foreign/java/java-sdk/src/main/java/org/apache/iggy/message/MessageHeader.java
index 72b16e72..9f4583d7 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/PolledMessage.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/MessageHeader.java
@@ -20,16 +20,16 @@
package org.apache.iggy.message;
import java.math.BigInteger;
-import java.util.Map;
-import java.util.Optional;
-public record PolledMessage(
- BigInteger offset,
- MessageState state,
- BigInteger timestamp,
- MessageId id,
- Long checksum,
- Optional<Map<String, HeaderValue>> headers,
- byte[] payload
+public record MessageHeader(
+ BigInteger checksum,
+ MessageId id,
+ BigInteger offset,
+ BigInteger timestamp,
+ BigInteger originTimestamp,
+ Long userHeadersLength,
+ Long payloadLength
) {
+
+ public static final int SIZE = 8 + 16 + 8 + 8 + 8 + 4 + 4;
}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Partitioning.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Partitioning.java
index abd1ee52..452c0a08 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Partitioning.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/Partitioning.java
@@ -45,4 +45,9 @@ public record Partitioning(
}
return new Partitioning(PartitioningKind.MessagesKey, key.getBytes());
}
+
+ public int getSize() {
+ // kind, 1 byte + length, 1 byte + value.length()
+ return 2 + value.length;
+ }
}
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/PolledMessages.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/PolledMessages.java
index 96ecdb0e..bb037fe9 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/PolledMessages.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/message/PolledMessages.java
@@ -25,6 +25,7 @@ import java.util.List;
public record PolledMessages(
Long partitionId,
BigInteger currentOffset,
- List<PolledMessage> messages
+ Long count,
+ List<Message> messages
) {
}
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/IntegrationTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/IntegrationTest.java
index 77e759bb..d1eb50eb 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/IntegrationTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/IntegrationTest.java
@@ -36,7 +36,7 @@ public abstract class IntegrationTest {
public static final int TCP_PORT = 8090;
@Container
- protected final GenericContainer<?> iggyServer = new GenericContainer<>(DockerImageName.parse("iggyrs/iggy:latest"))
+ protected final GenericContainer<?> iggyServer = new GenericContainer<>(DockerImageName.parse("apache/iggy:edge"))
.withExposedPorts(HTTP_PORT, TCP_PORT);
protected IggyBaseClient client;
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/MessagesClientBaseTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/MessagesClientBaseTest.java
index 417a9695..096c8968 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/MessagesClientBaseTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/MessagesClientBaseTest.java
@@ -23,12 +23,10 @@ import org.apache.iggy.message.Message;
import org.apache.iggy.message.Partitioning;
import org.apache.iggy.message.PollingKind;
import org.apache.iggy.message.PollingStrategy;
-import org.apache.iggy.message.UuidMessageId;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.math.BigInteger;
import java.util.List;
-import java.util.UUID;
import static java.util.Optional.empty;
import static org.assertj.core.api.Assertions.assertThat;
@@ -50,8 +48,7 @@ public abstract class MessagesClientBaseTest extends IntegrationTest {
// when
String text = "message from java sdk";
- messagesClient.sendMessages(42L, 42L, Partitioning.partitionId(1L),
- List.of(new Message(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));
+ messagesClient.sendMessages(42L, 42L, Partitioning.partitionId(1L), List.of(Message.of(text)));
var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);
@@ -67,8 +64,7 @@ public abstract class MessagesClientBaseTest extends IntegrationTest {
// when
String text = "message from java sdk";
- messagesClient.sendMessages(42L, 42L, Partitioning.balanced(),
- List.of(new Message(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));
+ messagesClient.sendMessages(42L, 42L, Partitioning.balanced(), List.of(Message.of(text)));
var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);
@@ -84,9 +80,7 @@ public abstract class MessagesClientBaseTest extends IntegrationTest {
// when
String text = "message from java sdk";
- messagesClient.sendMessages(42L, 42L, Partitioning.messagesKey("test-key"),
- List.of(new Message(new UuidMessageId(UUID.randomUUID()), text.getBytes(), empty())));
-
+ messagesClient.sendMessages(42L, 42L, Partitioning.messagesKey("test-key"), List.of(Message.of(text)));
var polledMessages = messagesClient.pollMessages(42L, 42L, empty(), 0L,
new PollingStrategy(PollingKind.Last, BigInteger.TEN), 10L, false);