ex172000 commented on code in PR #2606:
URL: https://github.com/apache/iggy/pull/2606#discussion_r2730265644


##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -92,108 +114,177 @@ private void configureBootstrap() {
                 .group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                 .option(ChannelOption.SO_KEEPALIVE, true)
-                .handler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    protected void initChannel(SocketChannel ch) {
-                        ChannelPipeline pipeline = ch.pipeline();
+                .remoteAddress(this.host, this.port);
+    }
 
-                        if (enableTls) {
-                            pipeline.addLast("ssl", 
sslContext.newHandler(ch.alloc(), host, port));
-                        }
+    /**
+     * Initialises Connection pool.
+     */
+    public CompletableFuture<Void> connect() {
+        if (isClosed.get()) {
+            return CompletableFuture.failedFuture(new 
IllegalStateException("Client is Closed"));
+        }
+        AbstractChannelPoolHandler poolHandler = new 
AbstractChannelPoolHandler() {
+            @Override
+            public void channelCreated(Channel ch) {
+                ChannelPipeline pipeline = ch.pipeline();
+                if (enableTls) {
+                    // adding ssl if ssl enabled
+                    pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), 
host, port));
+                }
+                // Adding the FrameDecoder to end of channel pipeline
+                pipeline.addLast("frameDecoder", new IggyFrameDecoder());
 
-                        // Custom frame decoder for Iggy protocol responses
-                        pipeline.addLast("frameDecoder", new 
IggyFrameDecoder());
+                // Adding Response Handler Now Statefull
+                pipeline.addLast("responseHandler", new IggyResponseHandler());
+            }
 
-                        // No encoder needed - we build complete frames 
following Iggy protocol
-                        // The protocol already includes the length field, so 
adding an encoder
-                        // would duplicate it. This matches the blocking 
client implementation.
+            @Override
+            public void channelAcquired(Channel ch) {
+                IggyResponseHandler handler = 
ch.pipeline().get(IggyResponseHandler.class);
+                handler.setPool(channelPool);
+            }
+        };
+
+        this.channelPool = new FixedChannelPool(
+                bootstrap,
+                poolHandler,
+                ChannelHealthChecker.ACTIVE, // Check If the connection is 
Active Before Lending
+                FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take 
too long
+                poolConfig.getAcquireTimeoutMillis(),
+                poolConfig.getMaxConnections(),
+                poolConfig.getMaxPendingAcquires());
+        log.info("Connection pool initialized with max connections: {}", 
poolConfig.getMaxConnections());
+        return CompletableFuture.completedFuture(null);
+    }
 
-                        // Response handler
-                        pipeline.addLast("responseHandler", new 
IggyResponseHandler(pendingRequests));
-                    }
-                });
+    /**
+     * Returns Pool metrics.
+     */
+    public PoolMetrics getMetrics() {
+        return this.poolMetrics;
     }
 
     /**
-     * Connects to the server asynchronously.
+     * BroadCast Command to each connection

Review Comment:
   very minor nit: `Broadcasts` to keep it consistent :)



##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncIggyTcpClient.java:
##########
@@ -95,7 +95,13 @@ public static Builder builder() {
      * Connects to the Iggy server asynchronously.
      */
     public CompletableFuture<Void> connect() {
-        connection = new AsyncTcpConnection(host, port, enableTls, 
tlsCertificate);
+        // 1. Create the pool configuration from builder/client fields
+        AsyncTcpConnection.TCPConnectionPoolConfig poolConfig = new 
AsyncTcpConnection.TCPConnectionPoolConfig(
+                connectionPoolSize.orElse(5), // Default to 5 if not provided
+                1000, // maxPendingAcquires
+                connectionTimeout.map(Duration::toMillis).orElse(3000L) // map 
Duration to millis

Review Comment:
   Do we want to 1) make the params configurable? 2) or put them as static 
final constants?
   Also seems we have the default provided by the builder already?



##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -92,108 +114,177 @@ private void configureBootstrap() {
                 .group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                 .option(ChannelOption.SO_KEEPALIVE, true)
