[ https://issues.apache.org/jira/browse/HADOOP-19120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17832124#comment-17832124 ]
ASF GitHub Bot commented on HADOOP-19120: ----------------------------------------- saxenapranav commented on code in PR #6633: URL: https://github.com/apache/hadoop/pull/6633#discussion_r1544300906 ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java: ########## @@ -0,0 +1,422 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception; +import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPatch; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.util.EntityUtils; + +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; +import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID; +import static org.apache.http.entity.ContentType.TEXT_PLAIN; + +/** + * Implementation of {@link HttpOperation} for orchestrating server calls using + * Apache Http Client. + */ +public class AbfsAHCHttpOperation extends HttpOperation { + + private static final Logger LOG = LoggerFactory.getLogger( + AbfsAHCHttpOperation.class); + + /** + * Map to store the AbfsApacheHttpClient. Each instance of AbfsClient to have + * a unique AbfsApacheHttpClient instance. The key of the map is the UUID of the client. + */ + private static final Map<String, AbfsApacheHttpClient> + ABFS_APACHE_HTTP_CLIENT_MAP = new HashMap<>(); + + private AbfsApacheHttpClient abfsApacheHttpClient; + + private HttpRequestBase httpRequestBase; + + private HttpResponse httpResponse; + + private AbfsManagedHttpContext abfsHttpClientContext; + + private final AbfsRestOperationType abfsRestOperationType; + + private boolean connectionDisconnectedOnError = false; + + private AbfsApacheHttpExpect100Exception abfsApacheHttpExpect100Exception; + + private final boolean isPayloadRequest; + + private List<AbfsHttpHeader> requestHeaders; + + private AbfsAHCHttpOperation(final URL url, + final String method, + final List<AbfsHttpHeader> requestHeaders, + final AbfsRestOperationType abfsRestOperationType) { + super(LOG, url, method); + this.abfsRestOperationType = abfsRestOperationType; + this.requestHeaders = requestHeaders; + this.isPayloadRequest = isPayloadRequest(method); + } + + public AbfsAHCHttpOperation(final URL url, + final String method, + final List<AbfsHttpHeader> requestHeaders, + final AbfsConfiguration abfsConfiguration, + final String clientId, + final AbfsRestOperationType abfsRestOperationType) { + super(LOG, url, method); + 
this.abfsRestOperationType = abfsRestOperationType; + this.requestHeaders = requestHeaders; + setAbfsApacheHttpClient(abfsConfiguration, clientId); + this.isPayloadRequest = isPayloadRequest(method); + } + + public AbfsAHCHttpOperation(final URL url, + final String method, + final ArrayList<AbfsHttpHeader> requestHeaders, + final int httpStatus) { + this(url, method, requestHeaders, null); + setStatusCode(httpStatus); + } + + private void setAbfsApacheHttpClient(final AbfsConfiguration abfsConfiguration, + final String clientId) { + AbfsApacheHttpClient client = ABFS_APACHE_HTTP_CLIENT_MAP.get(clientId); + if (client == null) { + synchronized (ABFS_APACHE_HTTP_CLIENT_MAP) { + client = ABFS_APACHE_HTTP_CLIENT_MAP.get(clientId); + if (client == null) { + client = new AbfsApacheHttpClient( + DelegatingSSLSocketFactory.getDefaultFactory(), + abfsConfiguration); + ABFS_APACHE_HTTP_CLIENT_MAP.put(clientId, client); + } + } + } + abfsApacheHttpClient = client; + } + + static void removeClient(final String clientId) throws IOException { + AbfsApacheHttpClient client = ABFS_APACHE_HTTP_CLIENT_MAP.remove(clientId); + if (client != null) { + client.close(); + } + } + + @VisibleForTesting + AbfsManagedHttpContext setFinalAbfsClientContext() { + return new AbfsManagedHttpContext(); + } + + private boolean isPayloadRequest(final String method) { + return HTTP_METHOD_PUT.equals(method) || HTTP_METHOD_PATCH.equals(method) + || HTTP_METHOD_POST.equals(method); + } + + + public static AbfsAHCHttpOperation getAbfsApacheHttpClientHttpOperationWithFixedResult( + final URL url, + final String method, + final int httpStatus) { + return new AbfsAHCHttpOperation(url, method, new ArrayList<>(), httpStatus); + } + + @Override + protected InputStream getErrorStream() throws IOException { + HttpEntity entity = httpResponse.getEntity(); + if (entity == null) { + return null; + } + return entity.getContent(); + } + + @Override + String getConnProperty(final String key) { + for (AbfsHttpHeader 
header : requestHeaders) { + if (header.getName().equals(key)) { + return header.getValue(); + } + } + return null; + } + + @Override + URL getConnUrl() { + return getUrl(); + } + + @Override + String getConnRequestMethod() { + return getMethod(); + } + + @Override + Integer getConnResponseCode() throws IOException { + return getStatusCode(); + } + + @Override + String getConnResponseMessage() throws IOException { + return getStatusDescription(); + } + + public void processResponse(final byte[] buffer, + final int offset, + final int length) throws IOException { + try { + if (!isPayloadRequest) { + prepareRequest(); + httpResponse = executeRequest(); + } + parseResponseHeaderAndBody(buffer, offset, length); + } finally { + if (httpResponse != null) { + EntityUtils.consume(httpResponse.getEntity()); + } + if (httpResponse != null + && httpResponse instanceof CloseableHttpResponse) { + ((CloseableHttpResponse) httpResponse).close(); + } + } + } + + @VisibleForTesting + void parseResponseHeaderAndBody(final byte[] buffer, + final int offset, + final int length) throws IOException { + setStatusCode(httpResponse.getStatusLine().getStatusCode()); + + setStatusDescription(httpResponse.getStatusLine().getReasonPhrase()); + + String requestId = getResponseHeader( + HttpHeaderConfigurations.X_MS_REQUEST_ID); + if (requestId == null) { + requestId = AbfsHttpConstants.EMPTY_STRING; + } + setRequestId(requestId); + + // dump the headers + AbfsIoUtils.dumpHeadersToDebugLog("Response Headers", + getResponseHeaders(httpResponse)); + parseResponse(buffer, offset, length); + } + + @VisibleForTesting + HttpResponse executeRequest() throws IOException { + abfsHttpClientContext = setFinalAbfsClientContext(); + HttpResponse response = abfsApacheHttpClient.execute(httpRequestBase, + abfsHttpClientContext); + setConnectionTimeMs(abfsHttpClientContext.getConnectTime()); + setSendRequestTimeMs(abfsHttpClientContext.getSendTime()); + 
setRecvResponseTimeMs(abfsHttpClientContext.getReadTime()); + return response; + } + + private Map<String, List<String>> getResponseHeaders(final HttpResponse httpResponse) { + if (httpResponse == null || httpResponse.getAllHeaders() == null) { + return new HashMap<>(); + } + Map<String, List<String>> map = new HashMap<>(); + for (Header header : httpResponse.getAllHeaders()) { + map.put(header.getName(), new ArrayList<String>( + Collections.singleton(header.getValue()))); + } + return map; + } + + @Override + public void setRequestProperty(final String key, final String value) { + setHeader(key, value); + } + + @Override + Map<String, List<String>> getRequestProperties() { + Map<String, List<String>> map = new HashMap<>(); + for (AbfsHttpHeader header : requestHeaders) { + map.put(header.getName(), + new ArrayList<String>() {{ + add(header.getValue()); + }}); + } + return map; + } + + @Override + public String getResponseHeader(final String headerName) { + if (httpResponse == null) { + return null; + } + Header header = httpResponse.getFirstHeader(headerName); + if (header != null) { + return header.getValue(); + } + return null; + } + + @Override + InputStream getContentInputStream() + throws IOException { + if (httpResponse == null) { + return null; + } + HttpEntity entity = httpResponse.getEntity(); + if (entity != null) { + return httpResponse.getEntity().getContent(); + } + return null; + } + + public void sendPayload(final byte[] buffer, + final int offset, + final int length) + throws IOException { + if (!isPayloadRequest) { + return; + } + + if (HTTP_METHOD_PUT.equals(getMethod())) { + httpRequestBase = new HttpPut(getUri()); Review Comment: taken. ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java: ########## @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.azurebfs.services.kac.KeepAliveCache; +import org.apache.http.HttpClientConnection; +import org.apache.http.config.Registry; +import org.apache.http.config.SocketConfig; +import org.apache.http.conn.ConnectionPoolTimeoutException; +import org.apache.http.conn.ConnectionRequest; +import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.conn.HttpClientConnectionOperator; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator; +import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory; +import org.apache.http.protocol.HttpContext; +import org.apache.http.util.Asserts; + +/** + * AbfsConnectionManager is a custom implementation of {@link HttpClientConnectionManager}. + * This implementation manages connection-pooling heuristics and custom implementation + * of {@link ManagedHttpClientConnectionFactory}. 
+ */ +public class AbfsConnectionManager implements HttpClientConnectionManager { + + private final KeepAliveCache kac = KeepAliveCache.getInstance(); + + private final AbfsConnFactory httpConnectionFactory; + + private final HttpClientConnectionOperator connectionOperator; + + public AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry, + AbfsConnFactory connectionFactory) { + this.httpConnectionFactory = connectionFactory; + connectionOperator = new DefaultHttpClientConnectionOperator( + socketFactoryRegistry, null, null); + } + + @Override + public ConnectionRequest requestConnection(final HttpRoute route, + final Object state) { + return new ConnectionRequest() { + @Override + public HttpClientConnection get(final long timeout, + final TimeUnit timeUnit) + throws InterruptedException, ExecutionException, + ConnectionPoolTimeoutException { + try { + HttpClientConnection client = kac.get(route); + if (client != null && client.isOpen()) { Review Comment: taken. > [ABFS]: ApacheHttpClient adaptation as network library > ------------------------------------------------------ > > Key: HADOOP-19120 > URL: https://issues.apache.org/jira/browse/HADOOP-19120 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Affects Versions: 3.5.0 > Reporter: Pranav Saxena > Assignee: Pranav Saxena > Priority: Major > Labels: pull-request-available > > Apache HttpClient is more feature-rich and flexible and gives application > more granular control over networking parameter. > ABFS currently relies on the JDK-net library. This library is managed by > OpenJDK and has no performance problem. However, it limits the application's > control over networking, and there are very few APIs and hooks exposed that > the application can use to get metrics, choose which and when a connection > should be reused. ApacheHttpClient will give important hooks to fetch > important metrics and control networking parameters. 
> A custom implementation of connection-pool is used. The implementation is > adapted from the JDK8 connection pooling. Reasons for doing it: > 1. PoolingHttpClientConnectionManager heuristic caches all the reusable > connections it has created. JDK's implementation only caches a limited number > of connections. The limit is given by the JVM system property > "http.maxConnections". If there is no system property, it defaults to 5. > Connection-establishment latency increased when all the connections were > cached. Hence, adapting the pooling heuristic of JDK netlib. > 2. In PoolingHttpClientConnectionManager, it expects the application to > provide `setMaxPerRoute` and `setMaxTotal`, which the implementation uses as > the total number of connections it can create. For applications using ABFS, it > is not feasible to provide a value in the initialisation of the > connectionManager. JDK's implementation has no cap on the number of > connections it can have open at a given moment. Hence, adapting the pooling > heuristic of JDK netlib. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org