This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b29d336d refactor(java): use generation-based auth for pooled channels (#2910)
3b29d336d is described below

commit 3b29d336d2c181f084c85cddf578f9fa3f99dbd9
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Tue Mar 10 11:58:34 2026 +0100

    refactor(java): use generation-based auth for pooled channels (#2910)
    
    Pooled connections authenticated with a simple boolean flag
    failed to re-authenticate after logout/re-login cycles.
    Channels from the prior session appeared authenticated but
    held stale server-side state, causing silent failures.
    
    Replace the boolean auth attribute with an AtomicLong
    generation counter. Each login increments the generation;
    logout invalidates all channels by advancing it. Channels
    compare their stored generation on acquire and transparently
    re-authenticate when stale.
    
    Additional cleanup: move pool initialization into the
    constructor and have connect() eagerly validate
    connectivity, extract PoolChannelHandler as a static
    nested class, fix payload leaks by releasing the payload
    on acquire failure and in a finally block after sending,
    fix login capture to include PAT login (was incorrectly
    capturing UPDATE), reduce IggyAuthenticator and
    IggyFrameEncoder visibility to package-private, and
    migrate to the Netty 4.2 IoEventLoopGroup API.
---
 .../iggy/client/async/tcp/AsyncIggyTcpClient.java  |   4 +-
 .../iggy/client/async/tcp/AsyncTcpConnection.java  | 278 +++++++++------------
 .../iggy/client/async/tcp/IggyAuthenticator.java   |  46 ++--
 .../iggy/client/async/tcp/IggyFrameEncoder.java    |  24 +-
 .../client/async/AsyncConnectionPoolAuthTest.java  | 263 +++++++++++++++++++
 5 files changed, 430 insertions(+), 185 deletions(-)

diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
index 04e5e1d28..cdd2c6716 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java
@@ -171,7 +171,6 @@ public class AsyncIggyTcpClient {
         connectionPoolSize.ifPresent(poolConfigBuilder::setMaxConnections);
         acquireTimeout.ifPresent(timeout -> 
poolConfigBuilder.setAcquireTimeoutMillis(timeout.toMillis()));
         TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build();
-        // TCPConnectionPoolConfig poolConfig = new TCPConnectionPoolConfig();
         connection = new AsyncTcpConnection(host, port, enableTls, 
tlsCertificate, poolConfig);
         return connection.connect().thenRun(() -> {
             messagesClient = new MessagesTcpClient(connection);
@@ -195,7 +194,7 @@ public class AsyncIggyTcpClient {
      * {@link UsersClient#login(String, String)} instead.
      *
      * @return a {@link CompletableFuture} that completes with the user's
-     *         {@link IdentityInfo} on success
+     * {@link IdentityInfo} on success
      * @throws IggyMissingCredentialsException if no credentials were provided 
at build time
      * @throws IggyNotConnectedException       if {@link #connect()} has not 
been called
      */
@@ -221,6 +220,7 @@ public class AsyncIggyTcpClient {
         }
         return usersClient;
     }
+
     /**
      * Returns the async messages client for producing and consuming messages.
      *
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
index ca0a1f2f4..20b49f0f5 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java
@@ -26,13 +26,13 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
+import io.netty.channel.IoEventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
 import io.netty.channel.pool.AbstractChannelPoolHandler;
 import io.netty.channel.pool.ChannelHealthChecker;
 import io.netty.channel.pool.FixedChannelPool;
-import io.netty.channel.pool.SimpleChannelPool;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
@@ -56,6 +56,7 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
 /**
@@ -64,23 +65,15 @@ import java.util.function.Function;
  */
 public class AsyncTcpConnection {
     private static final Logger log = 
LoggerFactory.getLogger(AsyncTcpConnection.class);
-    private final String host;
-    private final int port;
-    private final boolean enableTls;
-    private final Optional<File> tlsCertificate;
-    private final SslContext sslContext;
-    private final EventLoopGroup eventLoopGroup;
-    private final Bootstrap bootstrap;
-    private SimpleChannelPool channelPool;
-    private final TCPConnectionPoolConfig poolConfig;
-    private ByteBuf loginPayload;
-    private AtomicBoolean isAuthenticated = new AtomicBoolean(false);
 
+    private final IoEventLoopGroup eventLoopGroup;
+    private final FixedChannelPool channelPool;
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final AtomicLong authGeneration = new AtomicLong(0);
+    private ByteBuf loginPayload;
 
-    public AsyncTcpConnection(String host, int port) {
-        this(host, port, false, Optional.empty(), new 
TCPConnectionPoolConfig());
-    }
+    private volatile int loginCommandCode;
+    private volatile boolean authenticated = false;
 
     public AsyncTcpConnection(
             String host,
@@ -88,77 +81,53 @@ public class AsyncTcpConnection {
             boolean enableTls,
             Optional<File> tlsCertificate,
             TCPConnectionPoolConfig poolConfig) {
-        this.host = host;
-        this.port = port;
-        this.enableTls = enableTls;
-        this.tlsCertificate = tlsCertificate;
-        this.poolConfig = poolConfig;
-        this.eventLoopGroup = new NioEventLoopGroup();
-        this.bootstrap = new Bootstrap();
-
-        if (this.enableTls) {
+        this.eventLoopGroup = new 
MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
+
+        SslContext sslContext = null;
+        if (enableTls) {
             try {
-                SslContextBuilder builder = SslContextBuilder.forClient();
-                this.tlsCertificate.ifPresent(builder::trustManager);
-                this.sslContext = builder.build();
+                SslContextBuilder sslBuilder = SslContextBuilder.forClient();
+                tlsCertificate.ifPresent(sslBuilder::trustManager);
+                sslContext = sslBuilder.build();
             } catch (SSLException e) {
                 throw new IggyTlsException("Failed to build SSL context for 
AsyncTcpConnection", e);
             }
-        } else {
-            this.sslContext = null;
         }
-        configureBootstrap();
-    }
 
-    private void configureBootstrap() {
-        bootstrap
+        var bootstrap = new Bootstrap()
                 .group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                 .option(ChannelOption.SO_KEEPALIVE, true)
-                .remoteAddress(this.host, this.port);
-    }
-
-    /**
-     * Initialises Connection pool.
-     */
-    public CompletableFuture<Void> connect() {
-        if (isClosed.get()) {
-            return CompletableFuture.failedFuture(new 
IggyClientException("Client is Closed"));
-        }
-        AbstractChannelPoolHandler poolHandler = new 
AbstractChannelPoolHandler() {
-            @Override
-            public void channelCreated(Channel ch) {
-                ChannelPipeline pipeline = ch.pipeline();
-                if (enableTls) {
-                    // adding ssl if ssl enabled
-                    pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), 
host, port));
-                }
-                // Adding the FrameDecoder to end of channel pipeline
-                pipeline.addLast("frameDecoder", new IggyFrameDecoder());
-
-                // Adding Response Handler Now Stateful
-                pipeline.addLast("responseHandler", new IggyResponseHandler());
-            }
-
-            @Override
-            public void channelAcquired(Channel ch) {
-                IggyResponseHandler handler = 
ch.pipeline().get(IggyResponseHandler.class);
-                handler.setPool(channelPool);
-            }
-        };
+                .remoteAddress(host, port);
 
         this.channelPool = new FixedChannelPool(
                 bootstrap,
-                poolHandler,
-                ChannelHealthChecker.ACTIVE, // Check If the connection is 
Active Before Lending
-                FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take 
too long
+                new PoolChannelHandler(host, port, enableTls, sslContext),
+                ChannelHealthChecker.ACTIVE,
+                FixedChannelPool.AcquireTimeoutAction.FAIL,
                 poolConfig.getAcquireTimeoutMillis(),
                 poolConfig.getMaxConnections(),
                 poolConfig.getMaxPendingAcquires());
+
         log.info("Connection pool initialized with max connections: {}", 
poolConfig.getMaxConnections());
-        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Validates server reachability by eagerly acquiring and releasing one 
connection.
+     */
+    public CompletableFuture<Void> connect() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        channelPool.acquire().addListener((FutureListener<Channel>) f -> {
+            if (f.isSuccess()) {
+                channelPool.release(f.getNow());
+                future.complete(null);
+            } else {
+                future.completeExceptionally(f.cause());
+            }
+        });
+        return future;
     }
 
     public <T> CompletableFuture<T> exchangeForEntity(
@@ -205,131 +174,134 @@ public class AsyncTcpConnection {
     }
 
     public CompletableFuture<Void> sendAndRelease(CommandCode commandCode, 
ByteBuf payload) {
-        return send(commandCode, payload).thenAccept(response -> 
response.release());
+        return send(commandCode, payload).thenAccept(ByteBuf::release);
     }
 
     public CompletableFuture<ByteBuf> send(CommandCode commandCode, ByteBuf 
payload) {
         return send(commandCode.getValue(), payload);
     }
 
-    /**
-     * Sends a command asynchronously and returns the response.
-     * Uses Netty's EventLoop to ensure thread-safe sequential request 
processing with FIFO response matching.
-     */
     public CompletableFuture<ByteBuf> send(int commandCode, ByteBuf payload) {
-        if (isClosed.get()) {
-            return CompletableFuture.failedFuture(
-                    new IggyNotConnectedException("Connection not established 
or closed"));
-        }
-        if (channelPool == null) {
-            return CompletableFuture.failedFuture(
-                    new IggyNotConnectedException("Connection not established 
or closed"));
-        }
-
         captureLoginPayloadIfNeeded(commandCode, payload);
         CompletableFuture<ByteBuf> responseFuture = new CompletableFuture<>();
 
         channelPool.acquire().addListener((FutureListener<Channel>) f -> {
             if (!f.isSuccess()) {
-                responseFuture.completeExceptionally(f.cause());
+                payload.release();
+                
responseFuture.completeExceptionally(mapAcquireException(f.cause()));
                 return;
             }
 
             Channel channel = f.getNow();
-            if (Boolean.FALSE.equals(isAuthenticated.get())) {
-                IggyAuthenticator.setAuthAttribute(channel, isAuthenticated);
-            }
-            CompletableFuture<Void> authStep;
             boolean isLoginOp = (commandCode == 
CommandCode.User.LOGIN.getValue()
                     || commandCode == 
CommandCode.PersonalAccessToken.LOGIN.getValue());
 
+            responseFuture.handle((res, ex) -> {
+                handlePostResponse(channel, commandCode, isLoginOp, ex);
+                return null;
+            });
+
+            CompletableFuture<Void> authStep;
             if (isLoginOp) {
                 authStep = CompletableFuture.completedFuture(null);
+            } else if (!authenticated) {
+                payload.release();
+                responseFuture.completeExceptionally(
+                        new IggyNotConnectedException("Not authenticated, call 
login first"));
+                return;
             } else {
-                if (loginPayload == null) {
-                    responseFuture.completeExceptionally(new 
IggyNotConnectedException("Login First"));
+                ByteBuf loginPayloadCopy = getLoginPayloadCopy();
+                if (loginPayloadCopy == null) {
+                    payload.release();
+                    responseFuture.completeExceptionally(
+                            new IggyNotConnectedException("Not authenticated, 
call login first"));
+                    return;
                 }
                 authStep = IggyAuthenticator.ensureAuthenticated(
-                        channel, loginPayload.retainedDuplicate(), 
CommandCode.User.LOGIN.getValue());
+                        channel, loginPayloadCopy, loginCommandCode, 
authGeneration);
             }
 
             authStep.thenRun(() -> sendFrame(channel, payload, commandCode, 
responseFuture))
                     .exceptionally(ex -> {
-                        payload.release();
                         responseFuture.completeExceptionally(ex);
                         return null;
                     });
-
-            responseFuture.handle((res, ex) -> {
-                handlePostResponse(channel, commandCode, isLoginOp, ex);
-                return null;
-            });
         });
 
         return responseFuture;
     }
 
+    private static Throwable mapAcquireException(Throwable cause) {
+        if (cause instanceof IllegalStateException) {
+            return new IggyNotConnectedException("Connection pool is closed");
+        }
+        return cause;
+    }
+
     private void sendFrame(
             Channel channel, ByteBuf payload, int commandCode, 
CompletableFuture<ByteBuf> responseFuture) {
         try {
-
             IggyResponseHandler handler = 
channel.pipeline().get(IggyResponseHandler.class);
             if (handler == null) {
                 throw new IggyClientException("Channel missing 
IggyResponseHandler");
             }
 
-            // Enqueuing request so handler knows who to call back;
             handler.enqueueRequest(responseFuture);
-
             ByteBuf frame = IggyFrameEncoder.encode(channel.alloc(), 
commandCode, payload);
 
-            payload.release();
-
-            // Send the frame
             channel.writeAndFlush(frame).addListener((ChannelFutureListener) 
future -> {
                 if (!future.isSuccess()) {
                     log.error("Failed to send frame: {}", 
future.cause().getMessage());
-                    frame.release();
-                    channel.close();
                     responseFuture.completeExceptionally(future.cause());
                 } else {
                     log.trace("Frame sent successfully to {}", 
channel.remoteAddress());
                 }
             });
-
         } catch (RuntimeException e) {
             responseFuture.completeExceptionally(e);
+        } finally {
+            payload.release();
         }
     }
 
     private void handlePostResponse(Channel channel, int commandCode, boolean 
isLoginOp, Throwable ex) {
         if (isLoginOp) {
             if (ex == null) {
-                isAuthenticated.set(true);
+                authenticated = true;
+                long generation = authGeneration.incrementAndGet();
+                IggyAuthenticator.setAuthGeneration(channel, generation);
             } else {
                 releaseLoginPayload();
             }
         }
         if (commandCode == CommandCode.User.LOGOUT.getValue()) {
-            isAuthenticated.set(false);
-            IggyAuthenticator.setAuthAttribute(channel, isAuthenticated);
-        }
-        if (channelPool != null) {
-            channelPool.release(channel);
+            authenticated = false;
+            authGeneration.incrementAndGet();
+            IggyAuthenticator.clearAuthGeneration(channel);
         }
+        channelPool.release(channel);
     }
 
     private void captureLoginPayloadIfNeeded(int commandCode, ByteBuf payload) 
{
-        if (commandCode == CommandCode.User.LOGIN.getValue() || commandCode == 
CommandCode.User.UPDATE.getValue()) {
-            updateLoginPayload(payload);
+        if (commandCode == CommandCode.User.LOGIN.getValue()
+                || commandCode == 
CommandCode.PersonalAccessToken.LOGIN.getValue()) {
+            updateLoginPayload(commandCode, payload);
         }
     }
 
-    private synchronized void updateLoginPayload(ByteBuf payload) {
+    private synchronized void updateLoginPayload(int commandCode, ByteBuf 
payload) {
         if (this.loginPayload != null) {
             loginPayload.release();
         }
         this.loginPayload = payload.retainedSlice();
+        this.loginCommandCode = commandCode;
+    }
+
+    private synchronized ByteBuf getLoginPayloadCopy() {
+        if (this.loginPayload != null) {
+            return loginPayload.retainedDuplicate();
+        }
+        return null;
     }
 
     private synchronized void releaseLoginPayload() {
@@ -339,41 +311,49 @@ public class AsyncTcpConnection {
         }
     }
 
-    /**
-     * Closes the connection and releases resources.
-     */
     public CompletableFuture<Void> close() {
-        if (isClosed.compareAndSet(false, true)) {
-            if (channelPool != null) {
-                channelPool.close();
-            }
-            CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
-            eventLoopGroup.shutdownGracefully().addListener(f -> {
-                if (f.isSuccess()) {
-                    shutdownFuture.complete(null);
-                } else {
-                    shutdownFuture.completeExceptionally(null);
-                }
-            });
-            return shutdownFuture;
+        if (!isClosed.compareAndSet(false, true)) {
+            return CompletableFuture.completedFuture(null);
         }
-        return CompletableFuture.completedFuture(null);
+        releaseLoginPayload();
+        channelPool.close();
+        CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+        eventLoopGroup.shutdownGracefully().addListener(f -> {
+            if (f.isSuccess()) {
+                shutdownFuture.complete(null);
+            } else {
+                shutdownFuture.completeExceptionally(f.cause());
+            }
+        });
+        return shutdownFuture;
     }
 
-    /**
-     * Response handler that correlates responses with requests.
-     */
-    public static class IggyResponseHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
-        private final Queue<CompletableFuture<ByteBuf>> responseQueue = new 
ConcurrentLinkedQueue<>();
-        private SimpleChannelPool pool;
-
-        public IggyResponseHandler() {
-            this.pool = null;
+    private static final class PoolChannelHandler extends 
AbstractChannelPoolHandler {
+        private final String host;
+        private final int port;
+        private final boolean enableTls;
+        private final SslContext sslContext;
+
+        PoolChannelHandler(String host, int port, boolean enableTls, 
SslContext sslContext) {
+            this.host = host;
+            this.port = port;
+            this.enableTls = enableTls;
+            this.sslContext = sslContext;
         }
 
-        public void setPool(SimpleChannelPool pool) {
-            this.pool = pool;
+        @Override
+        public void channelCreated(Channel ch) {
+            ChannelPipeline pipeline = ch.pipeline();
+            if (enableTls) {
+                pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), 
host, port));
+            }
+            pipeline.addLast("frameDecoder", new IggyFrameDecoder());
+            pipeline.addLast("responseHandler", new IggyResponseHandler());
         }
+    }
+
+    public static class IggyResponseHandler extends 
SimpleChannelInboundHandler<ByteBuf> {
+        private final Queue<CompletableFuture<ByteBuf>> responseQueue = new 
ConcurrentLinkedQueue<>();
 
         public void enqueueRequest(CompletableFuture<ByteBuf> future) {
             responseQueue.add(future);
@@ -381,20 +361,15 @@ public class AsyncTcpConnection {
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
-            // Read response header (status and length only - no request ID)
             int status = msg.readIntLE();
             int length = msg.readIntLE();
 
             CompletableFuture<ByteBuf> future = responseQueue.poll();
 
             if (future != null) {
-
                 if (status == 0) {
-                    // Success - pass the remaining buffer as response
                     future.complete(msg.retainedSlice());
                 } else {
-                    // Error - the payload contains the error message
-
                     byte[] errorBytes = length > 0 ? new byte[length] : new 
byte[0];
                     msg.readBytes(errorBytes);
                     
future.completeExceptionally(IggyServerException.fromTcpResponse(status, 
errorBytes));
@@ -408,14 +383,10 @@ public class AsyncTcpConnection {
 
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
-            // If the connection dies, fail ALL waiting requests for this 
connection
             CompletableFuture<ByteBuf> f;
             while ((f = responseQueue.poll()) != null) {
                 f.completeExceptionally(cause);
             }
-            if (pool != null) {
-                pool.release(ctx.channel());
-            }
             ctx.close();
         }
     }
@@ -450,7 +421,6 @@ public class AsyncTcpConnection {
             return this.acquireTimeoutMillis;
         }
 
-        // Builder Class for TCPConnectionPoolConfig
         public static final class Builder {
             public static final int DEFAULT_MAX_CONNECTION = 5;
             public static final int DEFAULT_MAX_PENDING_ACQUIRES = 1000;
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
index b11f17ba5..71bfcdb60 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java
@@ -24,27 +24,42 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.util.AttributeKey;
 import org.apache.iggy.client.async.tcp.AsyncTcpConnection.IggyResponseHandler;
-import org.apache.iggy.exception.IggyAuthenticationException;
+import org.apache.iggy.exception.IggyNotConnectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
-public final class IggyAuthenticator {
-    private static final AttributeKey<Boolean> AUTH_KEY = 
AttributeKey.valueOf("AUTH_KEY");
+final class IggyAuthenticator {
     private static final Logger log = 
LoggerFactory.getLogger(IggyAuthenticator.class);
+    private static final AttributeKey<Long> AUTH_GENERATION_KEY = 
AttributeKey.valueOf("AUTH_GENERATION");
 
     private IggyAuthenticator() {}
 
-    public static CompletableFuture<Void> ensureAuthenticated(Channel channel, 
ByteBuf loginPayload, int commandCode) {
-        Boolean isAuth = channel.attr(AUTH_KEY).get();
-        if (Boolean.TRUE.equals(isAuth)) {
+    /**
+     * Ensures the channel is authenticated for the current authentication 
generation.
+     * If the channel's stored generation matches the current one, it is 
already authenticated.
+     * Otherwise, sends a login command on the channel and updates the 
generation on success.
+     *
+     * @param channel           the channel to authenticate
+     * @param loginPayload      the login payload to send (will be released by 
this method)
+     * @param commandCode       the login command code
+     * @param currentGeneration the current authentication generation counter
+     * @return a future that completes when authentication is done
+     */
+    static CompletableFuture<Void> ensureAuthenticated(
+            Channel channel, ByteBuf loginPayload, int commandCode, AtomicLong 
currentGeneration) {
+        Long channelGeneration = channel.attr(AUTH_GENERATION_KEY).get();
+        long requiredGeneration = currentGeneration.get();
+
+        if (channelGeneration != null && channelGeneration == 
requiredGeneration) {
+            loginPayload.release();
             return CompletableFuture.completedFuture(null);
         }
-        if (loginPayload.equals(null)) {
-            return CompletableFuture.failedFuture(
-                    new IggyAuthenticationException(null, commandCode, "login 
first", null, null));
+
+        if (loginPayload == null) {
+            return CompletableFuture.failedFuture(new 
IggyNotConnectedException("Not authenticated, call login first"));
         }
 
         CompletableFuture<ByteBuf> loginFuture = new CompletableFuture<>();
@@ -54,14 +69,13 @@ public final class IggyAuthenticator {
         loginPayload.release();
         channel.writeAndFlush(frame).addListener((ChannelFutureListener) f -> {
             if (!f.isSuccess()) {
-                frame.release();
                 loginFuture.completeExceptionally(f.cause());
             }
         });
 
         return loginFuture.thenAccept(result -> {
             try {
-                channel.attr(AUTH_KEY).set(true);
+                channel.attr(AUTH_GENERATION_KEY).set(currentGeneration.get());
                 log.debug("Channel {} authenticated successfully", 
channel.id());
             } finally {
                 result.release();
@@ -69,11 +83,11 @@ public final class IggyAuthenticator {
         });
     }
 
-    public static void setAuthAttribute(Channel channel, AtomicBoolean value) {
-        channel.attr(AUTH_KEY).set(value.get());
+    static void setAuthGeneration(Channel channel, long generation) {
+        channel.attr(AUTH_GENERATION_KEY).set(generation);
     }
 
-    public static Boolean getAuthAttribute(Channel channel) {
-        return channel.attr(AUTH_KEY).get();
+    static void clearAuthGeneration(Channel channel) {
+        channel.attr(AUTH_GENERATION_KEY).set(null);
     }
 }
diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
index b26b31743..87f0eb479 100644
--- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
+++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java
@@ -24,26 +24,24 @@ import io.netty.buffer.ByteBufAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class IggyFrameEncoder {
+final class IggyFrameEncoder {
     private static final Logger log = 
LoggerFactory.getLogger(IggyFrameEncoder.class);
 
     private IggyFrameEncoder() {}
 
-    public static ByteBuf encode(ByteBufAllocator alloc, int commandCode, 
ByteBuf payload) {
-
-        // Build the request frame exactly like the blocking client
-        // Frame format: [payload_size:4][command:4][payload:N]
-        // where payload_size = 4 (command size) + N (payload size)
+    /**
+     * Encodes a command into the Iggy TCP frame format: 
[payload_size:4][command:4][payload:N]
+     */
+    static ByteBuf encode(ByteBufAllocator alloc, int commandCode, ByteBuf 
payload) {
         int payloadSize = payload.readableBytes();
-        int framePayloadSize = 4 + payloadSize; // command (4 bytes) + payload
+        int framePayloadSize = 4 + payloadSize;
         ByteBuf frame = alloc.buffer(4 + framePayloadSize);
-        frame.writeIntLE(framePayloadSize); // Length field (includes command)
-        frame.writeIntLE(commandCode); // Command
-        frame.writeBytes(payload, payload.readerIndex(), payloadSize); // 
Payload
+        frame.writeIntLE(framePayloadSize);
+        frame.writeIntLE(commandCode);
+        frame.writeBytes(payload);
 
-        // Debug: print frame bytes
-        byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)];
         if (log.isTraceEnabled()) {
+            byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)];
             frame.getBytes(0, frameBytes);
             StringBuilder hex = new StringBuilder();
             for (byte b : frameBytes) {
@@ -55,7 +53,7 @@ public final class IggyFrameEncoder {
                     payloadSize,
                     framePayloadSize,
                     frame.readableBytes());
-            log.trace("Frame bytes (hex): {}", hex.toString());
+            log.trace("Frame bytes (hex): {}", hex);
         }
 
         return frame;
diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java
new file mode 100644
index 000000000..ce7fd59f6
--- /dev/null
+++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iggy.client.async;
+
+import org.apache.iggy.client.BaseIntegrationTest;
+import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient;
+import org.apache.iggy.exception.IggyNotConnectedException;
+import org.apache.iggy.identifier.StreamId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.Partitioning;
+import org.apache.iggy.topic.CompressionAlgorithm;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration tests for connection pool authentication lifecycle.
+ * Verifies that lazy per-channel authentication works correctly across
+ * login, logout, and re-login cycles with a pooled connection.
+ *
+ * <p>Fixture: before each test a fresh {@link AsyncIggyTcpClient} is built
+ * against {@code serverHost()}/{@code serverTcpPort()} with a connection
+ * pool of {@code POOL_SIZE} (3) and connected without logging in; it is
+ * closed after each test. Every asynchronous call is awaited with an
+ * explicit timeout so a hung request fails the test instead of blocking it.
+ *
+ * <p>Scenarios covered:
+ * <ul>
+ *   <li>commands issued before any login, or after a logout, fail with an
+ *       {@link ExecutionException} caused by
+ *       {@link IggyNotConnectedException};</li>
+ *   <li>commands succeed after a login and again after a re-login;</li>
+ *   <li>{@code POOL_SIZE * 3} concurrent sends all complete after one login
+ *       (intended to exercise more than one pooled channel);</li>
+ *   <li>{@code POOL_SIZE * 2} concurrent sends complete both before and
+ *       after a logout followed by a re-login;</li>
+ *   <li>three consecutive login/command/logout cycles behave the same
+ *       way.</li>
+ * </ul>
+ *
+ * <p>NOTE(review): tests that create a stream delete it as their last
+ * statement rather than in a {@code finally} block, so a failing assertion
+ * leaves the randomly named stream on the server.
+ */
+@DisplayName("Connection Pool Authentication")
+class AsyncConnectionPoolAuthTest extends BaseIntegrationTest {
+    private static final Logger log = 
LoggerFactory.getLogger(AsyncConnectionPoolAuthTest.class);
+
+    private static final String USERNAME = "iggy";
+    private static final String PASSWORD = "iggy";
+    private static final int POOL_SIZE = 3;
+
+    private AsyncIggyTcpClient client;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        client = AsyncIggyTcpClient.builder()
+                .host(serverHost())
+                .port(serverTcpPort())
+                .connectionPoolSize(POOL_SIZE)
+                .build();
+        client.connect().get(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (client != null) {
+            client.close().get(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    @DisplayName("should reject commands before login")
+    void shouldRejectCommandsBeforeLogin() {
+        // when/then
+        assertThatThrownBy(() -> client.streams().getStreams().get(5, 
TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseInstanceOf(IggyNotConnectedException.class);
+    }
+
+    @Test
+    @DisplayName("should execute commands after login")
+    void shouldExecuteCommandsAfterLogin() throws Exception {
+        // given
+        client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+
+        // when
+        var streams = client.streams().getStreams().get(5, TimeUnit.SECONDS);
+
+        // then
+        assertThat(streams).isNotNull();
+    }
+
+    @Test
+    @DisplayName("should login, logout, and re-login successfully")
+    void shouldLoginLogoutAndReLogin() throws Exception {
+        // given
+        String streamName = "auth-test-" + UUID.randomUUID();
+
+        // when - first login
+        client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+        var streamsAfterLogin = client.streams().getStreams().get(5, 
TimeUnit.SECONDS);
+
+        // then
+        assertThat(streamsAfterLogin).isNotNull();
+        log.info("First login successful, got {} streams", 
streamsAfterLogin.size());
+
+        // when - create a stream to verify full functionality
+        var stream = client.streams().createStream(streamName).get(5, 
TimeUnit.SECONDS);
+        assertThat(stream).isNotNull();
+        assertThat(stream.name()).isEqualTo(streamName);
+
+        // when - logout
+        client.users().logout().get(5, TimeUnit.SECONDS);
+        log.info("Logout successful");
+
+        // then - commands should fail after logout
+        assertThatThrownBy(() -> client.streams().getStreams().get(5, 
TimeUnit.SECONDS))
+                .isInstanceOf(ExecutionException.class)
+                .hasCauseInstanceOf(IggyNotConnectedException.class);
+        log.info("Commands correctly rejected after logout");
+
+        // when - re-login
+        client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+        log.info("Re-login successful");
+
+        // then - commands should work again
+        var streamsAfterReLogin = client.streams().getStreams().get(5, 
TimeUnit.SECONDS);
+        assertThat(streamsAfterReLogin).isNotNull();
+
+        // cleanup
+        client.streams().deleteStream(StreamId.of(streamName)).get(5, 
TimeUnit.SECONDS);
+    }
+
+    @Test
+    @DisplayName("should authenticate all pool channels lazily via concurrent 
requests")
+    void shouldAuthenticatePoolChannelsLazily() throws Exception {
+        // given
+        String streamName = "pool-auth-test-" + UUID.randomUUID();
+        client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+
+        client.streams().createStream(streamName).get(5, TimeUnit.SECONDS);
+        client.topics()
+                .createTopic(
+                        StreamId.of(streamName),
+                        1L,
+                        CompressionAlgorithm.None,
+                        BigInteger.ZERO,
+                        BigInteger.ZERO,
+                        Optional.empty(),
+                        "test-topic")
+                .get(5, TimeUnit.SECONDS);
+
+        // when - fire more concurrent requests than the pool size to force
+        // multiple channels to be created and lazily authenticated
+        int concurrentRequests = POOL_SIZE * 3;
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (int i = 0; i < concurrentRequests; i++) {
+            var future = client.messages()
+                    .sendMessages(
+                            StreamId.of(streamName),
+                            
org.apache.iggy.identifier.TopicId.of("test-topic"),
+                            Partitioning.partitionId(0L),
+                            List.of(Message.of("msg-" + i)));
+            futures.add(future);
+        }
+
+        // then - all requests should complete successfully
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).get(15, TimeUnit.SECONDS);
+        log.info("All {} concurrent requests completed successfully", 
concurrentRequests);
+
+        // cleanup
+        client.streams().deleteStream(StreamId.of(streamName)).get(5, 
TimeUnit.SECONDS);
+    }
+
+    @Test
+    @DisplayName("should re-authenticate stale channels after logout and 
re-login")
+    void shouldReAuthenticateStaleChannelsAfterReLogin() throws Exception {
+        // given - login and warm up multiple pool channels
+        String streamName = "reauth-test-" + UUID.randomUUID();
+        client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+
+        client.streams().createStream(streamName).get(5, TimeUnit.SECONDS);
+        client.topics()
+                .createTopic(
+                        StreamId.of(streamName),
+                        1L,
+                        CompressionAlgorithm.None,
+                        BigInteger.ZERO,
+                        BigInteger.ZERO,
+                        Optional.empty(),
+                        "test-topic")
+                .get(5, TimeUnit.SECONDS);
+
+        List<CompletableFuture<Void>> warmupFutures = new ArrayList<>();
+        for (int i = 0; i < POOL_SIZE * 2; i++) {
+            warmupFutures.add(client.messages()
+                    .sendMessages(
+                            StreamId.of(streamName),
+                            
org.apache.iggy.identifier.TopicId.of("test-topic"),
+                            Partitioning.partitionId(0L),
+                            List.of(Message.of("warmup-" + i))));
+        }
+        CompletableFuture.allOf(warmupFutures.toArray(new 
CompletableFuture[0])).get(10, TimeUnit.SECONDS);
+        log.info("Pool warmed up with {} requests", warmupFutures.size());
+
+        // when - logout and re-login (invalidates all channel auth 
generations)
+        client.users().logout().get(5, TimeUnit.SECONDS);
+        client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+        log.info("Logout + re-login complete");
+
+        // then - all channels should re-authenticate transparently
+        List<CompletableFuture<Void>> postReLoginFutures = new ArrayList<>();
+        for (int i = 0; i < POOL_SIZE * 2; i++) {
+            postReLoginFutures.add(client.messages()
+                    .sendMessages(
+                            StreamId.of(streamName),
+                            
org.apache.iggy.identifier.TopicId.of("test-topic"),
+                            Partitioning.partitionId(0L),
+                            List.of(Message.of("after-relogin-" + i))));
+        }
+        CompletableFuture.allOf(postReLoginFutures.toArray(new 
CompletableFuture[0]))
+                .get(15, TimeUnit.SECONDS);
+        log.info("All {} post-re-login requests succeeded", 
postReLoginFutures.size());
+
+        // cleanup
+        client.streams().deleteStream(StreamId.of(streamName)).get(5, 
TimeUnit.SECONDS);
+    }
+
+    @Test
+    @DisplayName("should handle multiple sequential login-logout cycles")
+    void shouldHandleMultipleLoginLogoutCycles() throws Exception {
+        // given
+        int cycles = 3;
+
+        for (int i = 0; i < cycles; i++) {
+            // when - login
+            client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS);
+
+            // then - verify commands work
+            var streams = client.streams().getStreams().get(5, 
TimeUnit.SECONDS);
+            assertThat(streams).isNotNull();
+            log.info("Cycle {}: login and command succeeded", i + 1);
+
+            // when - logout
+            client.users().logout().get(5, TimeUnit.SECONDS);
+
+            // then - verify commands are rejected
+            assertThatThrownBy(() -> client.streams().getStreams().get(5, 
TimeUnit.SECONDS))
+                    .isInstanceOf(ExecutionException.class)
+                    .hasCauseInstanceOf(IggyNotConnectedException.class);
+            log.info("Cycle {}: logout and rejection verified", i + 1);
+        }
+    }
+}


Reply via email to