This is an automated email from the ASF dual-hosted git repository.

maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new ccb620f6e fix(java): use Netty EventLoop for concurrent async operations (#2558)
ccb620f6e is described below

commit ccb620f6edc0d6d1146def6bef79f05f06b7dfa0
Author: Qichao Chu <[email protected]>
AuthorDate: Wed Jan 14 05:37:38 2026 -0800

    fix(java): use Netty EventLoop for concurrent async operations (#2558)
---
 .../iggy/client/async/tcp/AsyncTcpConnection.java  | 90 ++++++++++++----------
 .../client/async/AsyncClientIntegrationTest.java   |  8 +-
 2 files changed, 52 insertions(+), 46 deletions(-)

diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index 85550e6e3..76539d044 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -135,57 +135,65 @@ public class AsyncTcpConnection {
 
     /**
      * Sends a command asynchronously and returns the response.
+     * Uses Netty's EventLoop to ensure thread-safe sequential request 
processing with FIFO response matching.
      */
     public CompletableFuture<ByteBuf> sendAsync(int commandCode, ByteBuf 
payload) {
         if (channel == null || !channel.isActive()) {
+            payload.release();
             return CompletableFuture.failedFuture(new 
IllegalStateException("Connection not established or closed"));
         }
 
-        // Since Iggy doesn't use request IDs, we'll just use a simple queue
-        // Each request will get the next response in order
         CompletableFuture<ByteBuf> responseFuture = new CompletableFuture<>();
-        long requestId = requestIdGenerator.incrementAndGet();
-        pendingRequests.put(requestId, responseFuture);
-
-        // Build the request frame exactly like the blocking client
-        // Frame format: [payload_size:4][command:4][payload:N]
-        // where payload_size = 4 (command size) + N (payload size)
-        int payloadSize = payload.readableBytes();
-        int framePayloadSize = 4 + payloadSize; // command (4 bytes) + payload
-
-        ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize);
-        frame.writeIntLE(framePayloadSize); // Length field (includes command)
-        frame.writeIntLE(commandCode); // Command
-        frame.writeBytes(payload, payload.readerIndex(), payloadSize); // 
Payload
-
-        // Debug: print frame bytes
-        byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)];
-        if (log.isTraceEnabled()) {
-            frame.getBytes(0, frameBytes);
-            StringBuilder hex = new StringBuilder();
-            for (byte b : frameBytes) {
-                hex.append(String.format("%02x ", b));
+
+        // Execute on channel's EventLoop to ensure sequential processing
+        // This is necessary because Iggy protocol doesn't include request IDs,
+        // and responses are matched using FIFO order
+        channel.eventLoop().execute(() -> {
+            // Since Iggy doesn't use request IDs, we'll just use a simple 
queue
+            // Each request will get the next response in order
+            long requestId = requestIdGenerator.incrementAndGet();
+            pendingRequests.put(requestId, responseFuture);
+
+            // Build the request frame exactly like the blocking client
+            // Frame format: [payload_size:4][command:4][payload:N]
+            // where payload_size = 4 (command size) + N (payload size)
+            int payloadSize = payload.readableBytes();
+            int framePayloadSize = 4 + payloadSize; // command (4 bytes) + 
payload
+
+            ByteBuf frame = channel.alloc().buffer(4 + framePayloadSize);
+            frame.writeIntLE(framePayloadSize); // Length field (includes 
command)
+            frame.writeIntLE(commandCode); // Command
+            frame.writeBytes(payload, payload.readerIndex(), payloadSize); // 
Payload
+
+            // Debug: print frame bytes
+            if (log.isTraceEnabled()) {
+                byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 
30)];
+                frame.getBytes(0, frameBytes);
+                StringBuilder hex = new StringBuilder();
+                for (byte b : frameBytes) {
+                    hex.append(String.format("%02x ", b));
+                }
+                log.trace(
+                        "Sending frame with command: {}, payload size: {}, 
frame payload size (with command): {}, total frame size: {}",
+                        commandCode,
+                        payloadSize,
+                        framePayloadSize,
+                        frame.readableBytes());
+                log.trace("Frame bytes (hex): {}", hex.toString());
             }
-            log.trace(
-                    "Sending frame with command: {}, payload size: {}, frame 
payload size (with command): {}, total frame size: {}",
-                    commandCode,
-                    payloadSize,
-                    framePayloadSize,
-                    frame.readableBytes());
-            log.trace("Frame bytes (hex): {}", hex.toString());
-        }
 
-        payload.release();
+            payload.release();
 
-        // Send the frame
-        channel.writeAndFlush(frame).addListener((ChannelFutureListener) 
future -> {
-            if (!future.isSuccess()) {
-                log.error("Failed to send frame: {}", 
future.cause().getMessage());
-                pendingRequests.remove(requestId);
-                responseFuture.completeExceptionally(future.cause());
-            } else {
-                log.trace("Frame sent successfully to {}", 
channel.remoteAddress());
-            }
+            // Send the frame
+            channel.writeAndFlush(frame).addListener((ChannelFutureListener) 
future -> {
+                if (!future.isSuccess()) {
+                    log.error("Failed to send frame: {}", 
future.cause().getMessage());
+                    pendingRequests.remove(requestId);
+                    responseFuture.completeExceptionally(future.cause());
+                } else {
+                    log.trace("Frame sent successfully to {}", 
channel.remoteAddress());
+                }
+            });
         });
 
         return responseFuture;
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
index 5ec3748be..0e8038d4a 100644
--- a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncClientIntegrationTest.java
@@ -341,10 +341,7 @@ class AsyncClientIntegrationTest {
         log.info("Successfully deleted topic: {}", tempTopic);
     }
 
-    // TODO: Re-enable when server supports null consumer polling
-    // This test uses null consumer in concurrent poll operations which causes 
timeout
     @Test
-    @Disabled
     @Order(10)
     void testConcurrentOperations() throws Exception {
         log.info("Testing concurrent async operations");
@@ -372,14 +369,15 @@ class AsyncClientIntegrationTest {
             operations.add(future);
         }
 
-        // Poll messages concurrently
+        // Poll messages concurrently using valid consumer
         for (int i = 0; i < 3; i++) {
+            final long consumerId = 10000L + i;
             var future = client.messages()
                     .pollMessagesAsync(
                             StreamId.of(TEST_STREAM),
                             TopicId.of(TEST_TOPIC),
                             Optional.of(PARTITION_ID),
-                            null,
+                            Consumer.of(consumerId),
                             PollingStrategy.last(),
                             10L,
                             false);

Reply via email to