bhattmanish98 commented on code in PR #7817: URL: https://github.com/apache/hadoop/pull/7817#discussion_r2280766445
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java: ########## @@ -201,7 +279,69 @@ public void shutdown() { private void logDebug(String message, Object... args) { if (LOG.isDebugEnabled()) { - LOG.debug(message, args); + logDebug(message, args); + } + } + + /** + * Caches extra connections in the {@link KeepAliveCache} to warm it up. + * This method is called during initialization and when the cache is empty. + * + * @param route the HTTP route for which connections are created + * @param numberOfConnections the number of connections to create + */ + private void cacheExtraConnection(final HttpRoute route, final int numberOfConnections) { + if (!isCaching.getAndSet(true)) { + long start = System.currentTimeMillis(); + // Only one thread allowed at a time + ExecutorService executorService = Executors.newFixedThreadPool(Math.min(numberOfConnections, 5)); + + for (int i = 0; i < numberOfConnections; i++) { + executorService.submit(() -> { + try { + HttpClientConnection conn = httpConnectionFactory.create(route, null); + connect(conn, route, connectionTimeout, new AbfsManagedHttpClientContext()); + addConnectionToCache(conn); + } catch (Exception e) { + logDebug("Error creating connection: {}", e.getMessage()); + } + }); + } + + executorService.shutdown(); + try { + if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } finally { + isCaching.set(false); + logDebug("Connection warmup completed in {} ms", System.currentTimeMillis() - start); + } + } + } + + /** + * Adds a connection to the cache if it is open and not stale. + * If the connection is added to the cache, it notifies one waiting thread. + * + * @param conn the connection to add to the cache + */ + private void addConnectionToCache(HttpClientConnection conn) { + if (conn instanceof AbfsManagedApacheHttpConnection) { + if (((AbfsManagedApacheHttpConnection) conn).getTargetHost().equals(baseHost)) { + boolean connAddedInKac = kac.add(conn); + synchronized (connectionLock) { + connectionLock.notify(); // wake up one thread only + } + if (connAddedInKac) { + logDebug("Connection cached: {}", conn); + } else { + logDebug("Connection not cached, and is released: {}", conn); Review Comment: releaseConnection() method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org