PHOENIX-3611 The cache of client connections now evicts (and closes) entries in LRU fashion once a configurable maximum size or idle duration is exceeded.
Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d75458fe Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d75458fe Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d75458fe Branch: refs/heads/4.x-HBase-1.1 Commit: d75458fee60fc16661b9394a9e93824d8c3c0363 Parents: 59e5115 Author: Geoffrey <gjac...@salesforce.com> Authored: Thu Jan 19 16:08:20 2017 -0800 Committer: Andrew Purtell <apurt...@apache.org> Committed: Fri Jan 20 16:17:19 2017 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/jdbc/PhoenixDriver.java | 99 +++++++++++++------- .../org/apache/phoenix/query/QueryServices.java | 5 +- .../phoenix/query/QueryServicesOptions.java | 2 + 3 files changed, 70 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d75458fe/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java index fa31dd9..b2acacf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java @@ -23,20 +23,13 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import 
java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.concurrent.GuardedBy; +import com.google.common.cache.*; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -50,7 +43,6 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -147,13 +139,43 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { } // One entry per cluster here - private final ConcurrentMap<ConnectionInfo,ConnectionQueryServices> connectionQueryServicesMap = new ConcurrentHashMap<ConnectionInfo,ConnectionQueryServices>(3); + private final Cache<ConnectionInfo, ConnectionQueryServices> connectionQueryServicesCache = + initializeConnectionCache(); public PhoenixDriver() { // for Squirrel // Use production services implementation super(); } - + + private Cache<ConnectionInfo, ConnectionQueryServices> initializeConnectionCache() { + Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); + int maxCacheSize = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_SIZE, + QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE); + int maxCacheDuration = config.getInt(QueryServices.CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS, + QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION); + RemovalListener<ConnectionInfo, ConnectionQueryServices> cacheRemovalListener = + new RemovalListener<ConnectionInfo, ConnectionQueryServices>() { + @Override + public void onRemoval(RemovalNotification<ConnectionInfo, ConnectionQueryServices> notification) { + String connInfoIdentifier = notification.getKey().toString(); + 
logger.debug("Expiring " + connInfoIdentifier + " because of " + + notification.getCause().name()); + + try { + notification.getValue().close(); + } + catch (SQLException se) { + logger.error("Error while closing expired cache connection " + connInfoIdentifier, se); + } + } + }; + return CacheBuilder.newBuilder() + .maximumSize(maxCacheSize) + .expireAfterAccess(maxCacheDuration, TimeUnit.MILLISECONDS) + .removalListener(cacheRemovalListener) + .build(); + } + // writes guarded by "this" private volatile QueryServices services; @@ -206,38 +228,49 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { } @Override - protected ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException { + protected ConnectionQueryServices getConnectionQueryServices(String url, final Properties info) throws SQLException { try { lockInterruptibly(LockMode.READ); checkClosed(); ConnectionInfo connInfo = ConnectionInfo.create(url); - QueryServices services = getQueryServices(); - // Also performs the Kerberos login if the URL/properties request this - ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info); - ConnectionQueryServices connectionQueryServices = connectionQueryServicesMap.get(normalizedConnInfo); - if (connectionQueryServices == null) { - if (normalizedConnInfo.isConnectionless()) { - connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info); - } else { - connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info); - } - ConnectionQueryServices prevValue = connectionQueryServicesMap.putIfAbsent(normalizedConnInfo, connectionQueryServices); - if (prevValue != null) { - connectionQueryServices = prevValue; - } - } - boolean success = false; SQLException sqlE = null; + boolean success = false; + final QueryServices services = getQueryServices(); + ConnectionQueryServices connectionQueryServices = null; + // Also performs the 
Kerberos login if the URL/properties request this + final ConnectionInfo normalizedConnInfo = connInfo.normalize(services.getProps(), info); try { + connectionQueryServices = + connectionQueryServicesCache.get(normalizedConnInfo, new Callable<ConnectionQueryServices>() { + @Override + public ConnectionQueryServices call() throws Exception { + ConnectionQueryServices connectionQueryServices; + if (normalizedConnInfo.isConnectionless()) { + connectionQueryServices = new ConnectionlessQueryServicesImpl(services, normalizedConnInfo, info); + } else { + connectionQueryServices = new ConnectionQueryServicesImpl(services, normalizedConnInfo, info); + } + + return connectionQueryServices; + } + }); + connectionQueryServices.init(url, info); success = true; - } catch (SQLException e) { + } catch (ExecutionException ee){ + if (ee.getCause() instanceof SQLException) { + sqlE = (SQLException) ee.getCause(); + } else { + throw new SQLException(ee); + } + } + catch (SQLException e) { sqlE = e; } finally { if (!success) { // Remove from map, as initialization failed - connectionQueryServicesMap.remove(normalizedConnInfo); + connectionQueryServicesCache.invalidate(normalizedConnInfo); if (sqlE != null) { throw sqlE; } @@ -319,8 +352,4 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver { } } - @VisibleForTesting - protected ConcurrentMap<ConnectionInfo,ConnectionQueryServices> getCachedConnections() { - return this.connectionQueryServicesMap; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d75458fe/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 14d9887..233007f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -233,7 +233,10 @@ public interface QueryServices extends SQLCloseable { public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding"; public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled"; - + + public static final String CLIENT_CONNECTION_CACHE_MAX_SIZE = "phoenix.client.connection.cache.max.size"; + public static final String CLIENT_CONNECTION_CACHE_MAX_DURATION_MILLISECONDS = + "phoenix.client.connection.max.duration"; /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/d75458fe/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 0e8b9d5..df203b5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -262,6 +262,8 @@ public class QueryServicesOptions { public static final String DEFAULT_CLIENT_CACHE_ENCODING = PTableRefFactory.Encoding.OBJECT.toString(); public static final boolean DEFAULT_AUTO_UPGRADE_ENABLED = true; + public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_SIZE = 100; + public static final int DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION = 86400000; @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {