[ 
https://issues.apache.org/jira/browse/HADOOP-19120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854392#comment-17854392
 ] 

ASF GitHub Bot commented on HADOOP-19120:
-----------------------------------------

saxenapranav commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1636235723


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+import org.apache.http.conn.routing.HttpRoute;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_MAX_CONN_SYS_PROP;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.KAC_DEFAULT_CONN_TTL;
+
+/**
+ * Connection-pooling heuristics adapted from JDK's connection pooling 
`KeepAliveCache`
+ * <p>
+ * Why this implementation is required in comparison to {@link 
org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable 
connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit 
is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults 
to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to 
provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can 
create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. 
JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache
+    extends HashMap<KeepAliveCache.KeepAliveKey, KeepAliveCache.ClientVector>
+    implements Runnable {
+
+  private int maxConn;
+
+  private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+
+  private Thread keepAliveTimer = null;
+
+  private boolean isPaused = false;
+
+  private KeepAliveCache() {
+    setMaxConn();
+  }
+
+  synchronized void pauseThread() {
+    isPaused = true;
+  }
+
+  synchronized void resumeThread() {
+    isPaused = false;
+    notify();
+  }
+
+  private void setMaxConn() {
+    String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+    if (sysPropMaxConn == null) {
+      maxConn = DEFAULT_MAX_CONN_SYS_PROP;
+    } else {
+      maxConn = Integer.parseInt(sysPropMaxConn);
+    }
+  }
+
+  public void setAbfsConfig(AbfsConfiguration abfsConfiguration) {
+    this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+    this.connectionIdleTTL = 
abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+  }
+
+  public long getConnectionIdleTTL() {
+    return connectionIdleTTL;
+  }
+
+  private static final KeepAliveCache INSTANCE = new KeepAliveCache();
+
+  public static KeepAliveCache getInstance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  void clearThread() {
+    clear();
+    setMaxConn();
+  }
+
+  private int getKacSize() {
+    return INSTANCE.maxConn;
+  }
+
+  @Override
+  public void run() {
+    do {
+      synchronized (this) {
+        while (isPaused) {
+          try {
+            wait();
+          } catch (InterruptedException ignored) {
+          }
+        }
+      }
+      kacCleanup();
+    } while (size() > 0);
+  }
+
+  private void kacCleanup() {
+    try {
+      Thread.sleep(connectionIdleTTL);
+    } catch (InterruptedException ex) {
+      return;
+    }
+    synchronized (this) {
+      long currentTime = System.currentTimeMillis();
+
+      ArrayList<KeepAliveKey> keysToRemove
+          = new ArrayList<KeepAliveKey>();
+
+      for (Map.Entry<KeepAliveKey, ClientVector> entry : entrySet()) {
+        KeepAliveKey key = entry.getKey();
+        ClientVector v = entry.getValue();
+        synchronized (v) {
+          int i;
+
+          for (i = 0; i < v.size(); i++) {
+            KeepAliveEntry e = v.elementAt(i);
+            if ((currentTime - e.idleStartTime) > v.nap
+                || e.httpClientConnection.isStale()) {
+              HttpClientConnection hc = e.httpClientConnection;
+              closeHtpClientConnection(hc);
+            } else {
+              break;
+            }
+          }
+          v.subList(0, i).clear();
+
+          if (v.size() == 0) {
+            keysToRemove.add(key);
+          }
+        }
+      }
+
+      for (KeepAliveKey key : keysToRemove) {
+        removeVector(key);
+      }
+    }
+  }
+
+  synchronized void removeVector(KeepAliveKey k) {
+    super.remove(k);
+  }
+
+  public synchronized void put(final HttpRoute httpRoute,
+      final HttpClientConnection httpClientConnection) {
+    boolean startThread = (keepAliveTimer == null);
+    if (!startThread) {
+      if (!keepAliveTimer.isAlive()) {
+        startThread = true;
+      }
+    }
+    if (startThread) {
+      clear();
+      final KeepAliveCache cache = this;
+      ThreadGroup grp = Thread.currentThread().getThreadGroup();
+      ThreadGroup parent = null;
+      while ((parent = grp.getParent()) != null) {
+        grp = parent;
+      }
+
+      keepAliveTimer = new Thread(grp, cache, "Keep-Alive-Timer");

Review Comment:
   Have now differentiated from jdk's implementation.





> [ABFS]: ApacheHttpClient adaptation as network library
> ------------------------------------------------------
>
>                 Key: HADOOP-19120
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19120
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0
>            Reporter: Pranav Saxena
>            Assignee: Pranav Saxena
>            Priority: Major
>              Labels: pull-request-available
>
> Apache HttpClient is more feature-rich and flexible, and gives applications
> more granular control over networking parameters.
> ABFS currently relies on the JDK-net library. This library is managed by
> OpenJDK and has no performance problems. However, it limits the application's
> control over networking: very few APIs and hooks are exposed that the
> application can use to get metrics or to choose which connection should be
> reused and when. ApacheHttpClient will provide hooks to fetch important
> metrics and to control networking parameters.
> A custom implementation of a connection pool is used. The implementation is
> adapted from the JDK8 connection pooling. Reasons for doing so:
> 1. PoolingHttpClientConnectionManager heuristic caches all the reusable
> connections it has created. JDK's implementation caches only a limited number
> of connections. The limit is given by the JVM system property
> "http.maxConnections". If there is no system property, it defaults to 5.
> Connection-establishment latency increased when all the connections were
> cached. Hence, the pooling heuristic of the JDK netlib is adapted.
> 2. PoolingHttpClientConnectionManager expects the application to
> provide `setMaxPerRoute` and `setMaxTotal`, which the implementation uses as
> the total number of connections it can create. For an application using ABFS,
> it is not feasible to provide a value in the initialisation of the
> connectionManager. JDK's implementation has no cap on the number of
> connections it can have open at any moment. Hence, the pooling heuristic of
> the JDK netlib is adapted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to