rythm-sachdeva commented on code in PR #2606:
URL: https://github.com/apache/iggy/pull/2606#discussion_r2732307293
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -92,108 +114,177 @@ private void configureBootstrap() {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
+ .remoteAddress(this.host, this.port);
+ }
- if (enableTls) {
- pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
- }
+ /**
+ * Initialises Connection pool.
+ */
+ public CompletableFuture<Void> connect() {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is Closed"));
+ }
+ AbstractChannelPoolHandler poolHandler = new
AbstractChannelPoolHandler() {
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (enableTls) {
+ // adding ssl if ssl enabled
+ pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
+ }
+ // Adding the FrameDecoder to end of channel pipeline
+ pipeline.addLast("frameDecoder", new IggyFrameDecoder());
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
+ // Adding Response Handler Now Statefull
+ pipeline.addLast("responseHandler", new IggyResponseHandler());
+ }
- // No encoder needed - we build complete frames
following Iggy protocol
- // The protocol already includes the length field, so
adding an encoder
- // would duplicate it. This matches the blocking
client implementation.
+ @Override
+ public void channelAcquired(Channel ch) {
+ IggyResponseHandler handler =
ch.pipeline().get(IggyResponseHandler.class);
+ handler.setPool(channelPool);
+ }
+ };
+
+ this.channelPool = new FixedChannelPool(
+ bootstrap,
+ poolHandler,
+ ChannelHealthChecker.ACTIVE, // Check If the connection is
Active Before Lending
+ FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take
too long
+ poolConfig.getAcquireTimeoutMillis(),
+ poolConfig.getMaxConnections(),
+ poolConfig.getMaxPendingAcquires());
+ log.info("Connection pool initialized with max connections: {}",
poolConfig.getMaxConnections());
+ return CompletableFuture.completedFuture(null);
+ }
- // Response handler
- pipeline.addLast("responseHandler", new
IggyResponseHandler(pendingRequests));
- }
- });
+ /**
+ * Returns Pool metrics.
+ */
+ public PoolMetrics getMetrics() {
+ return this.poolMetrics;
}
/**
- * Connects to the server asynchronously.
+ * BroadCast Command to each connection
+ * (Mainly for login so that each connection in the pool is Authenticated)
+ * Returns the result of the LAST connection's execution, allowing the
caller
+ * to treat this like a single request.
*/
- public CompletableFuture<Void> connect() {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf
payload) {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is closed"));
+ }
+ if (channelPool == null) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client not connected call connect first"));
+ }
+ List<CompletableFuture<ByteBuf>> lastFuture = new ArrayList<>();
- bootstrap.connect(host, port).addListener((ChannelFutureListener)
channelFuture -> {
- if (channelFuture.isSuccess()) {
- channel = channelFuture.channel();
- future.complete(null);
- } else {
- future.completeExceptionally(channelFuture.cause());
- }
- });
+ int poolSize = poolConfig.getMaxConnections();
- return future;
+ // 1. Drain the Pool (Acquire all connections)
+ for (int i = 0; i < poolSize; i++) {
+ lastFuture.add(sendAsync(commandCode,
payload.retainedDuplicate()));
+ }
+ // 2. Return the last Future (caller can treat this like a single
request)
+ return lastFuture.get(lastFuture.size() - 1);
Review Comment:
It does not guarantee that. I assumed that if the last request is fulfilled,
the previous requests have been successful as well, though I realise this is
not a good approach. If there is a better approach you can suggest, I am happy
to take the feedback and implement it in an optimal way.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]