This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new ccb620f6e fix(java): use Netty EventLoop for concurrent async operations (#2558)
ccb620f6e is described below
commit ccb620f6edc0d6d1146def6bef79f05f06b7dfa0
Author: Qichao Chu <[email protected]>
AuthorDate: Wed Jan 14 05:37:38 2026 -0800
fix(java): use Netty EventLoop for concurrent async operations (#2558)
---
.../iggy/client/async/tcp/AsyncTcpConnection.java | 90 ++++++++++++----------
.../client/async/AsyncClientIntegrationTest.java | 8 +-
2 files changed, 52 insertions(+), 46 deletions(-)
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index 85550e6e3..76539d044 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -135,57 +135,65 @@ public class AsyncTcpConnection {
/**
* Sends a command asynchronously and returns the response.
+ * Uses Netty's EventLoop to ensure thread-safe sequential request processing with FIFO response matching.
*/
public CompletableFuture<ByteBuf> sendAsync(int commandCode, ByteBuf
payload) {
if (channel == null || !channel.isActive()) {
+ payload.release();
return CompletableFuture.failedFuture(new
IllegalStateException("Connection not established or closed"));
}
- // Since Iggy doesn't use request IDs, we'll just use a simple queue
- // Each request will get the next response in order
CompletableFuture<ByteBuf> responseFuture = new CompletableFuture<>();
- long requestId = requestIdGenerator.incrementAndGet();
- pendingRequests.put(requestId, responseFuture);
-
- // Build the request frame exactly like the blocking client
- // Frame format: [payload_size:4][command:4][payload:N]
- // where payload_size = 4 (command size) + N (payload size)
- int payloadSize = payload.readableBytes();
- int framePayloadSize = 4 + payloadSize; // command (4 bytes) + payload
-
- ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize);
- frame.writeIntLE(framePayloadSize); // Length field (includes command)
- frame.writeIntLE(commandCode); // Command
- frame.writeBytes(payload, payload.readerIndex(), payloadSize); //
Payload
-
- // Debug: print frame bytes
- byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)];
- if (log.isTraceEnabled()) {
- frame.getBytes(0, frameBytes);
- StringBuilder hex = new StringBuilder();
- for (byte b : frameBytes) {
- hex.append(String.format("%02x ", b));
+
+ // Execute on channel's EventLoop to ensure sequential processing
+ // This is necessary because Iggy protocol doesn't include request IDs,
+ // and responses are matched using FIFO order
+ channel.eventLoop().execute(() -> {
+ // Since Iggy doesn't use request IDs, we'll just use a simple
queue
+ // Each request will get the next response in order
+ long requestId = requestIdGenerator.incrementAndGet();
+ pendingRequests.put(requestId, responseFuture);
+
+ // Build the request frame exactly like the blocking client
+ // Frame format: [payload_size:4][command:4][payload:N]
+ // where payload_size = 4 (command size) + N (payload size)
+ int payloadSize = payload.readableBytes();
+ int framePayloadSize = 4 + payloadSize; // command (4 bytes) +
payload
+
+ ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize);
+ frame.writeIntLE(framePayloadSize); // Length field (includes
command)
+ frame.writeIntLE(commandCode); // Command
+ frame.writeBytes(payload, payload.readerIndex(), payloadSize); //
Payload
+
+ // Debug: print frame bytes
+ if (log.isTraceEnabled()) {
+ byte[] frameBytes = new byte[Math.min(frame.readableBytes(),
30)];
+ frame.getBytes(0, frameBytes);
+ StringBuilder hex = new StringBuilder();
+ for (byte b : frameBytes) {
+ hex.append(String.format("%02x ", b));
+ }
+ log.trace(
+ "Sending frame with command: {}, payload size: {},
frame payload size (with command): {}, total frame size: {}",
+ commandCode,
+ payloadSize,
+ framePayloadSize,
+ frame.readableBytes());
+ log.trace("Frame bytes (hex): {}", hex.toString());
}
- log.trace(
- "Sending frame with command: {}, payload size: {}, frame
payload size (with command): {}, total frame size: {}",
- commandCode,
- payloadSize,
- framePayloadSize,
- frame.readableBytes());
- log.trace("Frame bytes (hex): {}", hex.toString());
- }
- payload.release();
+ payload.release();
- // Send the frame
- channel.writeAndFlush(frame).addListener((ChannelFutureListener)
future -> {
- if (!future.isSuccess()) {
- log.error("Failed to send frame: {}",
future.cause().getMessage());
- pendingRequests.remove(requestId);
- responseFuture.completeExceptionally(future.cause());
- } else {
- log.trace("Frame sent successfully to {}",
channel.remoteAddress());
- }
+ // Send the frame
+ channel.writeAndFlush(frame).addListener((ChannelFutureListener)
future -> {
+ if (!future.isSuccess()) {
+ log.error("Failed to send frame: {}",
future.cause().getMessage());
+ pendingRequests.remove(requestId);
+ responseFuture.completeExceptionally(future.cause());
+ } else {
+ log.trace("Frame sent successfully to {}",
channel.remoteAddress());
+ }
+ });
});
return responseFuture;
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
index 5ec3748be..0e8038d4a 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
@@ -341,10 +341,7 @@ class AsyncClientIntegrationTest {
log.info("Successfully deleted topic: {}", tempTopic);
}
- // TODO: Re-enable when server supports null consumer polling
- // This test uses null consumer in concurrent poll operations which causes timeout
@Test
- @Disabled
@Order(10)
void testConcurrentOperations() throws Exception {
log.info("Testing concurrent async operations");
@@ -372,14 +369,15 @@ class AsyncClientIntegrationTest {
operations.add(future);
}
- // Poll messages concurrently
+ // Poll messages concurrently using valid consumer
for (int i = 0; i < 3; i++) {
+ final long consumerId = 10000L + i;
var future = client.messages()
.pollMessagesAsync(
StreamId.of(TEST_STREAM),
TopicId.of(TEST_TOPIC),
Optional.of(PARTITION_ID),
- null,
+ Consumer.of(consumerId),
PollingStrategy.last(),
10L,
false);