-                .handler(new ChannelInitializer<SocketChannel>() {
-                    @Override
-                    protected void initChannel(SocketChannel ch) {
-                        ChannelPipeline pipeline = ch.pipeline();
+                .remoteAddress(this.host, this.port);
+    }
 
-                        if (enableTls) {
-                            pipeline.addLast("ssl", 
sslContext.newHandler(ch.alloc(), host, port));
-                        }
+    /**
+     * Initialises Connection pool.
+     */
+    public CompletableFuture<Void> connect() {
+        if (isClosed.get()) {
+            return CompletableFuture.failedFuture(new 
IllegalStateException("Client is Closed"));
+        }
+        AbstractChannelPoolHandler poolHandler = new 
AbstractChannelPoolHandler() {
+            @Override
+            public void channelCreated(Channel ch) {
+                ChannelPipeline pipeline = ch.pipeline();
+                if (enableTls) {
+                    // adding ssl if ssl enabled
+                    pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(), 
host, port));
+                }
+                // Adding the FrameDecoder to end of channel pipeline
+                pipeline.addLast("frameDecoder", new IggyFrameDecoder());
 
-                        // Custom frame decoder for Iggy protocol responses
-                        pipeline.addLast("frameDecoder", new 
IggyFrameDecoder());
+                // Adding Response Handler Now Statefull
+                pipeline.addLast("responseHandler", new IggyResponseHandler());
+            }
 
-                        // No encoder needed - we build complete frames 
following Iggy protocol
-                        // The protocol already includes the length field, so 
adding an encoder
-                        // would duplicate it. This matches the blocking 
client implementation.
+            @Override
+            public void channelAcquired(Channel ch) {
+                IggyResponseHandler handler = 
ch.pipeline().get(IggyResponseHandler.class);
+                handler.setPool(channelPool);
+            }
+        };
+
+        this.channelPool = new FixedChannelPool(
+                bootstrap,
+                poolHandler,
+                ChannelHealthChecker.ACTIVE, // Check If the connection is 
Active Before Lending
+                FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take 
too long
+                poolConfig.getAcquireTimeoutMillis(),
+                poolConfig.getMaxConnections(),
+                poolConfig.getMaxPendingAcquires());
+        log.info("Connection pool initialized with max connections: {}", 
poolConfig.getMaxConnections());
+        return CompletableFuture.completedFuture(null);
+    }
 
-                        // Response handler
-                        pipeline.addLast("responseHandler", new 
IggyResponseHandler(pendingRequests));
-                    }
-                });
+    /**
+     * Returns Pool metrics.
+     */
+    public PoolMetrics getMetrics() {
+        return this.poolMetrics;
     }
 
     /**
-     * Connects to the server asynchronously.
+     * BroadCast Command to each connection
+     * (Mainly for login so that each connection in the pool is Authenticated)
+     * Returns the result of the LAST connection's execution, allowing the 
caller
+     * to treat this like a single request.
      */
-    public CompletableFuture<Void> connect() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+    public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf 
payload) {
+        if (isClosed.get()) {
+            return CompletableFuture.failedFuture(new 
IllegalStateException("Client is closed"));
+        }
+        if (channelPool == null) {
+            return CompletableFuture.failedFuture(new 
IllegalStateException("Client not connected call connect first"));
+        }
+        List<CompletableFuture<ByteBuf>> lastFuture = new ArrayList<>();
 
-        bootstrap.connect(host, port).addListener((ChannelFutureListener) 
channelFuture -> {
-            if (channelFuture.isSuccess()) {
-                channel = channelFuture.channel();
-                future.complete(null);
-            } else {
-                future.completeExceptionally(channelFuture.cause());
-            }
-        });
+        int poolSize = poolConfig.getMaxConnections();
 
-        return future;
+        // 1. Drain the Pool (Acquire all connections)
+        for (int i = 0; i < poolSize; i++) {
+            lastFuture.add(sendAsync(commandCode, 
payload.retainedDuplicate()));
+        }
+        // 2. Return the last Future (caller can treat this like a single 
request)
+        return lastFuture.get(lastFuture.size() - 1);

Review Comment:
   Not sure if I understand correctly, but how does the last future guarantee 
on the behavior of all futures?



##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -58,21 +64,37 @@ public class AsyncTcpConnection {
     private final SslContext sslContext;
     private final EventLoopGroup eventLoopGroup;
     private final Bootstrap bootstrap;
-    private Channel channel;
-    private final AtomicLong requestIdGenerator = new AtomicLong(0);
-    private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>> 
pendingRequests = new ConcurrentHashMap<>();
+    // private Channel channel;
+    // private ChannelHealthChecker channelHealthChecker;
+
+    // Pooling System;
+    private SimpleChannelPool channelPool;
+    private final TCPConnectionPoolConfig poolConfig;
+    private final PoolMetrics poolMetrics;
+
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    // private final AtomicLong requestIdGenerator = new AtomicLong(0);
+    // private final ConcurrentHashMap<Long, CompletableFuture<ByteBuf>> 
pendingRequests = new ConcurrentHashMap<>();

Review Comment:
   Do we want to remove them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to