This is an automated email from the ASF dual-hosted git repository. maciej pushed a commit to branch java-connection-pool in repository https://gitbox.apache.org/repos/asf/iggy.git
commit fa0f5db3189f7a561fbe5859f0ccf6a11398b318 Author: Maciej Modzelewski <[email protected]> AuthorDate: Tue Mar 10 08:44:28 2026 +0100 refactor(java): use generation-based auth for pooled channels Pooled connections authenticated with a simple boolean flag failed to re-authenticate after logout/re-login cycles. Channels from the prior session appeared authenticated but held stale server-side state, causing silent failures. Replace the boolean auth attribute with an AtomicLong generation counter. Each login increments the generation; logout invalidates all channels by advancing it. Channels compare their stored generation on acquire and transparently re-authenticate when stale. Additional cleanup: move pool initialization into the constructor with eager connectivity validation, extract PoolChannelHandler as a static inner class, fix payload leak on acquire failure by releasing in finally, fix login capture to include PAT login (was incorrectly capturing UPDATE), reduce IggyAuthenticator/FrameEncoder visibility to package-private, and migrate to the Netty 4.2 IoEventLoopGroup API (MultiThreadIoEventLoopGroup with NioIoHandler). 
--- .../iggy/client/async/tcp/AsyncIggyTcpClient.java | 4 +- .../iggy/client/async/tcp/AsyncTcpConnection.java | 278 +++++++++------------ .../iggy/client/async/tcp/IggyAuthenticator.java | 46 ++-- .../iggy/client/async/tcp/IggyFrameEncoder.java | 24 +- .../client/async/AsyncConnectionPoolAuthTest.java | 263 +++++++++++++++++++ 5 files changed, 430 insertions(+), 185 deletions(-) diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java index 04e5e1d28..cdd2c6716 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java @@ -171,7 +171,6 @@ public class AsyncIggyTcpClient { connectionPoolSize.ifPresent(poolConfigBuilder::setMaxConnections); acquireTimeout.ifPresent(timeout -> poolConfigBuilder.setAcquireTimeoutMillis(timeout.toMillis())); TCPConnectionPoolConfig poolConfig = poolConfigBuilder.build(); - // TCPConnectionPoolConfig poolConfig = new TCPConnectionPoolConfig(); connection = new AsyncTcpConnection(host, port, enableTls, tlsCertificate, poolConfig); return connection.connect().thenRun(() -> { messagesClient = new MessagesTcpClient(connection); @@ -195,7 +194,7 @@ public class AsyncIggyTcpClient { * {@link UsersClient#login(String, String)} instead. * * @return a {@link CompletableFuture} that completes with the user's - * {@link IdentityInfo} on success + * {@link IdentityInfo} on success * @throws IggyMissingCredentialsException if no credentials were provided at build time * @throws IggyNotConnectedException if {@link #connect()} has not been called */ @@ -221,6 +220,7 @@ public class AsyncIggyTcpClient { } return usersClient; } + /** * Returns the async messages client for producing and consuming messages. 
* diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java index ca0a1f2f4..20b49f0f5 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java @@ -26,13 +26,13 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.IoEventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.pool.AbstractChannelPoolHandler; import io.netty.channel.pool.ChannelHealthChecker; import io.netty.channel.pool.FixedChannelPool; -import io.netty.channel.pool.SimpleChannelPool; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; @@ -56,6 +56,7 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; /** @@ -64,23 +65,15 @@ import java.util.function.Function; */ public class AsyncTcpConnection { private static final Logger log = LoggerFactory.getLogger(AsyncTcpConnection.class); - private final String host; - private final int port; - private final boolean enableTls; - private final Optional<File> tlsCertificate; - private final SslContext sslContext; - private final EventLoopGroup eventLoopGroup; - private final Bootstrap bootstrap; - private SimpleChannelPool 
channelPool; - private final TCPConnectionPoolConfig poolConfig; - private ByteBuf loginPayload; - private AtomicBoolean isAuthenticated = new AtomicBoolean(false); + private final IoEventLoopGroup eventLoopGroup; + private final FixedChannelPool channelPool; private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final AtomicLong authGeneration = new AtomicLong(0); + private ByteBuf loginPayload; - public AsyncTcpConnection(String host, int port) { - this(host, port, false, Optional.empty(), new TCPConnectionPoolConfig()); - } + private volatile int loginCommandCode; + private volatile boolean authenticated = false; public AsyncTcpConnection( String host, @@ -88,77 +81,53 @@ public class AsyncTcpConnection { boolean enableTls, Optional<File> tlsCertificate, TCPConnectionPoolConfig poolConfig) { - this.host = host; - this.port = port; - this.enableTls = enableTls; - this.tlsCertificate = tlsCertificate; - this.poolConfig = poolConfig; - this.eventLoopGroup = new NioEventLoopGroup(); - this.bootstrap = new Bootstrap(); - - if (this.enableTls) { + this.eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + + SslContext sslContext = null; + if (enableTls) { try { - SslContextBuilder builder = SslContextBuilder.forClient(); - this.tlsCertificate.ifPresent(builder::trustManager); - this.sslContext = builder.build(); + SslContextBuilder sslBuilder = SslContextBuilder.forClient(); + tlsCertificate.ifPresent(sslBuilder::trustManager); + sslContext = sslBuilder.build(); } catch (SSLException e) { throw new IggyTlsException("Failed to build SSL context for AsyncTcpConnection", e); } - } else { - this.sslContext = null; } - configureBootstrap(); - } - private void configureBootstrap() { - bootstrap + var bootstrap = new Bootstrap() .group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_KEEPALIVE, true) - 
.remoteAddress(this.host, this.port); - } - - /** - * Initialises Connection pool. - */ - public CompletableFuture<Void> connect() { - if (isClosed.get()) { - return CompletableFuture.failedFuture(new IggyClientException("Client is Closed")); - } - AbstractChannelPoolHandler poolHandler = new AbstractChannelPoolHandler() { - @Override - public void channelCreated(Channel ch) { - ChannelPipeline pipeline = ch.pipeline(); - if (enableTls) { - // adding ssl if ssl enabled - pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port)); - } - // Adding the FrameDecoder to end of channel pipeline - pipeline.addLast("frameDecoder", new IggyFrameDecoder()); - - // Adding Response Handler Now Stateful - pipeline.addLast("responseHandler", new IggyResponseHandler()); - } - - @Override - public void channelAcquired(Channel ch) { - IggyResponseHandler handler = ch.pipeline().get(IggyResponseHandler.class); - handler.setPool(channelPool); - } - }; + .remoteAddress(host, port); this.channelPool = new FixedChannelPool( bootstrap, - poolHandler, - ChannelHealthChecker.ACTIVE, // Check If the connection is Active Before Lending - FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take too long + new PoolChannelHandler(host, port, enableTls, sslContext), + ChannelHealthChecker.ACTIVE, + FixedChannelPool.AcquireTimeoutAction.FAIL, poolConfig.getAcquireTimeoutMillis(), poolConfig.getMaxConnections(), poolConfig.getMaxPendingAcquires()); + log.info("Connection pool initialized with max connections: {}", poolConfig.getMaxConnections()); - return CompletableFuture.completedFuture(null); + } + + /** + * Validates server reachability by eagerly acquiring and releasing one connection. 
+ */ + public CompletableFuture<Void> connect() { + CompletableFuture<Void> future = new CompletableFuture<>(); + channelPool.acquire().addListener((FutureListener<Channel>) f -> { + if (f.isSuccess()) { + channelPool.release(f.getNow()); + future.complete(null); + } else { + future.completeExceptionally(f.cause()); + } + }); + return future; } public <T> CompletableFuture<T> exchangeForEntity( @@ -205,131 +174,134 @@ public class AsyncTcpConnection { } public CompletableFuture<Void> sendAndRelease(CommandCode commandCode, ByteBuf payload) { - return send(commandCode, payload).thenAccept(response -> response.release()); + return send(commandCode, payload).thenAccept(ByteBuf::release); } public CompletableFuture<ByteBuf> send(CommandCode commandCode, ByteBuf payload) { return send(commandCode.getValue(), payload); } - /** - * Sends a command asynchronously and returns the response. - * Uses Netty's EventLoop to ensure thread-safe sequential request processing with FIFO response matching. 
- */ public CompletableFuture<ByteBuf> send(int commandCode, ByteBuf payload) { - if (isClosed.get()) { - return CompletableFuture.failedFuture( - new IggyNotConnectedException("Connection not established or closed")); - } - if (channelPool == null) { - return CompletableFuture.failedFuture( - new IggyNotConnectedException("Connection not established or closed")); - } - captureLoginPayloadIfNeeded(commandCode, payload); CompletableFuture<ByteBuf> responseFuture = new CompletableFuture<>(); channelPool.acquire().addListener((FutureListener<Channel>) f -> { if (!f.isSuccess()) { - responseFuture.completeExceptionally(f.cause()); + payload.release(); + responseFuture.completeExceptionally(mapAcquireException(f.cause())); return; } Channel channel = f.getNow(); - if (Boolean.FALSE.equals(isAuthenticated.get())) { - IggyAuthenticator.setAuthAttribute(channel, isAuthenticated); - } - CompletableFuture<Void> authStep; boolean isLoginOp = (commandCode == CommandCode.User.LOGIN.getValue() || commandCode == CommandCode.PersonalAccessToken.LOGIN.getValue()); + responseFuture.handle((res, ex) -> { + handlePostResponse(channel, commandCode, isLoginOp, ex); + return null; + }); + + CompletableFuture<Void> authStep; if (isLoginOp) { authStep = CompletableFuture.completedFuture(null); + } else if (!authenticated) { + payload.release(); + responseFuture.completeExceptionally( + new IggyNotConnectedException("Not authenticated, call login first")); + return; } else { - if (loginPayload == null) { - responseFuture.completeExceptionally(new IggyNotConnectedException("Login First")); + ByteBuf loginPayloadCopy = getLoginPayloadCopy(); + if (loginPayloadCopy == null) { + payload.release(); + responseFuture.completeExceptionally( + new IggyNotConnectedException("Not authenticated, call login first")); + return; } authStep = IggyAuthenticator.ensureAuthenticated( - channel, loginPayload.retainedDuplicate(), CommandCode.User.LOGIN.getValue()); + channel, loginPayloadCopy, loginCommandCode, 
authGeneration); } authStep.thenRun(() -> sendFrame(channel, payload, commandCode, responseFuture)) .exceptionally(ex -> { - payload.release(); responseFuture.completeExceptionally(ex); return null; }); - - responseFuture.handle((res, ex) -> { - handlePostResponse(channel, commandCode, isLoginOp, ex); - return null; - }); }); return responseFuture; } + private static Throwable mapAcquireException(Throwable cause) { + if (cause instanceof IllegalStateException) { + return new IggyNotConnectedException("Connection pool is closed"); + } + return cause; + } + private void sendFrame( Channel channel, ByteBuf payload, int commandCode, CompletableFuture<ByteBuf> responseFuture) { try { - IggyResponseHandler handler = channel.pipeline().get(IggyResponseHandler.class); if (handler == null) { throw new IggyClientException("Channel missing IggyResponseHandler"); } - // Enqueuing request so handler knows who to call back; handler.enqueueRequest(responseFuture); - ByteBuf frame = IggyFrameEncoder.encode(channel.alloc(), commandCode, payload); - payload.release(); - - // Send the frame channel.writeAndFlush(frame).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { log.error("Failed to send frame: {}", future.cause().getMessage()); - frame.release(); - channel.close(); responseFuture.completeExceptionally(future.cause()); } else { log.trace("Frame sent successfully to {}", channel.remoteAddress()); } }); - } catch (RuntimeException e) { responseFuture.completeExceptionally(e); + } finally { + payload.release(); } } private void handlePostResponse(Channel channel, int commandCode, boolean isLoginOp, Throwable ex) { if (isLoginOp) { if (ex == null) { - isAuthenticated.set(true); + authenticated = true; + long generation = authGeneration.incrementAndGet(); + IggyAuthenticator.setAuthGeneration(channel, generation); } else { releaseLoginPayload(); } } if (commandCode == CommandCode.User.LOGOUT.getValue()) { - isAuthenticated.set(false); - 
IggyAuthenticator.setAuthAttribute(channel, isAuthenticated); - } - if (channelPool != null) { - channelPool.release(channel); + authenticated = false; + authGeneration.incrementAndGet(); + IggyAuthenticator.clearAuthGeneration(channel); } + channelPool.release(channel); } private void captureLoginPayloadIfNeeded(int commandCode, ByteBuf payload) { - if (commandCode == CommandCode.User.LOGIN.getValue() || commandCode == CommandCode.User.UPDATE.getValue()) { - updateLoginPayload(payload); + if (commandCode == CommandCode.User.LOGIN.getValue() + || commandCode == CommandCode.PersonalAccessToken.LOGIN.getValue()) { + updateLoginPayload(commandCode, payload); } } - private synchronized void updateLoginPayload(ByteBuf payload) { + private synchronized void updateLoginPayload(int commandCode, ByteBuf payload) { if (this.loginPayload != null) { loginPayload.release(); } this.loginPayload = payload.retainedSlice(); + this.loginCommandCode = commandCode; + } + + private synchronized ByteBuf getLoginPayloadCopy() { + if (this.loginPayload != null) { + return loginPayload.retainedDuplicate(); + } + return null; } private synchronized void releaseLoginPayload() { @@ -339,41 +311,49 @@ public class AsyncTcpConnection { } } - /** - * Closes the connection and releases resources. 
- */ public CompletableFuture<Void> close() { - if (isClosed.compareAndSet(false, true)) { - if (channelPool != null) { - channelPool.close(); - } - CompletableFuture<Void> shutdownFuture = new CompletableFuture<>(); - eventLoopGroup.shutdownGracefully().addListener(f -> { - if (f.isSuccess()) { - shutdownFuture.complete(null); - } else { - shutdownFuture.completeExceptionally(null); - } - }); - return shutdownFuture; + if (!isClosed.compareAndSet(false, true)) { + return CompletableFuture.completedFuture(null); } - return CompletableFuture.completedFuture(null); + releaseLoginPayload(); + channelPool.close(); + CompletableFuture<Void> shutdownFuture = new CompletableFuture<>(); + eventLoopGroup.shutdownGracefully().addListener(f -> { + if (f.isSuccess()) { + shutdownFuture.complete(null); + } else { + shutdownFuture.completeExceptionally(f.cause()); + } + }); + return shutdownFuture; } - /** - * Response handler that correlates responses with requests. - */ - public static class IggyResponseHandler extends SimpleChannelInboundHandler<ByteBuf> { - private final Queue<CompletableFuture<ByteBuf>> responseQueue = new ConcurrentLinkedQueue<>(); - private SimpleChannelPool pool; - - public IggyResponseHandler() { - this.pool = null; + private static final class PoolChannelHandler extends AbstractChannelPoolHandler { + private final String host; + private final int port; + private final boolean enableTls; + private final SslContext sslContext; + + PoolChannelHandler(String host, int port, boolean enableTls, SslContext sslContext) { + this.host = host; + this.port = port; + this.enableTls = enableTls; + this.sslContext = sslContext; } - public void setPool(SimpleChannelPool pool) { - this.pool = pool; + @Override + public void channelCreated(Channel ch) { + ChannelPipeline pipeline = ch.pipeline(); + if (enableTls) { + pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), host, port)); + } + pipeline.addLast("frameDecoder", new IggyFrameDecoder()); + 
pipeline.addLast("responseHandler", new IggyResponseHandler()); } + } + + public static class IggyResponseHandler extends SimpleChannelInboundHandler<ByteBuf> { + private final Queue<CompletableFuture<ByteBuf>> responseQueue = new ConcurrentLinkedQueue<>(); public void enqueueRequest(CompletableFuture<ByteBuf> future) { responseQueue.add(future); @@ -381,20 +361,15 @@ public class AsyncTcpConnection { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { - // Read response header (status and length only - no request ID) int status = msg.readIntLE(); int length = msg.readIntLE(); CompletableFuture<ByteBuf> future = responseQueue.poll(); if (future != null) { - if (status == 0) { - // Success - pass the remaining buffer as response future.complete(msg.retainedSlice()); } else { - // Error - the payload contains the error message - byte[] errorBytes = length > 0 ? new byte[length] : new byte[0]; msg.readBytes(errorBytes); future.completeExceptionally(IggyServerException.fromTcpResponse(status, errorBytes)); @@ -408,14 +383,10 @@ public class AsyncTcpConnection { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // If the connection dies, fail ALL waiting requests for this connection CompletableFuture<ByteBuf> f; while ((f = responseQueue.poll()) != null) { f.completeExceptionally(cause); } - if (pool != null) { - pool.release(ctx.channel()); - } ctx.close(); } } @@ -450,7 +421,6 @@ public class AsyncTcpConnection { return this.acquireTimeoutMillis; } - // Builder Class for TCPConnectionPoolConfig public static final class Builder { public static final int DEFAULT_MAX_CONNECTION = 5; public static final int DEFAULT_MAX_PENDING_ACQUIRES = 1000; diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java index b11f17ba5..71bfcdb60 100644 --- 
a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyAuthenticator.java @@ -24,27 +24,42 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.util.AttributeKey; import org.apache.iggy.client.async.tcp.AsyncTcpConnection.IggyResponseHandler; -import org.apache.iggy.exception.IggyAuthenticationException; +import org.apache.iggy.exception.IggyNotConnectedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; -public final class IggyAuthenticator { - private static final AttributeKey<Boolean> AUTH_KEY = AttributeKey.valueOf("AUTH_KEY"); +final class IggyAuthenticator { private static final Logger log = LoggerFactory.getLogger(IggyAuthenticator.class); + private static final AttributeKey<Long> AUTH_GENERATION_KEY = AttributeKey.valueOf("AUTH_GENERATION"); private IggyAuthenticator() {} - public static CompletableFuture<Void> ensureAuthenticated(Channel channel, ByteBuf loginPayload, int commandCode) { - Boolean isAuth = channel.attr(AUTH_KEY).get(); - if (Boolean.TRUE.equals(isAuth)) { + /** + * Ensures the channel is authenticated for the current authentication generation. + * If the channel's stored generation matches the current one, it is already authenticated. + * Otherwise, sends a login command on the channel and updates the generation on success. 
+ * + * @param channel the channel to authenticate + * @param loginPayload the login payload to send (will be released by this method) + * @param commandCode the login command code + * @param currentGeneration the current authentication generation counter + * @return a future that completes when authentication is done + */ + static CompletableFuture<Void> ensureAuthenticated( + Channel channel, ByteBuf loginPayload, int commandCode, AtomicLong currentGeneration) { + Long channelGeneration = channel.attr(AUTH_GENERATION_KEY).get(); + long requiredGeneration = currentGeneration.get(); + + if (channelGeneration != null && channelGeneration == requiredGeneration) { + loginPayload.release(); return CompletableFuture.completedFuture(null); } - if (loginPayload.equals(null)) { - return CompletableFuture.failedFuture( - new IggyAuthenticationException(null, commandCode, "login first", null, null)); + + if (loginPayload == null) { + return CompletableFuture.failedFuture(new IggyNotConnectedException("Not authenticated, call login first")); } CompletableFuture<ByteBuf> loginFuture = new CompletableFuture<>(); @@ -54,14 +69,13 @@ public final class IggyAuthenticator { loginPayload.release(); channel.writeAndFlush(frame).addListener((ChannelFutureListener) f -> { if (!f.isSuccess()) { - frame.release(); loginFuture.completeExceptionally(f.cause()); } }); return loginFuture.thenAccept(result -> { try { - channel.attr(AUTH_KEY).set(true); + channel.attr(AUTH_GENERATION_KEY).set(currentGeneration.get()); log.debug("Channel {} authenticated successfully", channel.id()); } finally { result.release(); @@ -69,11 +83,11 @@ public final class IggyAuthenticator { }); } - public static void setAuthAttribute(Channel channel, AtomicBoolean value) { - channel.attr(AUTH_KEY).set(value.get()); + static void setAuthGeneration(Channel channel, long generation) { + channel.attr(AUTH_GENERATION_KEY).set(generation); } - public static Boolean getAuthAttribute(Channel channel) { - return 
channel.attr(AUTH_KEY).get(); + static void clearAuthGeneration(Channel channel) { + channel.attr(AUTH_GENERATION_KEY).set(null); } } diff --git a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java index b26b31743..87f0eb479 100644 --- a/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java +++ b/foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/IggyFrameEncoder.java @@ -24,26 +24,24 @@ import io.netty.buffer.ByteBufAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class IggyFrameEncoder { +final class IggyFrameEncoder { private static final Logger log = LoggerFactory.getLogger(IggyFrameEncoder.class); private IggyFrameEncoder() {} - public static ByteBuf encode(ByteBufAllocator alloc, int commandCode, ByteBuf payload) { - - // Build the request frame exactly like the blocking client - // Frame format: [payload_size:4][command:4][payload:N] - // where payload_size = 4 (command size) + N (payload size) + /** + * Encodes a command into the Iggy TCP frame format: [payload_size:4][command:4][payload:N] + */ + static ByteBuf encode(ByteBufAllocator alloc, int commandCode, ByteBuf payload) { int payloadSize = payload.readableBytes(); - int framePayloadSize = 4 + payloadSize; // command (4 bytes) + payload + int framePayloadSize = 4 + payloadSize; ByteBuf frame = alloc.buffer(4 + framePayloadSize); - frame.writeIntLE(framePayloadSize); // Length field (includes command) - frame.writeIntLE(commandCode); // Command - frame.writeBytes(payload, payload.readerIndex(), payloadSize); // Payload + frame.writeIntLE(framePayloadSize); + frame.writeIntLE(commandCode); + frame.writeBytes(payload); - // Debug: print frame bytes - byte[] frameBytes = new byte[Math.min(frame.readableBytes(), 30)]; if (log.isTraceEnabled()) { + byte[] frameBytes = new 
byte[Math.min(frame.readableBytes(), 30)]; frame.getBytes(0, frameBytes); StringBuilder hex = new StringBuilder(); for (byte b : frameBytes) { @@ -55,7 +53,7 @@ public final class IggyFrameEncoder { payloadSize, framePayloadSize, frame.readableBytes()); - log.trace("Frame bytes (hex): {}", hex.toString()); + log.trace("Frame bytes (hex): {}", hex); } return frame; diff --git a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java new file mode 100644 index 000000000..ce7fd59f6 --- /dev/null +++ b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/async/AsyncConnectionPoolAuthTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iggy.client.async; + +import org.apache.iggy.client.BaseIntegrationTest; +import org.apache.iggy.client.async.tcp.AsyncIggyTcpClient; +import org.apache.iggy.exception.IggyNotConnectedException; +import org.apache.iggy.identifier.StreamId; +import org.apache.iggy.message.Message; +import org.apache.iggy.message.Partitioning; +import org.apache.iggy.topic.CompressionAlgorithm; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Integration tests for connection pool authentication lifecycle. + * Verifies that lazy per-channel authentication works correctly across + * login, logout, and re-login cycles with a pooled connection. 
+ */ +@DisplayName("Connection Pool Authentication") +class AsyncConnectionPoolAuthTest extends BaseIntegrationTest { + private static final Logger log = LoggerFactory.getLogger(AsyncConnectionPoolAuthTest.class); + + private static final String USERNAME = "iggy"; + private static final String PASSWORD = "iggy"; + private static final int POOL_SIZE = 3; + + private AsyncIggyTcpClient client; + + @BeforeEach + void setUp() throws Exception { + client = AsyncIggyTcpClient.builder() + .host(serverHost()) + .port(serverTcpPort()) + .connectionPoolSize(POOL_SIZE) + .build(); + client.connect().get(5, TimeUnit.SECONDS); + } + + @AfterEach + void tearDown() throws Exception { + if (client != null) { + client.close().get(5, TimeUnit.SECONDS); + } + } + + @Test + @DisplayName("should reject commands before login") + void shouldRejectCommandsBeforeLogin() { + // when/then + assertThatThrownBy(() -> client.streams().getStreams().get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(IggyNotConnectedException.class); + } + + @Test + @DisplayName("should execute commands after login") + void shouldExecuteCommandsAfterLogin() throws Exception { + // given + client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS); + + // when + var streams = client.streams().getStreams().get(5, TimeUnit.SECONDS); + + // then + assertThat(streams).isNotNull(); + } + + @Test + @DisplayName("should login, logout, and re-login successfully") + void shouldLoginLogoutAndReLogin() throws Exception { + // given + String streamName = "auth-test-" + UUID.randomUUID(); + + // when - first login + client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS); + var streamsAfterLogin = client.streams().getStreams().get(5, TimeUnit.SECONDS); + + // then + assertThat(streamsAfterLogin).isNotNull(); + log.info("First login successful, got {} streams", streamsAfterLogin.size()); + + // when - create a stream to verify full functionality + var stream = 
client.streams().createStream(streamName).get(5, TimeUnit.SECONDS); + assertThat(stream).isNotNull(); + assertThat(stream.name()).isEqualTo(streamName); + + // when - logout + client.users().logout().get(5, TimeUnit.SECONDS); + log.info("Logout successful"); + + // then - commands should fail after logout + assertThatThrownBy(() -> client.streams().getStreams().get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(IggyNotConnectedException.class); + log.info("Commands correctly rejected after logout"); + + // when - re-login + client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS); + log.info("Re-login successful"); + + // then - commands should work again + var streamsAfterReLogin = client.streams().getStreams().get(5, TimeUnit.SECONDS); + assertThat(streamsAfterReLogin).isNotNull(); + + // cleanup + client.streams().deleteStream(StreamId.of(streamName)).get(5, TimeUnit.SECONDS); + } + + @Test + @DisplayName("should authenticate all pool channels lazily via concurrent requests") + void shouldAuthenticatePoolChannelsLazily() throws Exception { + // given + String streamName = "pool-auth-test-" + UUID.randomUUID(); + client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS); + + client.streams().createStream(streamName).get(5, TimeUnit.SECONDS); + client.topics() + .createTopic( + StreamId.of(streamName), + 1L, + CompressionAlgorithm.None, + BigInteger.ZERO, + BigInteger.ZERO, + Optional.empty(), + "test-topic") + .get(5, TimeUnit.SECONDS); + + // when - fire more concurrent requests than the pool size to force + // multiple channels to be created and lazily authenticated + int concurrentRequests = POOL_SIZE * 3; + List<CompletableFuture<Void>> futures = new ArrayList<>(); + for (int i = 0; i < concurrentRequests; i++) { + var future = client.messages() + .sendMessages( + StreamId.of(streamName), + org.apache.iggy.identifier.TopicId.of("test-topic"), + Partitioning.partitionId(0L), + List.of(Message.of("msg-" + 
i))); + futures.add(future); + } + + // then - all requests should complete successfully + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(15, TimeUnit.SECONDS); + log.info("All {} concurrent requests completed successfully", concurrentRequests); + + // cleanup + client.streams().deleteStream(StreamId.of(streamName)).get(5, TimeUnit.SECONDS); + } + + @Test + @DisplayName("should re-authenticate stale channels after logout and re-login") + void shouldReAuthenticateStaleChannelsAfterReLogin() throws Exception { + // given - login and warm up multiple pool channels + String streamName = "reauth-test-" + UUID.randomUUID(); + client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS); + + client.streams().createStream(streamName).get(5, TimeUnit.SECONDS); + client.topics() + .createTopic( + StreamId.of(streamName), + 1L, + CompressionAlgorithm.None, + BigInteger.ZERO, + BigInteger.ZERO, + Optional.empty(), + "test-topic") + .get(5, TimeUnit.SECONDS); + + List<CompletableFuture<Void>> warmupFutures = new ArrayList<>(); + for (int i = 0; i < POOL_SIZE * 2; i++) { + warmupFutures.add(client.messages() + .sendMessages( + StreamId.of(streamName), + org.apache.iggy.identifier.TopicId.of("test-topic"), + Partitioning.partitionId(0L), + List.of(Message.of("warmup-" + i)))); + } + CompletableFuture.allOf(warmupFutures.toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS); + log.info("Pool warmed up with {} requests", warmupFutures.size()); + + // when - logout and re-login (invalidates all channel auth generations) + client.users().logout().get(5, TimeUnit.SECONDS); + client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS); + log.info("Logout + re-login complete"); + + // then - all channels should re-authenticate transparently + List<CompletableFuture<Void>> postReLoginFutures = new ArrayList<>(); + for (int i = 0; i < POOL_SIZE * 2; i++) { + postReLoginFutures.add(client.messages() + .sendMessages( + StreamId.of(streamName), + 
org.apache.iggy.identifier.TopicId.of("test-topic"), + Partitioning.partitionId(0L), + List.of(Message.of("after-relogin-" + i)))); + } + CompletableFuture.allOf(postReLoginFutures.toArray(new CompletableFuture[0])) + .get(15, TimeUnit.SECONDS); + log.info("All {} post-re-login requests succeeded", postReLoginFutures.size()); + + // cleanup + client.streams().deleteStream(StreamId.of(streamName)).get(5, TimeUnit.SECONDS); + } + + @Test + @DisplayName("should handle multiple sequential login-logout cycles") + void shouldHandleMultipleLoginLogoutCycles() throws Exception { + // given + int cycles = 3; + + for (int i = 0; i < cycles; i++) { + // when - login + client.users().login(USERNAME, PASSWORD).get(5, TimeUnit.SECONDS); + + // then - verify commands work + var streams = client.streams().getStreams().get(5, TimeUnit.SECONDS); + assertThat(streams).isNotNull(); + log.info("Cycle {}: login and command succeeded", i + 1); + + // when - logout + client.users().logout().get(5, TimeUnit.SECONDS); + + // then - verify commands are rejected + assertThatThrownBy(() -> client.streams().getStreams().get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(IggyNotConnectedException.class); + log.info("Cycle {}: logout and rejection verified", i + 1); + } + } +}
