[
https://issues.apache.org/jira/browse/HADOOP-19120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855128#comment-17855128
]
ASF GitHub Bot commented on HADOOP-19120:
-----------------------------------------
steveloughran commented on code in PR #6633:
URL: https://github.com/apache/hadoop/pull/6633#discussion_r1640229114
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -48,296 +50,214 @@
* number of connections it can create.</li>
* </ol>
*/
-public final class KeepAliveCache
- extends HashMap<KeepAliveCache.KeepAliveKey, KeepAliveCache.ClientVector>
- implements Runnable {
+public final class KeepAliveCache extends Stack<KeepAliveCache.KeepAliveEntry>
+ implements
+ Closeable {
- private int maxConn;
+ /**
+ * Scheduled timer that evicts idle connections.
+ */
+ private final Timer timer;
- private long connectionIdleTTL = KAC_DEFAULT_CONN_TTL;
+ /**
+ * Task provided to the timer that owns eviction logic.
+ */
+ private final TimerTask timerTask;
- private Thread keepAliveTimer = null;
+ /**
+ * Flag to indicate if the cache is closed.
+ */
+ private boolean isClosed;
- private boolean isPaused = false;
+ /**
+ * Counter to keep track of the number of KeepAliveCache instances created.
+ */
+ private static final AtomicInteger KAC_COUNTER = new AtomicInteger(0);
- private KeepAliveCache() {
- setMaxConn();
- }
+ /**
+ * Maximum number of connections that can be cached.
+ */
+ private final int maxConn;
+
+ /**
+ * Time-to-live for an idle connection.
+ */
+ private final long connectionIdleTTL;
+
+ /**
+ * Flag to indicate if the eviction thread is paused.
+ */
+ private boolean isPaused = false;
+ @VisibleForTesting
synchronized void pauseThread() {
isPaused = true;
}
+ @VisibleForTesting
synchronized void resumeThread() {
isPaused = false;
- notify();
}
- private void setMaxConn() {
+ /**
+ * @return connectionIdleTTL
Review Comment:
nit: add a `.` at the end to keep javadoc happy
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -48,296 +50,214 @@
+ /**
+ * @return connectionIdleTTL
+ */
+ @VisibleForTesting
+ public long getConnectionIdleTTL() {
+ return connectionIdleTTL;
+ }
+
+ public KeepAliveCache(AbfsConfiguration abfsConfiguration) {
Review Comment:
add javadoc to this constructor
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -48,296 +50,214 @@
+ public KeepAliveCache(AbfsConfiguration abfsConfiguration) {
+ this.timer = new Timer(
+ String.format("abfs-kac-" + KAC_COUNTER.getAndIncrement()), true);
String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
if (sysPropMaxConn == null) {
- maxConn = DEFAULT_MAX_CONN_SYS_PROP;
+ this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
Review Comment:
I'd prefer the abfs config to always override the sysprop if set, because
it's a lot easier to update the cluster-wide site configuration than process
launch parameters.
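A minimal sketch of that precedence, assuming the caller can tell whether the ABFS key was explicitly set (for example, Hadoop's `Configuration.get(key)` returns null for an unset key). `MaxConnResolver` and `resolveMaxConn` are hypothetical names, not part of the PR; the system property name and the fallback default are passed in by the caller.

```java
import java.util.Properties;

/**
 * Hypothetical helper: resolve the connection-cache size so that a value from
 * the cluster-wide site configuration wins over a JVM system property.
 */
final class MaxConnResolver {

  private MaxConnResolver() {
  }

  /**
   * @param configuredValue value explicitly set in the site configuration,
   *                        or null if the key is unset
   * @param sysProps        system properties consulted as a fallback
   * @param sysPropName     name of the fallback system property
   * @param defaultValue    used when neither source supplies a value
   * @return the maximum number of connections to cache.
   */
  static int resolveMaxConn(Integer configuredValue, Properties sysProps,
      String sysPropName, int defaultValue) {
    if (configuredValue != null) {
      return configuredValue;                       // site config takes precedence
    }
    String fromSysProp = sysProps.getProperty(sysPropName);
    if (fromSysProp != null) {
      return Integer.parseInt(fromSysProp.trim());  // fall back to the -D setting
    }
    return defaultValue;                            // neither source was set
  }
}
```

In the cache constructor this would be called with `System.getProperties()` and `HTTP_MAX_CONN_SYS_PROP`, so the system property only matters when the site configuration is silent.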
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Stack;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+
+/**
+ * Connection-pooling heuristics used by {@link AbfsConnectionManager}. Each
+ * instance of FileSystem has its own KeepAliveCache.
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache extends Stack<KeepAliveCache.KeepAliveEntry>
+ implements
+ Closeable {
+
+ /**
+ * Scheduled timer that evicts idle connections.
+ */
+ private final Timer timer;
+
+ /**
+ * Task provided to the timer that owns eviction logic.
+ */
+ private final TimerTask timerTask;
+
+ /**
+ * Flag to indicate if the cache is closed.
+ */
+ private boolean isClosed;
+
+ /**
+ * Counter to keep track of the number of KeepAliveCache instances created.
+ */
+ private static final AtomicInteger KAC_COUNTER = new AtomicInteger(0);
+
+ /**
+ * Maximum number of connections that can be cached.
+ */
+ private final int maxConn;
+
+ /**
+ * Time-to-live for an idle connection.
+ */
+ private final long connectionIdleTTL;
+
+ /**
+ * Flag to indicate if the eviction thread is paused.
+ */
+ private boolean isPaused = false;
+
+ @VisibleForTesting
+ synchronized void pauseThread() {
+ isPaused = true;
+ }
+
+ @VisibleForTesting
+ synchronized void resumeThread() {
+ isPaused = false;
+ }
+
+ /**
+ * @return connectionIdleTTL
+ */
+ @VisibleForTesting
+ public long getConnectionIdleTTL() {
+ return connectionIdleTTL;
+ }
+
+ public KeepAliveCache(AbfsConfiguration abfsConfiguration) {
+ this.timer = new Timer(
+ String.format("abfs-kac-" + KAC_COUNTER.getAndIncrement()), true);
+ String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+ if (sysPropMaxConn == null) {
+ this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+ } else {
+ maxConn = Integer.parseInt(sysPropMaxConn);
+ }
+
+ this.connectionIdleTTL
+ = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+ this.timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ if (isPaused) {
+ return;
+ }
+ evictIdleConnection();
+ }
+ };
+ timer.schedule(timerTask, 0, connectionIdleTTL);
+ }
+
+ /**
+ * Iterate over the cache and evict the idle connections. An idle connection is
+ * one that has been in the cache for more than connectionIdleTTL milliseconds.
+ */
+ synchronized void evictIdleConnection() {
+ long currentTime = System.currentTimeMillis();
+ int i;
+ for (i = 0; i < size(); i++) {
+ KeepAliveEntry e = elementAt(i);
+ if ((currentTime - e.idleStartTime) > connectionIdleTTL
+ || e.httpClientConnection.isStale()) {
+ HttpClientConnection hc = e.httpClientConnection;
+ closeHtpClientConnection(hc);
+ } else {
+ break;
+ }
+ }
+ subList(0, i).clear();
+ }
+
+ /**
+ * Safe close of the HttpClientConnection.
+ *
+ * @param hc HttpClientConnection to be closed
+ */
+ private void closeHtpClientConnection(final HttpClientConnection hc) {
+ try {
+ hc.close();
+ } catch (IOException ignored) {
+
+ }
+ }
+
+ /**
+ * Close all connections in cache and cancel the eviction timer.
+ */
+ @Override
+ public synchronized void close() {
+ isClosed = true;
+ timerTask.cancel();
+ timer.purge();
+ while (!empty()) {
+ KeepAliveEntry e = pop();
+ closeHtpClientConnection(e.httpClientConnection);
+ }
+ }
+
+ /**
+ * Gets the latest added HttpClientConnection from the cache. The returned connection
+ * is non-stale and has been in the cache for less than connectionIdleTTL milliseconds.
+ *
+ * The cache is checked from the top of the stack. If the connection is stale or has been
+ * in the cache for more than connectionIdleTTL milliseconds, it is closed and the next
+ * connection is checked. Once a valid connection is found, it is returned.
+ *
+ * @return HttpClientConnection: if a valid connection is found, else null.
+ * @throws IOException if the cache is closed.
+ */
+ public synchronized HttpClientConnection get()
+ throws IOException {
+ if (isClosed) {
+ throw new IOException("KeepAliveCache is closed");
+ }
+ if (empty()) {
+ return null;
+ }
+ HttpClientConnection hc = null;
+ long currentTime = System.currentTimeMillis();
+ do {
+ KeepAliveEntry e = pop();
+ if ((currentTime - e.idleStartTime) > connectionIdleTTL
+ || e.httpClientConnection.isStale()) {
+ closeHtpClientConnection(e.httpClientConnection);
+ } else {
+ hc = e.httpClientConnection;
+ }
+ } while ((hc == null) && (!empty()));
+ return hc;
+ }
+
+ /**
+ * Puts the HttpClientConnection in the cache. If the size of cache is equal to
+ * maxConn, the give HttpClientConnection is closed and not added in cache.
+ *
+ * @param httpClientConnection HttpClientConnection to be cached
+ */
+ public synchronized void put(HttpClientConnection httpClientConnection) {
+ if (isClosed) {
+ return;
+ }
+ if (size() >= maxConn) {
+ closeHtpClientConnection(httpClientConnection);
+ return;
+ }
+ KeepAliveEntry entry = new KeepAliveEntry(httpClientConnection,
+ System.currentTimeMillis());
+ push(entry);
+ }
+
+ @Override
+ public synchronized boolean equals(final Object o) {
+ if (o instanceof KeepAliveCache) {
+ KeepAliveCache inst = (KeepAliveCache) o;
+ for (int i = 0; i < size(); i++) {
+ if (!elementAt(i).equals((inst.elementAt(i)))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized int hashCode() {
+ return super.hashCode();
+ }
+
+ /**
+ * Entry data-structure in the cache.
+ */
+ static class KeepAliveEntry {
+
+ /**HttpClientConnection in the cache entry.*/
+ private final HttpClientConnection httpClientConnection;
+
+ /**Time at which the HttpClientConnection was added to the cache.*/
+ private final long idleStartTime;
+
+ KeepAliveEntry(HttpClientConnection hc, long idleStartTime) {
+ this.httpClientConnection = hc;
+ this.idleStartTime = idleStartTime;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (o instanceof KeepAliveEntry) {
+ return httpClientConnection.equals(
+ ((KeepAliveEntry) o).httpClientConnection);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return httpClientConnection.hashCode();
+ }
+ }
+
+ // Methods to prevent serialization of the KeepAliveCache.
Review Comment:
cut these, not needed
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ @Override
+ public synchronized boolean equals(final Object o) {
+ if (o instanceof KeepAliveCache) {
+ KeepAliveCache inst = (KeepAliveCache) o;
+ for (int i = 0; i < size(); i++) {
+ if (!elementAt(i).equals((inst.elementAt(i)))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public synchronized int hashCode() {
Review Comment:
cut
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ /**
+ * Flag to indicate if the eviction thread is paused.
+ */
+ private boolean isPaused = false;
Review Comment:
Either make this an `AtomicBoolean` or declare it `volatile`. I'd prefer the
`AtomicBoolean`.
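The concern is visibility: `pauseThread()` and `resumeThread()` write `isPaused` while holding the cache's monitor, but the timer thread reads it in `TimerTask.run()` before taking that lock, so the Java memory model does not guarantee it sees the update. A minimal sketch with an `AtomicBoolean`; `PausableEvictor` is a hypothetical stand-in for the cache, and its `evict()` only counts invocations in place of `evictIdleConnection()`.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical stand-in for the cache's eviction timer. The pause flag is an
 * AtomicBoolean, so writes from any thread are visible to the timer thread.
 */
final class PausableEvictor {
  private final AtomicBoolean isPaused = new AtomicBoolean(false);
  private final AtomicInteger evictionRuns = new AtomicInteger();
  private final Timer timer = new Timer("evictor-sketch", true); // daemon thread

  PausableEvictor(long periodMillis) {
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        if (isPaused.get()) {
          return;                 // skip this tick while paused
        }
        evict();
      }
    }, 0, periodMillis);
  }

  void pauseThread() {
    isPaused.set(true);           // no synchronized needed for visibility
  }

  void resumeThread() {
    isPaused.set(false);
  }

  boolean paused() {
    return isPaused.get();
  }

  /** Placeholder for evictIdleConnection(): counts invocations only. */
  void evict() {
    evictionRuns.incrementAndGet();
  }

  int getEvictionRuns() {
    return evictionRuns.get();
  }

  /** Cancel the timer, which also lets its background thread finish. */
  void close() {
    timer.cancel();
  }
}
```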
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ /**
+ * Puts the HttpClientConnection in the cache. If the size of cache is equal to
+ * maxConn, the give HttpClientConnection is closed and not added in cache.
+ *
+ * @param httpClientConnection HttpClientConnection to be cached
+ */
+ public synchronized void put(HttpClientConnection httpClientConnection) {
Review Comment:
make this boolean "entry was cached" so tests can check its behaviour
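A minimal sketch of what a boolean-returning `put()` could look like, under stated assumptions: `Conn` is a hypothetical stand-in for `org.apache.http.HttpClientConnection` (only `close()` is modelled), and `BoundedConnCache` is a hypothetical reduction of `KeepAliveCache`, not the PR's code. It keeps the quoted behaviour (a closed cache returns without touching the connection; a full cache closes it) and only adds the "entry was cached" return value.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for HttpClientConnection: only close() is modelled. */
interface Conn extends Closeable {
  boolean isClosed();
}

/** Hypothetical connection used only to observe close() calls. */
class FakeConn implements Conn {
  private boolean closed;

  @Override
  public void close() {
    closed = true;
  }

  @Override
  public boolean isClosed() {
    return closed;
  }
}

/** Illustrative cache whose put() reports whether the entry was cached. */
class BoundedConnCache implements Closeable {
  private final Deque<Conn> stack = new ArrayDeque<>();
  private final int maxConn;
  private boolean isClosed;

  BoundedConnCache(int maxConn) {
    this.maxConn = maxConn;
  }

  /**
   * Puts the connection in the cache.
   * <p>
   * If the cache is closed, nothing is cached and false is returned.
   * If the cache already holds maxConn entries, the connection is closed
   * and false is returned.
   *
   * @param conn connection to cache
   * @return true if the entry was cached
   */
  synchronized boolean put(Conn conn) {
    if (isClosed) {
      return false; // same as the quoted code: the connection is left as-is
    }
    if (stack.size() >= maxConn) {
      closeQuietly(conn);
      return false;
    }
    stack.push(conn);
    return true;
  }

  synchronized int size() {
    return stack.size();
  }

  @Override
  public synchronized void close() {
    isClosed = true;
    while (!stack.isEmpty()) {
      closeQuietly(stack.pop());
    }
  }

  private static void closeQuietly(Conn conn) {
    try {
      conn.close();
    } catch (IOException ignored) {
      // best-effort close
    }
  }
}
```

A test can then assert on `put()` directly, for example that a second `put()` on a cache with `maxConn` of 1 returns false and closes the rejected connection.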
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Stack;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+
+/**
+ * Connection-pooling heuristics used by {@link AbfsConnectionManager}. Each
+ * instance of FileSystem has its own KeepAliveCache.
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache extends Stack<KeepAliveCache.KeepAliveEntry>
+ implements
+ Closeable {
+
+ /**
+ * Scheduled timer that evicts idle connections.
+ */
+ private final Timer timer;
+
+ /**
+ * Task provided to the timer that owns eviction logic.
+ */
+ private final TimerTask timerTask;
+
+ /**
+ * Flag to indicate if the cache is closed.
+ */
+ private boolean isClosed;
+
+ /**
+ * Counter to keep track of the number of KeepAliveCache instances created.
+ */
+ private static final AtomicInteger KAC_COUNTER = new AtomicInteger(0);
+
+ /**
+ * Maximum number of connections that can be cached.
+ */
+ private final int maxConn;
+
+ /**
+ * Time-to-live for an idle connection.
+ */
+ private final long connectionIdleTTL;
+
+ /**
+ * Flag to indicate if the eviction thread is paused.
+ */
+ private boolean isPaused = false;
+
+ @VisibleForTesting
+ synchronized void pauseThread() {
+ isPaused = true;
+ }
+
+ @VisibleForTesting
+ synchronized void resumeThread() {
+ isPaused = false;
+ }
+
+ /**
+ * @return connectionIdleTTL
+ */
+ @VisibleForTesting
+ public long getConnectionIdleTTL() {
+ return connectionIdleTTL;
+ }
+
+ public KeepAliveCache(AbfsConfiguration abfsConfiguration) {
+ this.timer = new Timer(
+ String.format("abfs-kac-" + KAC_COUNTER.getAndIncrement()), true);
+ String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+ if (sysPropMaxConn == null) {
+ this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+ } else {
+ maxConn = Integer.parseInt(sysPropMaxConn);
+ }
+
+ this.connectionIdleTTL
+ = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+ this.timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ if (isPaused) {
+ return;
+ }
+ evictIdleConnection();
+ }
+ };
+ timer.schedule(timerTask, 0, connectionIdleTTL);
+ }
+
+ /**
+ * Iterate over the cache and evict the idle connections. An idle connection is
+ * one that has been in the cache for more than connectionIdleTTL milliseconds.
+ */
+ synchronized void evictIdleConnection() {
+ long currentTime = System.currentTimeMillis();
+ int i;
+ for (i = 0; i < size(); i++) {
+ KeepAliveEntry e = elementAt(i);
+ if ((currentTime - e.idleStartTime) > connectionIdleTTL
+ || e.httpClientConnection.isStale()) {
+ HttpClientConnection hc = e.httpClientConnection;
+ closeHtpClientConnection(hc);
+ } else {
+ break;
+ }
+ }
+ subList(0, i).clear();
+ }
+
+ /**
+ * Safe close of the HttpClientConnection.
+ *
+ * @param hc HttpClientConnection to be closed
+ */
+ private void closeHtpClientConnection(final HttpClientConnection hc) {
+ try {
+ hc.close();
+ } catch (IOException ignored) {
+
Review Comment:
log at debug, in case it is important
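One way to follow this suggestion, sketched under assumptions: Hadoop code normally logs through SLF4J (`LOG.debug("...", e)`), but to keep the example dependency-free it uses `java.util.logging` at `Level.FINE`, the closest JDK level to debug. `QuietCloser` and `closeLoggingAtDebug` are hypothetical names, and the boolean return value exists only to make the behaviour testable.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: close a resource, logging (not rethrowing) failures at debug level. */
final class QuietCloser {
  private static final Logger LOG = Logger.getLogger(QuietCloser.class.getName());

  private QuietCloser() {
  }

  /**
   * Closes the resource. An IOException is logged at FINE (debug) level
   * instead of being silently discarded.
   *
   * @param c resource to close
   * @return true if close() completed without an exception
   */
  static boolean closeLoggingAtDebug(Closeable c) {
    try {
      c.close();
      return true;
    } catch (IOException e) {
      // FINE is java.util.logging's closest equivalent of SLF4J debug.
      LOG.log(Level.FINE, "Failed to close connection " + c, e);
      return false;
    }
  }
}
```

The failure is still swallowed, so callers behave exactly as before. The only change is that the exception shows up when debug logging is enabled.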
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ this.timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ if (isPaused) {
Review Comment:
or is closed
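A sketch of the eviction task with the reviewer's extra guard, under assumptions: `GuardedEvictor`, `evictIfActive()` and the `evictionRuns` counter are hypothetical, and the counter exists only to make the behaviour observable. Unlike the quoted code, which reads `isPaused` in `run()` outside the lock, this sketch checks both flags inside a synchronized method. That is a design choice, not something the PR does. With this arrangement, a run that reaches the check after `close()` has returned does nothing.

```java
import java.util.TimerTask;

/** Hypothetical reduction of KeepAliveCache's eviction scheduling. */
class GuardedEvictor implements AutoCloseable {
  private boolean isPaused;
  private boolean isClosed;
  private int evictionRuns; // illustrative counter, only for observing behaviour

  /** Task handed to the Timer; it delegates to the guarded method. */
  final TimerTask timerTask = new TimerTask() {
    @Override
    public void run() {
      evictIfActive();
    }
  };

  /** Skips the run when the cache is paused or closed. */
  synchronized void evictIfActive() {
    if (isPaused || isClosed) {
      return;
    }
    evictionRuns++;
    // ... evict idle or stale entries here ...
  }

  synchronized void pause() {
    isPaused = true;
  }

  synchronized void resume() {
    isPaused = false;
  }

  synchronized int getEvictionRuns() {
    return evictionRuns;
  }

  @Override
  public synchronized void close() {
    isClosed = true;
    timerTask.cancel(); // no further scheduled runs
  }
}
```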
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ /**
+ * Puts the HttpClientConnection in the cache. If the size of cache is equal to
+ * maxConn, the give HttpClientConnection is closed and not added in cache.
+ *
Review Comment:
add that if the pool is closed, it will return "false"
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ /**
+ * Puts the HttpClientConnection in the cache. If the size of cache is equal to
+ * maxConn, the give HttpClientConnection is closed and not added in cache.
+ *
+ * @param httpClientConnection HttpClientConnection to be cached
+ */
+ public synchronized void put(HttpClientConnection httpClientConnection) {
+ if (isClosed) {
+ return;
+ }
+ if (size() >= maxConn) {
+ closeHtpClientConnection(httpClientConnection);
+ return;
+ }
+ KeepAliveEntry entry = new KeepAliveEntry(httpClientConnection,
+ System.currentTimeMillis());
+ push(entry);
+ }
+
+ @Override
+ public synchronized boolean equals(final Object o) {
Review Comment:
let's cut this, I don't see any need for it
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ /**
+ * Gets the latest added HttpClientConnection from the cache. The returned connection
+ * is non-stale and has been in the cache for less than connectionIdleTTL milliseconds.
+ *
Review Comment:
add <p> elements between paragraphs
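For illustration, here is a `get()`-style javadoc with `<p>` between paragraphs. Javadoc renders the comment as HTML, so a blank line on its own does not start a new paragraph. The javadoc is attached to a simplified, hypothetical stand-in for the method: `Entry`, `LifoIdleCache` and the explicit `now` parameter are assumptions made so the example is deterministic. Staleness is reduced to an age check here, whereas the quoted code also calls `HttpClientConnection.isStale()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a cached connection and its idle start time. */
final class Entry {
  final String conn;        // stand-in for HttpClientConnection
  final long idleStartTime; // millis when the entry was cached
  boolean closed;           // set when the entry is evicted

  Entry(String conn, long idleStartTime) {
    this.conn = conn;
    this.idleStartTime = idleStartTime;
  }
}

/** Illustrative LIFO cache showing the get() contract. */
final class LifoIdleCache {
  private final Deque<Entry> stack = new ArrayDeque<>();
  private final long connectionIdleTTL;

  LifoIdleCache(long connectionIdleTTL) {
    this.connectionIdleTTL = connectionIdleTTL;
  }

  synchronized void put(Entry e) {
    stack.push(e);
  }

  /**
   * Gets the most recently added connection from the cache. The returned
   * connection has been in the cache for no more than connectionIdleTTL
   * milliseconds.
   * <p>
   * The cache is checked from the top of the stack. If a connection has been
   * in the cache for more than connectionIdleTTL milliseconds, it is closed
   * and the next connection is checked.
   * <p>
   * Once a valid connection is found, it is returned.
   *
   * @param now current time in milliseconds (a parameter only in this sketch)
   * @return a valid connection, or null if none is found.
   */
  synchronized String get(long now) {
    while (!stack.isEmpty()) {
      Entry e = stack.pop();
      if (now - e.idleStartTime > connectionIdleTTL) {
        e.closed = true; // stand-in for closing the expired connection
      } else {
        return e.conn;
      }
    }
    return null;
  }

  synchronized int size() {
    return stack.size();
  }
}
```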
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Stack;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.http.HttpClientConnection;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_MAX_CONN_SYS_PROP;
+
+/**
+ * Connection-pooling heuristics used by {@link AbfsConnectionManager}. Each
+ * instance of FileSystem has its own KeepAliveCache.
+ * <p>
+ * Why this implementation is required in comparison to {@link org.apache.http.impl.conn.PoolingHttpClientConnectionManager}
+ * connection-pooling:
+ * <ol>
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
+ * property "http.maxConnections". If there is no system-property, it defaults to 5.</li>
+ * <li>In PoolingHttpClientConnectionManager, it expects the application to provide `setMaxPerRoute` and `setMaxTotal`,
+ * which the implementation uses as the total number of connections it can create. For application using ABFS, it is not
+ * feasible to provide a value in the initialisation of the connectionManager. JDK's implementation has no cap on the
+ * number of connections it can create.</li>
+ * </ol>
+ */
+public final class KeepAliveCache extends Stack<KeepAliveCache.KeepAliveEntry>
+ implements
+ Closeable {
+
+ /**
+ * Scheduled timer that evicts idle connections.
+ */
+ private final Timer timer;
+
+ /**
+ * Task provided to the timer that owns eviction logic.
+ */
+ private final TimerTask timerTask;
+
+ /**
+ * Flag to indicate if the cache is closed.
+ */
+ private boolean isClosed;
+
+ /**
+ * Counter to keep track of the number of KeepAliveCache instances created.
+ */
+ private static final AtomicInteger KAC_COUNTER = new AtomicInteger(0);
+
+ /**
+ * Maximum number of connections that can be cached.
+ */
+ private final int maxConn;
+
+ /**
+ * Time-to-live for an idle connection.
+ */
+ private final long connectionIdleTTL;
+
+ /**
+ * Flag to indicate if the eviction thread is paused.
+ */
+ private boolean isPaused = false;
+
+ @VisibleForTesting
+ synchronized void pauseThread() {
+ isPaused = true;
+ }
+
+ @VisibleForTesting
+ synchronized void resumeThread() {
+ isPaused = false;
+ }
+
+ /**
+ * @return connectionIdleTTL
+ */
+ @VisibleForTesting
+ public long getConnectionIdleTTL() {
+ return connectionIdleTTL;
+ }
+
+ public KeepAliveCache(AbfsConfiguration abfsConfiguration) {
+ this.timer = new Timer(
+ String.format("abfs-kac-" + KAC_COUNTER.getAndIncrement()), true);
+ String sysPropMaxConn = System.getProperty(HTTP_MAX_CONN_SYS_PROP);
+ if (sysPropMaxConn == null) {
+ this.maxConn = abfsConfiguration.getMaxApacheHttpClientCacheConnections();
+ } else {
+ maxConn = Integer.parseInt(sysPropMaxConn);
+ }
+
+ this.connectionIdleTTL
+ = abfsConfiguration.getMaxApacheHttpClientConnectionIdleTime();
+ this.timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ if (isPaused) {
+ return;
+ }
+ evictIdleConnection();
+ }
+ };
+ timer.schedule(timerTask, 0, connectionIdleTTL);
+ }
+
+ /**
+ * Iterate over the cache and evict the idle connections. An idle connection is
+ * one that has been in the cache for more than connectionIdleTTL milliseconds.
+ */
+ synchronized void evictIdleConnection() {
+ long currentTime = System.currentTimeMillis();
+ int i;
+ for (i = 0; i < size(); i++) {
+ KeepAliveEntry e = elementAt(i);
+ if ((currentTime - e.idleStartTime) > connectionIdleTTL
+ || e.httpClientConnection.isStale()) {
+ HttpClientConnection hc = e.httpClientConnection;
+ closeHtpClientConnection(hc);
+ } else {
+ break;
+ }
+ }
+ subList(0, i).clear();
+ }
+
+ /**
+ * Safe close of the HttpClientConnection.
+ *
+ * @param hc HttpClientConnection to be closed
+ */
+ private void closeHtpClientConnection(final HttpClientConnection hc) {
Review Comment:
spelling: closeHtpClientConnection should be closeHttpClientConnection
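The eviction loop in the hunk above walks the stack from index 0, which holds the oldest pushes. It closes every expired entry it meets, stops at the first fresh one, and then drops the whole expired prefix with a single `subList(0, i).clear()`. The sketch below is minimal and self-contained. The `Entry` type is a hypothetical stand-in for `KeepAliveEntry`. The caller passes the current time in, instead of the code calling `System.currentTimeMillis()`, so the result is deterministic. The original's `isStale()` check is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

/** Hypothetical sketch of prefix eviction on a time-ordered stack. */
class EvictionSketch {
  /** Stand-in for KeepAliveEntry: a name plus the time it became idle. */
  static final class Entry {
    final String name;
    final long idleStartTime;

    Entry(String name, long idleStartTime) {
      this.name = name;
      this.idleStartTime = idleStartTime;
    }
  }

  private final Stack<Entry> stack = new Stack<>();
  private final long ttlMillis;
  /** Names of entries "closed" by eviction, recorded for inspection. */
  final List<String> closed = new ArrayList<>();

  EvictionSketch(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  synchronized void push(String name, long now) {
    stack.push(new Entry(name, now));
  }

  synchronized int size() {
    return stack.size();
  }

  synchronized String peekTop() {
    return stack.isEmpty() ? null : stack.peek().name;
  }

  /**
   * Entries are pushed in time order, so index 0 is the oldest. Scan upwards,
   * "closing" expired entries, and stop at the first entry still within TTL.
   */
  synchronized void evictIdle(long now) {
    int i;
    for (i = 0; i < stack.size(); i++) {
      Entry e = stack.elementAt(i);
      if (now - e.idleStartTime > ttlMillis) {
        closed.add(e.name);
      } else {
        break;
      }
    }
    // Remove the whole expired prefix in one call.
    stack.subList(0, i).clear();
  }
}
```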
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ * <li>PoolingHttpClientConnectionManager heuristic caches all the reusable connections it has created.
+ * JDK's implementation only caches limited number of connections. The limit is given by JVM system
Review Comment:
"a limited number"
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ /**
+ * Safe close of the HttpClientConnection.
+ *
+ * @param hc HttpClientConnection to be closed
+ */
+ private void closeHtpClientConnection(final HttpClientConnection hc) {
+ try {
+ hc.close();
+ } catch (IOException ignored) {
+
+ }
+ }
+
+ /**
+ * Close all connections in cache and cancel the eviction timer.
+ */
+ @Override
+ public synchronized void close() {
+ isClosed = true;
Review Comment:
if isClosed is already set, don't re-enter: check it and return fast there
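Below is a minimal sketch of the early return the reviewer asks for. It keeps the PR's `synchronized close()` and its plain `boolean` flag. The class name and the `closeCount` field are hypothetical and exist only so that the single cleanup can be observed. The cleanup steps mirror the PR's `timerTask.cancel()` and `timer.purge()`.

```java
import java.io.Closeable;
import java.util.Timer;
import java.util.TimerTask;

/** Hypothetical sketch: a close() that is safe to call more than once. */
class ReentrantSafeClose implements Closeable {
  private final Timer timer = new Timer("sketch-timer", true);
  private final TimerTask timerTask = new TimerTask() {
    @Override
    public void run() {
      // eviction work would go here
    }
  };
  private boolean isClosed;
  /** Counts how many times the cleanup body actually ran. */
  int closeCount;

  ReentrantSafeClose() {
    timer.schedule(timerTask, 0, 1000);
  }

  @Override
  public synchronized void close() {
    if (isClosed) {
      return; // already closed: skip the cleanup entirely
    }
    isClosed = true;
    closeCount++;
    timerTask.cancel();
    timer.purge();
    // cached connections would be popped and closed here
  }
}
```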
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ /**
+ * Flag to indicate if the cache is closed.
+ */
+ private boolean isClosed;
Review Comment:
maybe make this a final AtomicBoolean
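Suppose `isClosed` becomes a `final AtomicBoolean`, as suggested. Then `compareAndSet(false, true)` tests and sets the flag in one atomic step, so only the first caller runs the cleanup, even without holding the object lock. The sketch below is hypothetical, and its class and counter names are illustrative only.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of an idempotent close using an atomic flag. */
class AtomicCloseSketch implements Closeable {
  private final AtomicBoolean isClosed = new AtomicBoolean(false);
  /** Counts how many callers actually performed the cleanup. */
  final AtomicInteger cleanups = new AtomicInteger();

  @Override
  public void close() {
    // Only the first caller flips false -> true; everyone else returns at once.
    if (!isClosed.compareAndSet(false, true)) {
      return;
    }
    cleanups.incrementAndGet();
    // release cached connections, cancel the timer, etc.
  }

  boolean isClosed() {
    return isClosed.get();
  }
}
```

Other methods would then read the flag with `isClosed.get()`. The atomic flag matters most for code paths that check it without holding the lock.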
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ /**
+ * Gets the latest added HttpClientConnection from the cache. The returned connection
+ * is non-stale and has been in the cache for less than connectionIdleTTL milliseconds.
+ *
+ * The cache is checked from the top of the stack. If the connection is stale or has been
+ * in the cache for more than connectionIdleTTL milliseconds, it is closed and the next
+ * connection is checked. Once a valid connection is found, it is returned.
+ *
+ * @return HttpClientConnection: if a valid connection is found, else null.
+ * @throws IOException if the cache is closed.
+ */
+ public synchronized HttpClientConnection get()
+ throws IOException {
+ if (isClosed) {
+ throw new IOException("KeepAliveCache is closed");
Review Comment:
make this a constant string, and add a test to intercept() the exception/text when closed
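The sketch below assumes a constant named `KEEP_ALIVE_CACHE_CLOSED`, which is a hypothetical name. In Hadoop tests this check would normally use `LambdaTestUtils.intercept()`, which takes the expected exception class, a substring of its message, and the call to run. To keep the block self-contained, a small hypothetical `interceptSketch` helper stands in for it here.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical sketch: a shared message constant and an intercept-style check. */
class ClosedCacheSketch {
  /** Hypothetical constant shared by the code and its test. */
  static final String KEEP_ALIVE_CACHE_CLOSED = "KeepAliveCache is closed";

  private boolean isClosed;

  synchronized void close() {
    isClosed = true;
  }

  synchronized Object get() throws IOException {
    if (isClosed) {
      throw new IOException(KEEP_ALIVE_CACHE_CLOSED);
    }
    return null; // would return a cached connection
  }

  /**
   * Minimal stand-in for LambdaTestUtils.intercept(): runs the call and
   * returns the exception if it has the expected type and message text.
   */
  static <E extends Throwable> E interceptSketch(
      Class<E> clazz, String contained, Callable<?> eval) {
    try {
      eval.call();
    } catch (Throwable t) {
      if (clazz.isInstance(t) && t.getMessage() != null
          && t.getMessage().contains(contained)) {
        return clazz.cast(t);
      }
      throw new AssertionError("Unexpected exception: " + t, t);
    }
    throw new AssertionError("Expected " + clazz.getName() + " but the call succeeded");
  }
}
```

If the code and the test use the same constant, the assertion cannot drift out of sync when the message text is edited.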
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeepAliveCache.java:
##########
@@ -0,0 +1,291 @@
+ public KeepAliveCache(AbfsConfiguration abfsConfiguration) {
+ this.timer = new Timer(
+ String.format("abfs-kac-" + KAC_COUNTER.getAndIncrement()), true);
Review Comment:
no formatting is taking place here. Review this string and maybe cut
String.format out entirely
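Below is a minimal sketch of the timer construction without `String.format`, using the `Timer(String name, boolean isDaemon)` constructor the PR already calls. The class name and the `threadNameOf` helper are hypothetical. The helper only exists to show that the timer thread carries the concatenated name.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of naming the eviction timer without a formatter. */
class TimerNameSketch {
  private static final AtomicInteger KAC_COUNTER = new AtomicInteger(0);

  /** Plain concatenation; there are no format specifiers to process. */
  static Timer newEvictionTimer() {
    return new Timer("abfs-kac-" + KAC_COUNTER.getAndIncrement(), true);
  }

  /** Runs one task on the timer and reports the name of the thread that ran it. */
  static String threadNameOf(Timer timer) throws InterruptedException {
    final String[] name = new String[1];
    final CountDownLatch done = new CountDownLatch(1);
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        name[0] = Thread.currentThread().getName();
        done.countDown();
      }
    }, 0);
    // The latch gives a happens-before edge, so name[0] is visible here.
    done.await(5, TimeUnit.SECONDS);
    return name[0];
  }
}
```

If a format string were actually wanted, it would be `String.format("abfs-kac-%d", n)`. For a single integer suffix, concatenation does the same job without a formatter.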
> [ABFS]: ApacheHttpClient adaptation as network library
> ------------------------------------------------------
>
> Key: HADOOP-19120
> URL: https://issues.apache.org/jira/browse/HADOOP-19120
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.5.0
> Reporter: Pranav Saxena
> Assignee: Pranav Saxena
> Priority: Major
> Labels: pull-request-available
>
> Apache HttpClient is more feature-rich and flexible, and gives the application
> more granular control over networking parameters.
> ABFS currently relies on the JDK-net library. This library is managed by
> OpenJDK and has no performance problem. However, it limits the application's
> control over networking, and it exposes very few APIs and hooks that the
> application can use to get metrics or to choose which connection should be
> reused and when. ApacheHttpClient provides hooks to fetch important metrics
> and control networking parameters.
> A custom implementation of the connection pool is used. The implementation is
> adapted from the JDK8 connection pooling. Reasons for doing it:
> 1. The PoolingHttpClientConnectionManager heuristic caches all the reusable
> connections it has created. JDK's implementation only caches a limited number
> of connections. The limit is given by the JVM system property
> "http.maxConnections". If there is no system property, it defaults to 5.
> Connection-establishment latency increased when all the connections were
> cached. Hence, the pooling heuristic of JDK netlib is adapted.
> 2. PoolingHttpClientConnectionManager expects the application to provide
> `setMaxPerRoute` and `setMaxTotal`, which the implementation uses as the
> total number of connections it can create. For applications using ABFS, it
> is not feasible to provide a value in the initialisation of the
> connectionManager. JDK's implementation has no cap on the number of
> connections it can have open at any moment. Hence, the pooling heuristic of
> JDK netlib is adapted.
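The two reasons above can be illustrated with a minimal, hypothetical model of a JDK-style keep-alive cache. The cache size is capped by the `http.maxConnections` system property, with a default of 5, while the number of connections a caller may open is not capped at all. This is a simplified sketch written for illustration, not the JDK's or the PR's code. `Conn` is a hypothetical connection interface. The policy of closing a returned connection when the cache is already full is an assumption made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a bounded, LIFO keep-alive connection cache. */
class BoundedKeepAliveSketch {
  /** Hypothetical connection abstraction. */
  interface Conn {
    void close();
  }

  private static final class Entry {
    final Conn conn;
    final long idleSince;

    Entry(Conn conn, long idleSince) {
      this.conn = conn;
      this.idleSince = idleSince;
    }
  }

  private final Deque<Entry> stack = new ArrayDeque<>();
  private final int maxConn;
  private final long ttlMillis;

  BoundedKeepAliveSketch(long ttlMillis) {
    // Cache size comes from the JVM system property, defaulting to 5.
    this.maxConn = Integer.getInteger("http.maxConnections", 5);
    this.ttlMillis = ttlMillis;
  }

  /** Returns a connection to the cache; closes it if the cache is full (assumed policy). */
  synchronized void put(Conn c, long now) {
    if (stack.size() >= maxConn) {
      c.close();
    } else {
      stack.push(new Entry(c, now));
    }
  }

  /** Pops the most recently returned connection that has not exceeded the TTL. */
  synchronized Conn get(long now) {
    Entry e;
    while ((e = stack.poll()) != null) {
      if (now - e.idleSince <= ttlMillis) {
        return e.conn;
      }
      e.conn.close(); // expired: discard it and keep looking
    }
    return null; // caller opens a new connection; nothing caps how many it opens
  }

  synchronized int size() {
    return stack.size();
  }
}
```

A LIFO stack hands out the most recently used connection, which is probably the one least likely to have been dropped by the server. Older entries sink toward the bottom, where periodic eviction can remove them.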
--
This message was sent by Atlassian Jira
(v8.20.10#820010)