anujmodi2021 commented on code in PR #7817:
URL: https://github.com/apache/hadoop/pull/7817#discussion_r2284234190
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -330,7 +330,8 @@ public void close() throws IOException {
     } catch (ExecutionException e) {
       LOG.error("Error freeing leases", e);
     } finally {
-      IOUtils.cleanupWithLogger(LOG, getClient());
+      IOUtils.cleanupWithLogger(LOG, getClientHandler().getDfsClient(),

Review Comment:
   Can we simply call close on the clientHandler here, and inside the client handler call close for both the clients?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -139,6 +139,10 @@ public AbfsAHCHttpOperation(final URL url,
       throw new PathIOException(getUrl().toString(),
           "Unsupported HTTP method: " + getMethod());
     }
+
+    for (AbfsHttpHeader header : requestHeaders) {

Review Comment:
   Why this change?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java:
##########
@@ -213,13 +213,19 @@ public final class FileSystemConfigurations {
   public static final long THOUSAND = 1000L;

   public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
-      = HttpOperationType.JDK_HTTP_URL_CONNECTION;
+      = HttpOperationType.APACHE_HTTP_CLIENT;

Review Comment:
   Let's revert this.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -196,7 +199,6 @@ public void processResponse(final byte[] buffer,
       final int length) throws IOException {
     try {
       if (!isPayloadRequest) {
-        prepareRequest();

Review Comment:
   Why removed?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
   /**{@inheritDoc}*/
   @Override
   public void setRequestProperty(final String key, final String value) {
-    List<AbfsHttpHeader> headers = getRequestHeaders();
-    if (headers != null) {
-      headers.add(new AbfsHttpHeader(key, value));
+    if (httpRequestBase instanceof HttpEntityEnclosingRequestBase

Review Comment:
   +1. Add javadoc explaining why these two conditions cannot go together.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
    */
   private final HttpClientConnectionOperator connectionOperator;

+  /**
+   * Number of connections to be created during cache refresh.
+   */
+  private final int cacheRefreshConnections;

Review Comment:
   The variable name should say it is a number or a count.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -115,6 +180,26 @@ public HttpClientConnection get(final long timeout,
     public boolean cancel() {
       return false;
     }
+
+  /**
+   * Trigger a background warm-up of the connection cache if needed.
+   * This method checks if the cache size is small and if caching is not already in progress.
+   * If so, it starts a new thread to cache extra connections.
+   */
+  private void triggerConnectionWarmupIfNeeded() {
+    if (kac.size() <= 2 && !isCaching.get()) {
+      // Use a single-threaded executor or thread pool instead of raw thread

Review Comment:
   Also, please give the thread a relevant name. This helps in identifying threads when debugging any issue.
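   A minimal sketch of what the two asks could look like together (a single-threaded executor plus a named thread), assuming the `kac` and `isCaching` fields from the diff; `warmupExecutor` and `cacheExtraConnections()` are illustrative names, not identifiers from the patch:

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   // Inside AbfsConnectionManager (sketch). The worker thread gets a
   // descriptive name so warm-up activity stands out in thread dumps.
   private final ExecutorService warmupExecutor =
       Executors.newSingleThreadExecutor(r -> {
         Thread t = new Thread(r, "abfs-connection-cache-warmup");
         t.setDaemon(true); // background warm-up must not block JVM shutdown
         return t;
       });

   private void triggerConnectionWarmupIfNeeded() {
     // compareAndSet flips the flag atomically, so at most one warm-up
     // task is in flight even when several callers race here.
     if (kac.size() <= 2 && isCaching.compareAndSet(false, true)) {
       // cacheExtraConnections() is hypothetical; it would need to reset
       // isCaching to false when the warm-up finishes.
       warmupExecutor.submit(this::cacheExtraConnections);
     }
   }
   ```

   The executor would also need to be shut down when the connection manager closes.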
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
    */
   private final HttpClientConnectionOperator connectionOperator;

+  /**
+   * Number of connections to be created during cache refresh.
+   */
+  private final int cacheRefreshConnections;
+
+  /**
+   * Connection timeout for establishing a new connection.
+   */
+  private final int connectionTimeout;
+
+  private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+  private final Object connectionLock = new Object();
+
+  private HttpHost baseHost;
+
   AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
-      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+      final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
     this.httpConnectionFactory = connectionFactory;
+    this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
     this.kac = kac;
     this.connectionOperator = new DefaultHttpClientConnectionOperator(
         socketFactoryRegistry, null, null);
+    if (abfsConfiguration.getCacheWarmupConnections() > 0) {

Review Comment:
   Same for this and all the variables: if they indicate a number, better to name them `numOfCacheWarmupConnections` or `cacheWarmupConnectionsCount`. Please check this throughout the patch, in all the classes, for all the variables.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -398,24 +400,10 @@ public void sendPayload(final byte[] buffer,
     }
   }

-  /**
-   * Sets the header on the request.
-   */
-  private void prepareRequest() {
-    final boolean isEntityBasedRequest
-        = httpRequestBase instanceof HttpEntityEnclosingRequestBase;
-    for (AbfsHttpHeader header : getRequestHeaders()) {
-      if (CONTENT_LENGTH.equals(header.getName()) && isEntityBasedRequest) {
-        continue;
-      }
-      httpRequestBase.setHeader(header.getName(), header.getValue());
-    }
-  }
-
   /**{@inheritDoc}*/
   @Override
   public String getRequestProperty(String name) {
-    for (AbfsHttpHeader header : getRequestHeaders()) {
+    for (Header header : httpRequestBase.getAllHeaders()) {

Review Comment:
   Why needed?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -280,17 +282,18 @@ HttpResponse executeRequest() throws IOException {
   /**{@inheritDoc}*/
   @Override
   public void setRequestProperty(final String key, final String value) {
-    List<AbfsHttpHeader> headers = getRequestHeaders();
-    if (headers != null) {
-      headers.add(new AbfsHttpHeader(key, value));
+    if (httpRequestBase instanceof HttpEntityEnclosingRequestBase
+        && CONTENT_LENGTH.equals(key)) {
+      return;
     }
+    httpRequestBase.setHeader(key, value);
   }

   /**{@inheritDoc}*/
   @Override
   Map<String, List<String>> getRequestProperties() {
     Map<String, List<String>> map = new HashMap<>();
-    for (AbfsHttpHeader header : getRequestHeaders()) {
+    for (Header header : httpRequestBase.getAllHeaders()) {

Review Comment:
   Why this change? Why are we changing from the more specific implementation to a parent class? `AbfsHttpHeader` is supposed to be more specific and sufficient for all ABFS needs.
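   For reference, a sketch of how `getRequestProperties()` could keep iterating the ABFS-specific header list, as the comment suggests. `getRequestHeaders()` and the `AbfsHttpHeader` accessors appear in the diff; grouping repeated header names with `computeIfAbsent` is an assumption about the desired behavior:

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Inside AbfsAHCHttpOperation (sketch): build the property map from the
   // AbfsHttpHeader list instead of the Apache HttpClient Header array.
   @Override
   Map<String, List<String>> getRequestProperties() {
     Map<String, List<String>> map = new HashMap<>();
     for (AbfsHttpHeader header : getRequestHeaders()) {
       // collect repeated header names into a single value list
       map.computeIfAbsent(header.getName(), name -> new ArrayList<>())
           .add(header.getValue());
     }
     return map;
   }
   ```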
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -115,6 +180,26 @@ public HttpClientConnection get(final long timeout,
     public boolean cancel() {
       return false;
     }
+
+  /**
+   * Trigger a background warm-up of the connection cache if needed.
+   * This method checks if the cache size is small and if caching is not already in progress.
+   * If so, it starts a new thread to cache extra connections.
+   */
+  private void triggerConnectionWarmupIfNeeded() {
+    if (kac.size() <= 2 && !isCaching.get()) {

Review Comment:
   +1



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -89,23 +121,56 @@ public ConnectionRequest requestConnection(final HttpRoute route,
    */
   @Override
   public HttpClientConnection get(final long timeout,
-      final TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException,
-      ConnectionPoolTimeoutException {
+      final TimeUnit timeUnit) throws ExecutionException {
     String requestId = UUID.randomUUID().toString();
     logDebug("Connection requested for request {}", requestId);
+    if (!route.getTargetHost().equals(baseHost)) {
+      // If the route target host does not match the base host, create a new connection
+      logDebug("Route target host {} does not match base host {}, creating new connection",
+          route.getTargetHost(), baseHost);
+      return createNewConnection();
+    }
     try {
-      HttpClientConnection clientConn = kac.get();
-      if (clientConn != null) {
-        logDebug("Connection retrieved from KAC: {} for requestId: {}",
-            clientConn, requestId);
-        return clientConn;
+      HttpClientConnection conn = kac.get();
+
+      // If a valid connection is available, return it and trigger background warm-up if needed
+      if (conn != null) {
+        triggerConnectionWarmupIfNeeded();

Review Comment:
   Why is this not needed in a synchronized block?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -65,12 +69,40 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
    */
   private final HttpClientConnectionOperator connectionOperator;

+  /**
+   * Number of connections to be created during cache refresh.
+   */
+  private final int cacheRefreshConnections;
+
+  /**
+   * Connection timeout for establishing a new connection.
+   */
+  private final int connectionTimeout;
+
+  private final AtomicBoolean isCaching = new AtomicBoolean(false);
+
+  private final Object connectionLock = new Object();
+
+  private HttpHost baseHost;
+
   AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
-      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac) {
+      AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
+      final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
     this.httpConnectionFactory = connectionFactory;
+    this.connectionTimeout = abfsConfiguration.getHttpConnectionTimeout();
     this.kac = kac;
     this.connectionOperator = new DefaultHttpClientConnectionOperator(
         socketFactoryRegistry, null, null);
+    if (abfsConfiguration.getCacheWarmupConnections() > 0) {

Review Comment:
   Should we discuss that? Shouldn't the cache be shared among both the clients? We expect only one client at a time to be working, so they should share the cache IMO.
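   To make the sharing suggestion concrete, a purely illustrative sketch: one `KeepAliveCache` instance handed to the connection managers of both clients. The `AbfsConnectionManager` constructor shape follows the diff; constructing the cache from the `AbfsConfiguration` and the surrounding variable names are assumptions, not code from the patch:

   ```java
   // Hypothetical wiring (not from the patch): both clients reuse one cache.
   KeepAliveCache sharedKac = new KeepAliveCache(abfsConfiguration); // assumed ctor

   AbfsConnectionManager dfsConnMgr = new AbfsConnectionManager(
       socketFactoryRegistry, connectionFactory, sharedKac,
       abfsConfiguration, dfsBaseUrl);   // DFS endpoint URL
   AbfsConnectionManager blobConnMgr = new AbfsConnectionManager(
       socketFactoryRegistry, connectionFactory, sharedKac,
       abfsConfiguration, blobBaseUrl);  // Blob endpoint URL
   ```

   One thing to settle in that discussion: the diff's `baseHost` check is per manager, so a shared cache would also need connections to be safe to reuse across the DFS and Blob endpoints.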
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org