[ 
https://issues.apache.org/jira/browse/HADOOP-19120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17832124#comment-17832124
 ] 

ASF GitHub Bot commented on HADOOP-19120:
-----------------------------------------

saxenapranav commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1544300906


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java:
##########
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsApacheHttpExpect100Exception;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID;
+import static org.apache.http.entity.ContentType.TEXT_PLAIN;
+
+/**
+ * Implementation of {@link HttpOperation} for orchestrating server calls using
+ * Apache Http Client.
+ */
+public class AbfsAHCHttpOperation extends HttpOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsAHCHttpOperation.class);
+
+  /**
+   * Map to store the AbfsApacheHttpClient. Each instance of AbfsClient is to
+   * have a unique AbfsApacheHttpClient instance. The key of the map is the
+   * UUID of the client.
+   * <p>
+   * The map is shared by every operation in the JVM and is guarded elsewhere
+   * with {@code synchronized (ABFS_APACHE_HTTP_CLIENT_MAP)}, which shows that
+   * concurrent access is expected. A plain {@link HashMap} is not safe for
+   * concurrent reads and writes, so it is wrapped with
+   * {@link Collections#synchronizedMap(Map)}. Every single get/put/remove is
+   * therefore thread-safe on its own. The wrapper's monitor is the wrapper
+   * object itself, so compound get-then-put sequences that synchronize on
+   * this field lock the same monitor.
+   */
+  private static final Map<String, AbfsApacheHttpClient>
+      ABFS_APACHE_HTTP_CLIENT_MAP = Collections.synchronizedMap(
+          new HashMap<>());
+
+  private AbfsApacheHttpClient abfsApacheHttpClient;
+
+  private HttpRequestBase httpRequestBase;
+
+  private HttpResponse httpResponse;
+
+  /** Context created per execution in executeRequest; source of timings. */
+  private AbfsManagedHttpContext abfsHttpClientContext;
+
+  private final AbfsRestOperationType abfsRestOperationType;
+
+  private boolean connectionDisconnectedOnError = false;
+
+  private AbfsApacheHttpExpect100Exception abfsApacheHttpExpect100Exception;
+
+  /** True for PUT, PATCH and POST (see isPayloadRequest(String)). */
+  private final boolean isPayloadRequest;
+
+  /** Headers supplied at construction; read by getConnProperty/getRequestProperties. */
+  private List<AbfsHttpHeader> requestHeaders;
+
+  /**
+   * Base constructor that records the request shape without binding an
+   * {@link AbfsApacheHttpClient}. The fixed-status constructor uses it.
+   *
+   * @param url request URL, forwarded to the superclass.
+   * @param method HTTP method, forwarded to the superclass.
+   * @param requestHeaders headers of the request.
+   * @param abfsRestOperationType type of REST operation; may be null.
+   */
+  private AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final AbfsRestOperationType abfsRestOperationType) {
+    super(LOG, url, method);
+    this.isPayloadRequest = isPayloadRequest(method);
+    this.requestHeaders = requestHeaders;
+    this.abfsRestOperationType = abfsRestOperationType;
+  }
+
+  /**
+   * Creates an operation bound to the shared {@link AbfsApacheHttpClient}
+   * registered for {@code clientId}, creating that client if needed.
+   *
+   * @param url request URL.
+   * @param method HTTP method.
+   * @param requestHeaders headers of the request.
+   * @param abfsConfiguration configuration used if a client must be created.
+   * @param clientId key of the shared client.
+   * @param abfsRestOperationType type of REST operation.
+   */
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final List<AbfsHttpHeader> requestHeaders,
+      final AbfsConfiguration abfsConfiguration,
+      final String clientId,
+      final AbfsRestOperationType abfsRestOperationType) {
+    super(LOG, url, method);
+    this.requestHeaders = requestHeaders;
+    this.abfsRestOperationType = abfsRestOperationType;
+    this.isPayloadRequest = isPayloadRequest(method);
+    setAbfsApacheHttpClient(abfsConfiguration, clientId);
+  }
+
+  /**
+   * Creates an operation whose status code is preset, without binding an
+   * {@link AbfsApacheHttpClient} and with no REST operation type.
+   *
+   * @param url request URL.
+   * @param method HTTP method.
+   * @param requestHeaders headers of the request.
+   * @param httpStatus status code to record on this operation.
+   */
+  public AbfsAHCHttpOperation(final URL url,
+      final String method,
+      final ArrayList<AbfsHttpHeader> requestHeaders,
+      final int httpStatus) {
+    this(url, method, requestHeaders, (AbfsRestOperationType) null);
+    setStatusCode(httpStatus);
+  }
+
+  /**
+   * Binds this operation to the {@link AbfsApacheHttpClient} registered for
+   * {@code clientId}, creating and registering one on first use.
+   * <p>
+   * Reading the map outside the lock (double-checked locking) is only safe
+   * with a thread-safe map and a safely published value. Holding the map's
+   * monitor for the whole get-or-create keeps this correct whatever the map
+   * implementation is, and creation still happens at most once per id.
+   *
+   * @param abfsConfiguration configuration used when a client is created.
+   * @param clientId key under which the shared client is registered.
+   */
+  private void setAbfsApacheHttpClient(
+      final AbfsConfiguration abfsConfiguration,
+      final String clientId) {
+    synchronized (ABFS_APACHE_HTTP_CLIENT_MAP) {
+      AbfsApacheHttpClient client = ABFS_APACHE_HTTP_CLIENT_MAP.get(clientId);
+      if (client == null) {
+        client = new AbfsApacheHttpClient(
+            DelegatingSSLSocketFactory.getDefaultFactory(),
+            abfsConfiguration);
+        ABFS_APACHE_HTTP_CLIENT_MAP.put(clientId, client);
+      }
+      abfsApacheHttpClient = client;
+    }
+  }
+
+  static void removeClient(final String clientId) throws IOException {
+    AbfsApacheHttpClient client = ABFS_APACHE_HTTP_CLIENT_MAP.remove(clientId);
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  @VisibleForTesting
+  AbfsManagedHttpContext setFinalAbfsClientContext() {
+    return new AbfsManagedHttpContext();
+  }
+
+  /**
+   * Tells whether {@code method} is treated as a payload request: PUT, PATCH
+   * or POST.
+   *
+   * @param method HTTP method name.
+   * @return true for PUT, PATCH or POST; false otherwise (including null).
+   */
+  private boolean isPayloadRequest(final String method) {
+    if (HTTP_METHOD_PUT.equals(method)) {
+      return true;
+    }
+    if (HTTP_METHOD_PATCH.equals(method)) {
+      return true;
+    }
+    return HTTP_METHOD_POST.equals(method);
+  }
+
+
+  /**
+   * Creates an operation with no request headers whose status code is preset
+   * to {@code httpStatus}.
+   *
+   * @param url request URL.
+   * @param method HTTP method.
+   * @param httpStatus status code to record.
+   * @return the new operation.
+   */
+  public static AbfsAHCHttpOperation
+      getAbfsApacheHttpClientHttpOperationWithFixedResult(
+      final URL url,
+      final String method,
+      final int httpStatus) {
+    final ArrayList<AbfsHttpHeader> noHeaders = new ArrayList<>();
+    return new AbfsAHCHttpOperation(url, method, noHeaders, httpStatus);
+  }
+
+  @Override
+  protected InputStream getErrorStream() throws IOException {
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity == null) {
+      return null;
+    }
+    return entity.getContent();
+  }
+
+  /**
+   * Looks up a request header by exact, case-sensitive name.
+   *
+   * @param key header name.
+   * @return value of the first request header named {@code key}, or
+   *     {@code null} if none matches.
+   */
+  @Override
+  String getConnProperty(final String key) {
+    AbfsHttpHeader match = null;
+    for (AbfsHttpHeader candidate : requestHeaders) {
+      if (candidate.getName().equals(key)) {
+        match = candidate;
+        break;
+      }
+    }
+    return match == null ? null : match.getValue();
+  }
+
+  /** @return the request URL, as returned by {@link #getUrl()}. */
+  @Override
+  URL getConnUrl() {
+    return this.getUrl();
+  }
+
+  /** @return the HTTP method, as returned by {@link #getMethod()}. */
+  @Override
+  String getConnRequestMethod() {
+    return this.getMethod();
+  }
+
+  /** @return the recorded status code, as returned by getStatusCode(). */
+  @Override
+  Integer getConnResponseCode() throws IOException {
+    return this.getStatusCode();
+  }
+
+  /** @return the recorded status description (reason phrase). */
+  @Override
+  String getConnResponseMessage() throws IOException {
+    return this.getStatusDescription();
+  }
+
+  /**
+   * Executes the request if needed, parses the response into this operation,
+   * and always releases the response afterwards.
+   * <p>
+   * For a non-payload request (not PUT/PATCH/POST) the request is prepared
+   * and executed here. For a payload request the existing
+   * {@code httpResponse} is parsed as-is; presumably sendPayload executed
+   * it. TODO confirm.
+   *
+   * @param buffer buffer forwarded to the response parser.
+   * @param offset offset into {@code buffer}, forwarded to the parser.
+   * @param length length argument forwarded to the parser.
+   * @throws IOException if executing, parsing, draining or closing fails.
+   */
+  public void processResponse(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    try {
+      if (!isPayloadRequest) {
+        prepareRequest();
+        httpResponse = executeRequest();
+      }
+      parseResponseHeaderAndBody(buffer, offset, length);
+    } finally {
+      if (httpResponse != null) {
+        try {
+          EntityUtils.consume(httpResponse.getEntity());
+        } finally {
+          // Close even when draining the entity throws; otherwise the
+          // response would never be closed.
+          if (httpResponse instanceof CloseableHttpResponse) {
+            ((CloseableHttpResponse) httpResponse).close();
+          }
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void parseResponseHeaderAndBody(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
+    setStatusCode(httpResponse.getStatusLine().getStatusCode());
+
+    setStatusDescription(httpResponse.getStatusLine().getReasonPhrase());
+
+    String requestId = getResponseHeader(
+        HttpHeaderConfigurations.X_MS_REQUEST_ID);
+    if (requestId == null) {
+      requestId = AbfsHttpConstants.EMPTY_STRING;
+    }
+    setRequestId(requestId);
+
+    // dump the headers
+    AbfsIoUtils.dumpHeadersToDebugLog("Response Headers",
+        getResponseHeaders(httpResponse));
+    parseResponse(buffer, offset, length);
+  }
+
+  @VisibleForTesting
+  HttpResponse executeRequest() throws IOException {
+    abfsHttpClientContext = setFinalAbfsClientContext();
+    HttpResponse response = abfsApacheHttpClient.execute(httpRequestBase,
+        abfsHttpClientContext);
+    setConnectionTimeMs(abfsHttpClientContext.getConnectTime());
+    setSendRequestTimeMs(abfsHttpClientContext.getSendTime());
+    setRecvResponseTimeMs(abfsHttpClientContext.getReadTime());
+    return response;
+  }
+
+  private Map<String, List<String>> getResponseHeaders(final HttpResponse 
httpResponse) {
+    if (httpResponse == null || httpResponse.getAllHeaders() == null) {
+      return new HashMap<>();
+    }
+    Map<String, List<String>> map = new HashMap<>();
+    for (Header header : httpResponse.getAllHeaders()) {
+      map.put(header.getName(), new ArrayList<String>(
+          Collections.singleton(header.getValue())));
+    }
+    return map;
+  }
+
+  /**
+   * Sets a request header by delegating to setHeader. Presumably setHeader
+   * applies it to the outgoing request; confirm against its definition.
+   *
+   * @param key header name.
+   * @param value header value.
+   */
+  @Override
+  public void setRequestProperty(final String key, final String value) {
+    this.setHeader(key, value);
+  }
+
+  @Override
+  Map<String, List<String>> getRequestProperties() {
+    Map<String, List<String>> map = new HashMap<>();
+    for (AbfsHttpHeader header : requestHeaders) {
+      map.put(header.getName(),
+          new ArrayList<String>() {{
+            add(header.getValue());
+          }});
+    }
+    return map;
+  }
+
+  /**
+   * Returns the value of the first response header with the given name.
+   *
+   * @param headerName header name.
+   * @return the header value, or {@code null} when there is no response yet
+   *     or the header is absent.
+   */
+  @Override
+  public String getResponseHeader(final String headerName) {
+    final Header first = httpResponse == null
+        ? null
+        : httpResponse.getFirstHeader(headerName);
+    return first == null ? null : first.getValue();
+  }
+
+  @Override
+  InputStream getContentInputStream()
+      throws IOException {
+    if (httpResponse == null) {
+      return null;
+    }
+    HttpEntity entity = httpResponse.getEntity();
+    if (entity != null) {
+      return httpResponse.getEntity().getContent();
+    }
+    return null;
+  }
+
+  public void sendPayload(final byte[] buffer,
+      final int offset,
+      final int length)
+      throws IOException {
+    if (!isPayloadRequest) {
+      return;
+    }
+
+    if (HTTP_METHOD_PUT.equals(getMethod())) {
+      httpRequestBase = new HttpPut(getUri());

Review Comment:
   taken.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java:
##########
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.azurebfs.services.kac.KeepAliveCache;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.config.Registry;
+import org.apache.http.config.SocketConfig;
+import org.apache.http.conn.ConnectionPoolTimeoutException;
+import org.apache.http.conn.ConnectionRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.HttpClientConnectionOperator;
+import org.apache.http.conn.routing.HttpRoute;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.impl.conn.DefaultHttpClientConnectionOperator;
+import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.Asserts;
+
+/**
+ * AbfsConnectionManager is a custom implementation of {@link 
HttpClientConnectionManager}.
+ * This implementation manages connection-pooling heuristics and custom 
implementation
+ * of {@link ManagedHttpClientConnectionFactory}.
+ */
+public class AbfsConnectionManager implements HttpClientConnectionManager {
+
+  /** Cache obtained from {@code KeepAliveCache.getInstance()}. */
+  private final KeepAliveCache kac = KeepAliveCache.getInstance();
+
+  /** Factory supplied by the caller at construction. */
+  private final AbfsConnFactory httpConnectionFactory;
+
+  /** Operator built over the caller's socket-factory registry. */
+  private final HttpClientConnectionOperator connectionOperator;
+
+  /**
+   * Creates a connection manager over the given socket factories.
+   * <p>
+   * The operator is created with {@code null} scheme-port and DNS resolvers.
+   * Per the HttpClient Javadoc it then falls back to its defaults; confirm
+   * for the HttpClient version in use.
+   *
+   * @param socketFactoryRegistry socket factories keyed by scheme.
+   * @param connectionFactory factory for connections.
+   */
+  public AbfsConnectionManager(
+      Registry<ConnectionSocketFactory> socketFactoryRegistry,
+      AbfsConnFactory connectionFactory) {
+    this.connectionOperator = new DefaultHttpClientConnectionOperator(
+        socketFactoryRegistry, null, null);
+    this.httpConnectionFactory = connectionFactory;
+  }
+
+  @Override
+  public ConnectionRequest requestConnection(final HttpRoute route,
+      final Object state) {
+    return new ConnectionRequest() {
+      @Override
+      public HttpClientConnection get(final long timeout,
+          final TimeUnit timeUnit)
+          throws InterruptedException, ExecutionException,
+          ConnectionPoolTimeoutException {
+        try {
+          HttpClientConnection client = kac.get(route);
+          if (client != null && client.isOpen()) {

Review Comment:
   taken.





> [ABFS]: ApacheHttpClient adaptation as network library
> ------------------------------------------------------
>
>                 Key: HADOOP-19120
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19120
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0
>            Reporter: Pranav Saxena
>            Assignee: Pranav Saxena
>            Priority: Major
>              Labels: pull-request-available
>
> Apache HttpClient is more feature-rich and flexible and gives applications 
> more granular control over networking parameters.
> ABFS currently relies on the JDK-net library. This library is managed by 
> OpenJDK and has no performance problem. However, it limits the application's 
> control over networking: very few APIs and hooks are exposed that the 
> application can use to get metrics or to choose which connection should be 
> reused and when. ApacheHttpClient provides hooks to fetch important metrics 
> and to control networking parameters.
> A custom implementation of the connection pool is used. The implementation is 
> adapted from the JDK 8 connection pooling. Reasons for doing it:
> 1. PoolingHttpClientConnectionManager's heuristic caches all the reusable 
> connections it has created, whereas JDK's implementation caches only a 
> limited number of connections. The limit is given by the JVM system property 
> "http.maxConnections"; if the system property is not set, it defaults to 5. 
> Connection-establishment latency increased when all the connections were 
> cached. Hence, the pooling heuristic of the JDK net library is adapted.
> 2. PoolingHttpClientConnectionManager expects the application to provide 
> `setMaxPerRoute` and `setMaxTotal`, which the implementation uses as the 
> maximum number of connections it can create. For an application using ABFS, 
> it is not feasible to provide these values when initialising the 
> connectionManager. JDK's implementation has no cap on the number of 
> connections that can be open at a given moment. Hence, the pooling heuristic 
> of the JDK net library is adapted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to