mmodzelewski commented on code in PR #2606:
URL: https://github.com/apache/iggy/pull/2606#discussion_r2851693075
##########
foreign/java/java-sdk/src/main/java/org/apache/iggy/client/async/tcp/AsyncTcpConnection.java:
##########
@@ -95,109 +111,186 @@ private void configureBootstrap() {
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.SO_KEEPALIVE, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
-
- if (enableTls) {
- pipeline.addLast("ssl",
sslContext.newHandler(ch.alloc(), host, port));
- }
+ .remoteAddress(this.host, this.port);
+ }
- // Custom frame decoder for Iggy protocol responses
- pipeline.addLast("frameDecoder", new
IggyFrameDecoder());
+ /**
+ * Initialises Connection pool.
+ */
+ public CompletableFuture<Void> connect() {
+ if (isClosed.get()) {
+ return CompletableFuture.failedFuture(new
IllegalStateException("Client is Closed"));
+ }
+ AbstractChannelPoolHandler poolHandler = new
AbstractChannelPoolHandler() {
+ @Override
+ public void channelCreated(Channel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (enableTls) {
+ // adding ssl if ssl enabled
+ pipeline.addLast("ssl", sslContext.newHandler(ch.alloc(),
host, port));
+ }
+ // Adding the FrameDecoder to end of channel pipeline
+ pipeline.addLast("frameDecoder", new IggyFrameDecoder());
- // No encoder needed - we build complete frames
following Iggy protocol
- // The protocol already includes the length field, so
adding an encoder
- // would duplicate it. This matches the blocking
client implementation.
+ // Adding Response Handler Now Statefull
+ pipeline.addLast("responseHandler", new IggyResponseHandler());
+ }
- // Response handler
- pipeline.addLast("responseHandler", new
IggyResponseHandler(pendingRequests));
- }
- });
+ @Override
+ public void channelAcquired(Channel ch) {
+ IggyResponseHandler handler =
ch.pipeline().get(IggyResponseHandler.class);
+ handler.setPool(channelPool);
+ }
+ };
+
+ this.channelPool = new FixedChannelPool(
+ bootstrap,
+ poolHandler,
+ ChannelHealthChecker.ACTIVE, // Check If the connection is
Active Before Lending
+ FixedChannelPool.AcquireTimeoutAction.FAIL, // Fail If we take
too long
+ poolConfig.getAcquireTimeoutMillis(),
+ poolConfig.getMaxConnections(),
+ poolConfig.getMaxPendingAcquires());
+ log.info("Connection pool initialized with max connections: {}",
poolConfig.getMaxConnections());
+ return CompletableFuture.completedFuture(null);
}
/**
- * Connects to the server asynchronously.
+ * Returns Pool metrics.
*/
- public CompletableFuture<Void> connect() {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public PoolMetrics getMetrics() {
+ return this.poolMetrics;
+ }
- bootstrap.connect(host, port).addListener((ChannelFutureListener)
channelFuture -> {
- if (channelFuture.isSuccess()) {
- channel = channelFuture.channel();
- future.complete(null);
- } else {
- future.completeExceptionally(channelFuture.cause());
+ /**
+ * BroadCasts Command to each connection
+ * (Mainly for login so that each connection in the pool is Authenticated)
+ * Returns the result of the LAST connection's execution, allowing the
caller
+ * to treat this like a single request.
+ */
+ public CompletableFuture<ByteBuf> broadcastAsync(int commandCode, ByteBuf
payload) {
Review Comment:
The user-facing API stays the same - `login()` is still called once.
Internally, `login()` would store the credentials and authenticate the one
channel it acquires. Then in `send()`, after acquiring a channel, you check a
channel attribute (e.g. `AUTH_KEY`). If the channel isn't authenticated yet,
send login with the stored credentials first, set the attribute, then send the
actual command. This way, every channel gets authenticated transparently on
first use, including channels created later by the pool.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]