SENTRY-1580: Provide pooled client connection model with HA (Alex Kolbasov, reviewed by Vamsee Yarlagadda, Kalyan Kalvagadda) SENTRY-1800: Flaky testConcurrentUpdateChanges test SENTRY-1724: Remove old PoolClientInvocationHandler SENTRY-1739: Sentry Kafka tests do not stop periodic update after the test end SENTRY-1735: Sentry Clients should not log every connection request SENTRY-1738: Inefficient connection management by retrying invocation handler SENTRY-1737: SentryTransportFactory may use incorrect kerberos principal
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/5dc6b285 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/5dc6b285 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/5dc6b285 Branch: refs/heads/sentry-ha-redesign Commit: 5dc6b2855e6d3d7f0c2b7b7acaa0a6c179644632 Parents: 2f70aac Author: Alexander Kolbasov <[email protected]> Authored: Fri Jun 9 23:47:31 2017 -0700 Committer: Alexander Kolbasov <[email protected]> Committed: Fri Jun 9 23:47:31 2017 -0700 ---------------------------------------------------------------------- pom.xml | 2 +- .../binding/hive/authz/SentryConfigTool.java | 28 +- .../SentryMetastorePostEventListenerBase.java | 20 +- .../hive/ql/exec/SentryGrantRevokeTask.java | 13 +- .../SentryMetastorePostEventListener.java | 17 +- .../sentry/kafka/binding/KafkaAuthBinding.java | 8 +- .../binding/solr/authz/SolrAuthzBinding.java | 8 +- .../sentry/sqoop/binding/SqoopAuthBinding.java | 8 +- sentry-core/sentry-core-common/pom.xml | 4 + .../transport/RetryClientInvocationHandler.java | 117 ++++--- .../SentryClientTransportConfigInterface.java | 91 ++++-- .../SentryClientTransportConstants.java | 91 ++++-- .../core/common/transport/SentryConnection.java | 54 ++++ .../SentryHDFSClientTransportConfig.java | 71 +++-- .../SentryPolicyClientTransportConfig.java | 67 ++-- .../common/transport/SentryServiceClient.java | 43 --- .../transport/SentryTransportFactory.java | 311 ++++++------------- .../common/transport/SentryTransportPool.java | 306 ++++++++++++++++++ .../common/transport/TTransportWrapper.java | 89 ++++++ .../core/common/transport/TransportFactory.java | 39 +++ .../UserGroupInformationInitializer.java | 53 ---- .../sentry/core/common/utils/ThriftUtil.java | 10 +- sentry-hdfs/sentry-hdfs-dist/pom.xml | 3 +- .../org/apache/sentry/hdfs/SentryUpdater.java | 6 +- .../sentry/hdfs/SentryHDFSServiceClient.java | 14 +- .../SentryHDFSServiceClientDefaultImpl.java | 107 +++---- 
.../hdfs/SentryHDFSServiceClientFactory.java | 83 ++++- .../hdfs/SentryHdfsServiceIntegrationBase.java | 5 +- .../hdfs/TestSentryHDFSServiceClientForUgi.java | 70 ----- .../provider/db/SimpleDBProviderBackend.java | 9 +- .../generic/SentryGenericProviderBackend.java | 17 +- .../provider/db/generic/UpdatableCache.java | 55 +++- .../thrift/SentryGenericServiceClient.java | 4 +- .../SentryGenericServiceClientDefaultImpl.java | 108 ++++--- .../SentryGenericServiceClientFactory.java | 89 +++++- .../db/generic/tools/SentryConfigToolSolr.java | 12 +- .../db/generic/tools/SentryShellKafka.java | 62 ++-- .../db/generic/tools/SentryShellSolr.java | 62 ++-- .../thrift/SentryPolicyServiceClient.java | 4 +- .../SentryPolicyServiceClientDefaultImpl.java | 220 +++++++------ .../provider/db/tools/SentryShellHive.java | 57 ++-- .../thrift/PoolClientInvocationHandler.java | 294 ------------------ .../sentry/service/thrift/SentryService.java | 5 +- .../thrift/SentryServiceClientFactory.java | 95 ++++-- .../thrift/SentryServiceClientPoolFactory.java | 72 ----- .../sentry/service/thrift/ServiceConstants.java | 2 +- .../TestSentryGenericServiceClientForUgi.java | 68 ---- .../db/service/persistent/TestSentryStore.java | 8 +- .../TestSentryPolicyServiceClientForUgi.java | 71 ----- .../TestSentryServiceWithInvalidMsgSize.java | 2 + .../thrift/TestPoolClientInvocationHandler.java | 79 ----- .../tests/e2e/hdfs/TestHDFSIntegrationBase.java | 7 + .../e2e/kafka/AbstractKafkaSentryTestBase.java | 21 +- .../sentry/tests/e2e/kafka/TestAuthorize.java | 6 + .../e2e/sqoop/AbstractSqoopSentryTestBase.java | 11 +- 55 files changed, 1567 insertions(+), 1611 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ad54cfd..ba13a0d 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ limitations under the 
License. <build.helper.maven.plugin.version>1.8</build.helper.maven.plugin.version> <cglib.version>2.2</cglib.version> <commons-cli.version>1.2</commons-cli.version> - <commons-pool2.version>2.2</commons-pool2.version> + <commons-pool2.version>2.4.2</commons-pool2.version> <commons.lang.version>2.6</commons.lang.version> <commons.logging.version>1.2</commons.logging.version> <curator.version>2.11.1</curator.version> http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java index a3140f2..8a5085b 100644 --- a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java +++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/hive/authz/SentryConfigTool.java @@ -291,23 +291,27 @@ public class SentryConfigTool { Map<String, Map<String, Set<String>>> policyFileMappingData = sentryPolicyFileFormatter.parse( importPolicyFilePath, authzConf); // todo: here should be an validator to check the data's value, format, hierarchy - SentryPolicyServiceClient client = SentryServiceClientFactory.create(getAuthzConf()); - // import the mapping data to database - client.importPolicy(policyFileMappingData, requestorUserName, importOverwriteRole); + try(SentryPolicyServiceClient client = + SentryServiceClientFactory.create(getAuthzConf())) { + // import the mapping data to database + client.importPolicy(policyFileMappingData, requestorUserName, importOverwriteRole); + } } // export the sentry mapping data to file public void exportPolicy() throws Exception { String 
requestorUserName = System.getProperty("user.name", ""); - SentryPolicyServiceClient client = SentryServiceClientFactory.create(getAuthzConf()); - // export the sentry mapping data from database to map structure - Map<String, Map<String, Set<String>>> policyFileMappingData = client - .exportPolicy(requestorUserName, objectPath); - // get the FileFormatter according to the configuration - SentryPolicyFileFormatter sentryPolicyFileFormatter = SentryPolicyFileFormatFactory - .createFileFormatter(authzConf); - // write the sentry mapping data to exportPolicyFilePath with the data in map structure - sentryPolicyFileFormatter.write(exportPolicyFilePath, policyFileMappingData); + try (SentryPolicyServiceClient client = + SentryServiceClientFactory.create(getAuthzConf())) { + // export the sentry mapping data from database to map structure + Map<String, Map<String, Set<String>>> policyFileMappingData = client + .exportPolicy(requestorUserName, objectPath); + // get the FileFormatter according to the configuration + SentryPolicyFileFormatter sentryPolicyFileFormatter = SentryPolicyFileFormatFactory + .createFileFormatter(authzConf); + // write the sentry mapping data to exportPolicyFilePath with the data in map structure + sentryPolicyFileFormatter.write(exportPolicyFilePath, policyFileMappingData); + } } // list permissions for given user http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java index 262db11..2abdd53 100644 --- 
a/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java +++ b/sentry-binding/sentry-binding-hive-common/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListenerBase.java @@ -356,11 +356,11 @@ public class SentryMetastorePostEventListenerBase extends MetaStoreEventListener throws SentryUserException, IOException, MetaException { String requestorUserName = UserGroupInformation.getCurrentUser() .getShortUserName(); - SentryPolicyServiceClient sentryClient = getSentryServiceClient(); - sentryClient.dropPrivileges(requestorUserName, authorizableTable); - - // Close the connection after dropping privileges is done. - sentryClient.close(); + try(SentryPolicyServiceClient sentryClient = getSentryServiceClient()) { + sentryClient.dropPrivileges(requestorUserName, authorizableTable); + } catch (Exception e) { + e.printStackTrace(); + } } private void renameSentryTablePrivilege(String oldDbName, String oldTabName, @@ -379,9 +379,7 @@ public class SentryMetastorePostEventListenerBase extends MetaStoreEventListener if (!oldTabName.equalsIgnoreCase(newTabName) && syncWithPolicyStore(AuthzConfVars.AUTHZ_SYNC_ALTER_WITH_POLICY_STORE)) { - SentryPolicyServiceClient sentryClient = getSentryServiceClient(); - - try { + try (SentryPolicyServiceClient sentryClient = getSentryServiceClient()){ String requestorUserName = UserGroupInformation.getCurrentUser() .getShortUserName(); sentryClient.renamePrivileges(requestorUserName, oldAuthorizableTable, newAuthorizableTable); @@ -392,10 +390,8 @@ public class SentryMetastorePostEventListenerBase extends MetaStoreEventListener + " Error: " + e.getMessage()); } catch (IOException e) { throw new MetaException("Failed to find local user " + e.getMessage()); - } finally { - - // Close the connection after renaming privileges is done. 
- sentryClient.close(); + } catch (Exception e) { + e.printStackTrace(); } } // The HDFS plugin needs to know if it's a path change (set location) http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java index da587d0..3f06cae 100644 --- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java +++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java @@ -98,7 +98,6 @@ public class SentryGrantRevokeTask extends Task<DDLWork> implements Serializable private static final int terminator = Utilities.newLineCode; private static final long serialVersionUID = -7625118066790571999L; - private SentryPolicyServiceClient sentryClient; private HiveConf conf; private HiveAuthzBinding hiveAuthzBinding; private HiveAuthzConf authzConf; @@ -116,13 +115,8 @@ public class SentryGrantRevokeTask extends Task<DDLWork> implements Serializable @Override public int execute(DriverContext driverContext) { - try { - try { - this.sentryClient = SentryServiceClientFactory.create(authzConf); - } catch (Exception e) { - String msg = "Error creating Sentry client: " + e.getMessage(); - throw new RuntimeException(msg, e); - } + try (SentryPolicyServiceClient sentryClient = + SentryServiceClientFactory.create(authzConf)) { Preconditions.checkNotNull(hiveAuthzBinding, "HiveAuthzBinding cannot be null"); Preconditions.checkNotNull(authzConf, "HiveAuthConf cannot be null"); Preconditions.checkNotNull(subject, "Subject cannot be null"); @@ -179,9 +173,6 @@ public class SentryGrantRevokeTask 
extends Task<DDLWork> implements Serializable console.printError(msg); return RETURN_CODE_FAILURE; } finally { - if (sentryClient != null) { - sentryClient.close(); - } if (hiveAuthzBinding != null) { hiveAuthzBinding.close(); } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java index fdb6df4..3ec2eed 100644 --- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java +++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetastorePostEventListener.java @@ -364,11 +364,12 @@ public class SentryMetastorePostEventListener extends MetaStoreEventListener { throws SentryUserException, IOException, MetaException { String requestorUserName = UserGroupInformation.getCurrentUser() .getShortUserName(); - SentryPolicyServiceClient sentryClient = getSentryServiceClient(); - sentryClient.dropPrivileges(requestorUserName, authorizableTable); - - // Close the connection after dropping privileges is done. 
- sentryClient.close(); + try (SentryPolicyServiceClient sentryClient = SentryServiceClientFactory.create(authzConf)) { + sentryClient.dropPrivileges(requestorUserName, authorizableTable); + } catch (Exception e) { + throw new MetaException("Failed to connect to Sentry service " + + e.getMessage()); + } } private void renameSentryTablePrivilege(String oldDbName, String oldTabName, @@ -403,7 +404,11 @@ public class SentryMetastorePostEventListener extends MetaStoreEventListener { } finally { // Close the connection after renaming privileges is done. - sentryClient.close(); + try { + sentryClient.close(); + } catch (Exception e) { + e.printStackTrace(); + } } } // The HDFS plugin needs to know if it's a path change (set location) http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java index 4851114..f5d4431 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java @@ -365,9 +365,7 @@ public class KafkaAuthBinding { } private <T> T execute(Command<T> cmd) throws KafkaException { - SentryGenericServiceClient client = null; - try { - client = getClient(); + try (SentryGenericServiceClient client = getClient()){ return cmd.run(client); } catch (SentryUserException ex) { String msg = "Unable to excute command on sentry server: " + ex.getMessage(); @@ -377,10 +375,6 @@ public class KafkaAuthBinding { String msg = "Unable to obtain client:" + ex.getMessage(); LOG.error(msg, ex); throw new KafkaException(msg, ex); 
- } finally { - if (client != null) { - client.close(); - } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java b/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java index 2400673..37adb56 100644 --- a/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java +++ b/sentry-binding/sentry-binding-solr/src/main/java/org/apache/sentry/binding/solr/authz/SolrAuthzBinding.java @@ -298,9 +298,7 @@ public class SolrAuthzBinding { if (!isSyncEnabled()) { return; } - SentryGenericServiceClient client = null; - try { - client = getClient(); + try (SentryGenericServiceClient client = getClient()) { TSentryPrivilege tPrivilege = new TSentryPrivilege(); tPrivilege.setComponent(AuthorizationComponent.Search); tPrivilege.setServiceName(authzConf.get(SENTRY_SEARCH_SERVICE_KEY, @@ -316,10 +314,6 @@ public class SolrAuthzBinding { " can't delete privileges for collection " + collection); } catch (Exception ex) { throw new SentrySolrAuthorizationException("Unable to obtain client:" + ex.getMessage()); - } finally { - if (client != null) { - client.close(); - } } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java b/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java index 84a61cc..11e2aa4 100644 --- 
a/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java +++ b/sentry-binding/sentry-binding-sqoop/src/main/java/org/apache/sentry/sqoop/binding/SqoopAuthBinding.java @@ -408,9 +408,7 @@ public class SqoopAuthBinding { } private <T> T execute(Command<T> cmd) throws SqoopException { - SentryGenericServiceClient client = null; - try { - client = getClient(); + try (SentryGenericServiceClient client = getClient()){ return cmd.run(client); } catch (SentryUserException ex) { String msg = "Unable to excute command on sentry server: " + ex.getMessage(); @@ -420,10 +418,6 @@ public class SqoopAuthBinding { String msg = "Unable to obtain client:" + ex.getMessage(); LOG.error(msg, ex); throw new SqoopException(SecurityError.AUTH_0014, msg, ex); - } finally { - if (client != null) { - client.close(); - } } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/pom.xml b/sentry-core/sentry-core-common/pom.xml index e1be256..2e7f5c5 100644 --- a/sentry-core/sentry-core-common/pom.xml +++ b/sentry-core/sentry-core-common/pom.xml @@ -66,6 +66,10 @@ limitations under the License. 
<groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-pool2</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java index 34a594e..62d0d2c 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/RetryClientInvocationHandler.java @@ -25,7 +25,6 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -46,17 +45,17 @@ import java.lang.reflect.Method; * <p> */ -public class RetryClientInvocationHandler extends SentryClientInvocationHandler { +public final class RetryClientInvocationHandler extends SentryClientInvocationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(RetryClientInvocationHandler.class); - private SentryServiceClient client = null; + private SentryConnection client = null; private final int maxRetryCount; /** * Initialize the sentry configurations, including rpc retry count and client connection * configs for SentryPolicyServiceClientDefaultImpl */ - public RetryClientInvocationHandler(Configuration conf, SentryServiceClient clientObject, + public RetryClientInvocationHandler(Configuration conf, 
SentryConnection clientObject, SentryClientTransportConfigInterface transportConfig) { Preconditions.checkNotNull(conf, "Configuration object cannot be null"); Preconditions.checkNotNull(clientObject, "Client Object cannot be null"); @@ -72,76 +71,94 @@ public class RetryClientInvocationHandler extends SentryClientInvocationHandler * resend the thrift call) no more than rpcRetryTotal times. Throw SentryUserException * if failed retry after rpcRetryTotal times. * if it is failed with other exception, method would just re-throw the exception. - * Synchronized it for thread safety. */ @Override public synchronized Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception { - int retryCount = 0; - Exception lastExc = null; + String methodName = method.getName(); - while (retryCount < maxRetryCount) { - // Connect to a sentry server if not connected yet. - try { - client.connect(); - } catch (IOException e) { - // Increase the retry num - // Retry when the exception is caused by connection problem. - retryCount++; - lastExc = e; - close(); - continue; - } + // This is an interesting special case. When we running a debugging session, it may try to + // show the client value by calling toString(). We do not want any connection + // to be established for toString() method. + if ("toString".equals(methodName)) { + return method.invoke(client, args); + } + + Exception lastExc = null; + for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) { + connect(); // do the thrift call try { + LOGGER.debug("Calling {}", methodName); return method.invoke(client, args); } catch (InvocationTargetException e) { // Get the target exception, check if SentryUserException or TTransportException is wrapped. // TTransportException means there is a connection problem. 
+ LOGGER.error("failed to execute {}", method.getName(), e); Throwable targetException = e.getCause(); - if (targetException instanceof SentryUserException || - targetException instanceof SentryHdfsServiceException) { - Throwable sentryTargetException = targetException.getCause(); - // If there has connection problem, eg, invalid connection if the service restarted, - // sentryTargetException instanceof TTransportException will be true. - if (sentryTargetException instanceof TTransportException) { - // Retry when the exception is caused by connection problem. - lastExc = new TTransportException(sentryTargetException); - LOGGER.error("Thrift call failed with TTransportException", lastExc); - // Closing the thrift client on TTransportException. New client object is - // created using new socket when an attempt to reconnect is made. - close(); + if (!((targetException instanceof SentryUserException) || + (targetException instanceof SentryHdfsServiceException))) { + throw e; + } + Throwable sentryTargetException = targetException.getCause(); + // If there has connection problem, eg, invalid connection if the service restarted, + // sentryTargetException instanceof TTransportException will be true. + if (sentryTargetException instanceof TTransportException) { + // Retry when the exception is caused by connection problem. + lastExc = new TTransportException(sentryTargetException); + LOGGER.error("Thrift call failed", lastExc); + // The connection to the server is bad, inform the client of the problem + client.invalidate(); + } else { + // Semantic exception which does not indicate the connection failure. + // Do not need to reconnect to the sentry server. + if (targetException instanceof SentryUserException) { + throw (SentryUserException) targetException; } else { - // The exception is thrown by thrift call, eg, SentryAccessDeniedException. - // Do not need to reconnect to the sentry server. 
- if (targetException instanceof SentryUserException) { - throw (SentryUserException) targetException; - } else { - throw (SentryHdfsServiceException) targetException; - } + throw (SentryHdfsServiceException) targetException; } - } else { - throw e; } } - - // Increase the retry num - retryCount++; } // Throw the exception as reaching the max rpc retry num. String error = String.format("Request failed, %d retries attempted ", maxRetryCount); - LOGGER.error(error, lastExc); throw new SentryUserException(error, lastExc); } + /** + * Connect the client, retry multiple times + * @throws Exception + */ + private void connect() throws Exception { + Exception lastExc = null; + for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) { + try { + client.connect(); + return; + } catch (TTransportException e) { + // Retry when the exception is caused by connection problem. + LOGGER.error("failed to connect", e); + lastExc = e; + } catch (Exception e) { + Throwable causeException = e.getCause(); + if (causeException instanceof TTransportException) { + // Retry when the exception is caused by connection problem. 
+ LOGGER.error("failed to connect", e); + lastExc = e; + continue; + } + // Some other failure, re-throw it + throw e; + } + } + assert lastExc != null; + throw lastExc; + } + @Override public synchronized void close() { - try { - LOGGER.debug("Releasing the current client connection"); - client.disconnect(); - } catch (Exception e) { - LOGGER.error("Encountered failure while closing the connection"); - } + //We are done with this client + client.done(); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java index 9ea7185..9fd4013 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConfigInterface.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,8 +17,8 @@ */ package org.apache.sentry.core.common.transport; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.exception.MissingConfigurationException; /** * Configuration interface for Sentry Thrift Clients @@ -29,14 +29,6 @@ import org.apache.sentry.core.common.exception.MissingConfigurationException; * the transport configuration. 
*/ interface SentryClientTransportConfigInterface { - /** - * @param conf configuration - * @return number of times client retry logic should iterate through all - * the servers before giving up. - * @throws MissingConfigurationException if property is mandatory and is missing in - * configuration. - */ - int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException; /** * @param conf configuration @@ -49,46 +41,36 @@ interface SentryClientTransportConfigInterface { * @param conf configuration * @return True, if kerberos should be enabled. * False, Iff kerberos is enabled. - * @throws MissingConfigurationException if property is mandatory and is missing in - * configuration. */ - boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException; + boolean isKerberosEnabled(Configuration conf); /** * @param conf configuration * @return True, if Ugi transport has to be used * False, If not. - * @throws MissingConfigurationException if property is mandatory and is missing in - * configuration. */ - boolean useUserGroupInformation(Configuration conf) throws MissingConfigurationException; + boolean useUserGroupInformation(Configuration conf); /** * @param conf configuration * @return principle for the particular sentry service - * @throws MissingConfigurationException if property is mandatory and is missing in - * configuration. */ - String getSentryPrincipal(Configuration conf) throws MissingConfigurationException; + String getSentryPrincipal(Configuration conf); /** * Port in RPC Addresses configured is optional * @param conf configuration * @return comma-separated list of available sentry server addresses. - * @throws MissingConfigurationException if property is mandatory and is missing in - * configuration. */ - String getSentryServerRpcAddress(Configuration conf) throws MissingConfigurationException; + String getSentryServerRpcAddress(Configuration conf); /** * Port in RPC Addresses configured is optional. 
If a port is not provided for a server * listed in RPC configuration, this configuration is used as a default port. * @param conf configuration * @return port where sentry server is listening. - * @throws MissingConfigurationException if property is mandatory and is missing in - * configuration. */ - int getServerRpcPort(Configuration conf) throws MissingConfigurationException; + int getServerRpcPort(Configuration conf); /** * @param conf configuration @@ -96,17 +78,60 @@ interface SentryClientTransportConfigInterface { * establishment of connection to the server. If the connection * is not established with-in this interval client should try connecting * to next configured server - * @throws MissingConfigurationException if property is mandatory and is missing in - * configuration. */ - int getServerRpcConnTimeoutInMs(Configuration conf) throws MissingConfigurationException; + int getServerRpcConnTimeoutInMs(Configuration conf); + + /** + * Maximum number of connections in the pool. + * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMaxTotal(int)} + * @param conf configuration + * @return maximum number of connection objects in the pool + */ + int getPoolMaxTotal(Configuration conf); + + /** + * Minimum number of idle obects on the pool. + * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMinIdle(int)} + * @param conf Configuration + * @return Minimum idle connections to keep in the pool + */ + int getPoolMinIdle(Configuration conf); + + /** + * Maximum number of idle connections in the pool. + * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMaxIdle(int)} + * @param conf Configuration + * @return Maximum number of idle connections in the pool + */ + int getPoolMaxIdle(Configuration conf); + + /** + * This is the minimum amount of time an object may sit idle in the pool + * before it is eligible for eviction. 
+ * See {@link org.apache.commons.pool2.impl.GenericObjectPoolConfig#setMinEvictableIdleTimeMillis} + * @param conf Configuration + * @return The value for the pool minimum eviction time. + */ + long getMinEvictableTimeSec(Configuration conf); + + /** + * The number of seconds to sleep between runs of the idle object evictor thread. + * When non-positive, no idle object evictor thread will be run. + * See {@link GenericObjectPoolConfig#getTimeBetweenEvictionRunsMillis()} + * @param conf Configuration + * @return The number of seconds to sleep between runs of the idle object evictor thread. + */ + long getTimeBetweenEvictionRunsSec(Configuration conf); + + /** + * @param conf configuration + * @return True if using load-balancing between Sentry servers + */ + boolean isLoadBalancingEnabled(Configuration conf); /** - * * @param conf configuration - * @return True if the client should load balance connections between multiple servers - * @throws MissingConfigurationException if property is mandatory and is missing in - * configuration. 
+ * @return true if transport pools are enabled */ - boolean isLoadBalancingEnabled(Configuration conf)throws MissingConfigurationException; + boolean isTransportPoolEnabled(Configuration conf); } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java index 651173e..fd07887 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryClientTransportConstants.java @@ -19,6 +19,8 @@ package org.apache.sentry.core.common.transport; +import java.util.concurrent.TimeUnit; + /** * Defines configuration strings needed for sentry thrift clients to handle the transport level * operations. @@ -27,7 +29,7 @@ package org.apache.sentry.core.common.transport; * Clients that needs these configuration string use the implementations of interface * <code>SentryClientTransportConfigInterface</code>. 
*/ -class SentryClientTransportConstants { +public final class SentryClientTransportConstants { /** * max retry num for client rpc @@ -44,8 +46,18 @@ class SentryClientTransportConstants { "sentry.service.client.connection.full.retry-total"; static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = 2; + /** + * Enable load balancing between servers + */ + static final String SENTRY_CLIENT_LOAD_BALANCING = + "sentry.service.client.connection.loadbalance"; + static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true; + static final int RPC_PORT_DEFAULT = 8038; + private SentryClientTransportConstants() { + } + /** * Defines configuration strings needed for sentry thrift policy clients to handle * the transport level operations. @@ -57,7 +69,6 @@ class SentryClientTransportConstants { //configuration for server address. It can be coma seperated list of server addresses. static final String SERVER_RPC_ADDRESS = "sentry.service.client.server.rpc-address"; - /** * This configuration parameter is only meant to be used for testing purposes. */ @@ -72,7 +83,7 @@ class SentryClientTransportConstants { static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT; - static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi"; + public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.service.security.use.ugi"; static final String PRINCIPAL = "sentry.service.server.principal"; //configration for the client connection timeout. 
@@ -88,35 +99,39 @@ class SentryClientTransportConstants { static final String SENTRY_RPC_RETRY_TOTAL = "sentry.service.client.rpc.retry-total"; static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3; - // connection pool configuration - static final String SENTRY_POOL_ENABLED = "sentry.service.client.connection.pool.enabled"; - static final boolean SENTRY_POOL_ENABLED_DEFAULT = false; + // commons-pool configuration + static final String SENTRY_POOL_ENABLE = "sentry.service.client.connection.pool.enabled"; + static final boolean SENTRY_POOL_ENABLE_DEFAULT = true; - // commons-pool configuration for pool size + /** Allow unlimited total number of connections */ static final String SENTRY_POOL_MAX_TOTAL = "sentry.service.client.connection.pool.max-total"; - static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 8; + static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = -1; static final String SENTRY_POOL_MAX_IDLE = "sentry.service.client.connection.pool.max-idle"; - static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 8; + static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 400; static final String SENTRY_POOL_MIN_IDLE = "sentry.service.client.connection.pool.min-idle"; - static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 0; - - // configuration to load balance the connections to the configured sentry servers - static final String SENTRY_CLIENT_LOAD_BALANCING = "sentry.service.client.connection.loadbalance"; - static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true; - - // retry num for getting the connection from connection pool - static final String SENTRY_POOL_RETRY_TOTAL = - SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL; - static final int SENTRY_POOL_RETRY_TOTAL_DEFAULT = - SentryClientTransportConstants.SENTRY_RPC_RETRY_TOTAL_DEFAULT; - + static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 10; + static final String SENTRY_POOL_MIN_EVICTION_TIME_SEC = + "sentry.service.client.connection.pool.eviction.mintime.sec"; + // 2 minutes min time before eviction + static 
final long SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT = + TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);; + static final String SENTRY_POOL_EVICTION_INTERVAL_SEC = + "sentry.service.client.connection.pool.eviction.interval.sec"; + // Run eviction thread every minute + static final long SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT = + TimeUnit.MILLISECONDS.convert(1L, TimeUnit.MINUTES); + + static final String SENTRY_CLIENT_LOAD_BALANCING = + SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING; + static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = + SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING_DEFAULT; } /** * Defines configuration strings needed for sentry HDFS clients to handle the transport level * operations. */ - static class HDFSClientConstants { + public static class HDFSClientConstants { //Default server port static final int SERVER_RPC_PORT_DEFAULT = SentryClientTransportConstants.RPC_PORT_DEFAULT; @@ -142,14 +157,10 @@ class SentryClientTransportConstants { static final int SENTRY_FULL_RETRY_TOTAL_DEFAULT = SentryClientTransportConstants.SENTRY_FULL_RETRY_TOTAL_DEFAULT; - static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi"; + public static final String SECURITY_USE_UGI_TRANSPORT = "sentry.hdfs.service.security.use.ugi"; static final String PRINCIPAL = "sentry.hdfs.service.server.principal"; - static final String RPC_ADDRESS = "sentry.hdfs.service.client.server.rpc-address"; - - static final String RPC_ADDRESS_DEFAULT = "0.0.0.0"; //NOPMD - //configration for the client connection timeout. 
static final String SERVER_RPC_CONN_TIMEOUT = "sentry.hdfs.service.client.server.rpc-connection-timeout"; @@ -165,8 +176,28 @@ class SentryClientTransportConstants { static final int SENTRY_RPC_RETRY_TOTAL_DEFAULT = 3; - // configuration to load balance the connections to the configured sentry servers - static final String SENTRY_CLIENT_LOAD_BALANCING = "sentry.hdfs.service.client.connection.loadbalance"; - static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = true; + // commons-pool configuration - disable pool for HDFS clients + static final String SENTRY_POOL_ENABLE = "sentry.hdfs.service.client.connection.pool.enable"; + static final boolean SENTRY_POOL_ENABLE_DEFAULT = false; + + /** Total maximum number of open connections. There shouldn't be many. */ + static final String SENTRY_POOL_MAX_TOTAL = "sentry.hdfs.service.client.connection.pool.max-total"; + static final int SENTRY_POOL_MAX_TOTAL_DEFAULT = 16; + /** Maximum number of idle connections to keep */ + static final String SENTRY_POOL_MAX_IDLE = "sentry.hdfs.service.client.connection.pool.max-idle"; + static final int SENTRY_POOL_MAX_IDLE_DEFAULT = 2; + static final String SENTRY_POOL_MIN_IDLE = "sentry.hdfs.service.client.connection.pool.min-idle"; + static final int SENTRY_POOL_MIN_IDLE_DEFAULT = 1; + static final String SENTRY_POOL_MIN_EVICTION_TIME_SEC = + "sentry.hdfs.service.client.connection.pool.eviction.mintime.sec"; + // No evictions for HDFS connections by default + static final long SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT = 0L; + static final String SENTRY_POOL_EVICTION_INTERVAL_SEC = + "sentry.hdfs.service.client.connection.pool.eviction.interval.sec"; + static final long SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT = -1L; + static final String SENTRY_CLIENT_LOAD_BALANCING = + SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING; + static final boolean SENTRY_CLIENT_LOAD_BALANCING_DEFAULT = + SentryClientTransportConstants.SENTRY_CLIENT_LOAD_BALANCING_DEFAULT; } } 
http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java new file mode 100644 index 0000000..b5f2bcf --- /dev/null +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryConnection.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.core.common.transport; + +/** + * Representation of a connection to a Sentry Server. + * <ul> + * <li>Connection is initialized using the {@link #connect()} method.</li> + * <li>When the connection is no longer used, the {@link #done()} method should be called to + * deallocate any resources.</li> + * <li>If the user detected that connection is broken, they should call + * {@link #invalidate()} method. 
The connection can not be used after that.</li> + * </ul> + */ +public interface SentryConnection { + /** + * Connect to Sentry server. + * Either creates a new connection or reuses an existing one. + * @throws Exception on failure to connect. + */ + void connect() throws Exception; + + /** + * Disconnect from the server. May close connection or return it to a + * pool for reuse. + */ + void done(); + + /** + * The connection is assumed to be non-working, invalidate it. + * Subsequent {@link #connect() call} should attempt to obtain + * another connection. + * <p> + * The implementation may attempt to connect + * to another server immediately or delay it till the call to + * {@link #connect()}. + */ + void invalidate(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java index 2d80827..1724e7f 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryHDFSClientTransportConfig.java @@ -32,63 +32,92 @@ import static org.apache.sentry.core.common.transport.SentryClientTransportConst */ public final class SentryHDFSClientTransportConfig implements SentryClientTransportConfigInterface { - public SentryHDFSClientTransportConfig() { } @Override - public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException { + public boolean isKerberosEnabled(Configuration conf) { return 
(conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim() .equalsIgnoreCase((SentryConstants.KERBEROS_MODE))); } @Override - public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException { - return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT); - } - - @Override public int getSentryRpcRetryTotal(Configuration conf) { return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT); } @Override - public boolean useUserGroupInformation(Configuration conf) - throws MissingConfigurationException { - return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true")); + public boolean useUserGroupInformation(Configuration conf) { + return Boolean.parseBoolean(conf.get(SECURITY_USE_UGI_TRANSPORT, "true")); } + /** + * @throws MissingConfigurationException + */ @Override - public String getSentryPrincipal(Configuration conf) throws MissingConfigurationException { + public String getSentryPrincipal(Configuration conf) { String principle = conf.get(PRINCIPAL); - if (principle != null && !principle.isEmpty()) { + if ((principle != null) && !principle.isEmpty()) { return principle; } throw new MissingConfigurationException(PRINCIPAL); } + /** + * @throws MissingConfigurationException + */ @Override - public String getSentryServerRpcAddress(Configuration conf) - throws MissingConfigurationException { + public String getSentryServerRpcAddress(Configuration conf) { String serverAddress = conf.get(SERVER_RPC_ADDRESS); - if (serverAddress != null && !serverAddress.isEmpty()) { + if ((serverAddress != null) && !serverAddress.isEmpty()) { return serverAddress; } throw new MissingConfigurationException(SERVER_RPC_ADDRESS); } @Override - public int getServerRpcPort(Configuration conf) throws MissingConfigurationException { + public int getServerRpcPort(Configuration conf) { return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT); } @Override - public int 
getServerRpcConnTimeoutInMs(Configuration conf) - throws MissingConfigurationException { + public int getServerRpcConnTimeoutInMs(Configuration conf) { return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT); } @Override - public boolean isLoadBalancingEnabled(Configuration conf) - throws MissingConfigurationException { - return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, SENTRY_CLIENT_LOAD_BALANCING_DEFAULT); + public int getPoolMaxTotal(Configuration conf) { + return conf.getInt(SENTRY_POOL_MAX_TOTAL, SENTRY_POOL_MAX_TOTAL_DEFAULT); + } + + @Override + public int getPoolMinIdle(Configuration conf) { + return conf.getInt(SENTRY_POOL_MIN_IDLE, SENTRY_POOL_MIN_IDLE_DEFAULT); + } + + @Override + public int getPoolMaxIdle(Configuration conf) { + return conf.getInt(SENTRY_POOL_MAX_IDLE, SENTRY_POOL_MAX_IDLE_DEFAULT); + } + + @Override + public long getMinEvictableTimeSec(Configuration conf) { + return conf.getLong(SENTRY_POOL_MIN_EVICTION_TIME_SEC, + SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT); + } + + @Override + public long getTimeBetweenEvictionRunsSec(Configuration conf) { + return conf.getLong(SENTRY_POOL_EVICTION_INTERVAL_SEC, + SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT); + } + + @Override + public boolean isLoadBalancingEnabled(Configuration conf) { + return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, + SENTRY_CLIENT_LOAD_BALANCING_DEFAULT); + } + + @Override + public boolean isTransportPoolEnabled(Configuration conf) { + return conf.getBoolean(SENTRY_POOL_ENABLE, SENTRY_POOL_ENABLE_DEFAULT); } } http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java 
b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java index c97fe97..45522df 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryPolicyClientTransportConfig.java @@ -21,6 +21,7 @@ package org.apache.sentry.core.common.transport; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.exception.MissingConfigurationException; import org.apache.sentry.core.common.utils.SentryConstants; + import static org.apache.sentry.core.common.transport.SentryClientTransportConstants.PolicyClientConstants.*; /** @@ -32,63 +33,91 @@ import static org.apache.sentry.core.common.transport.SentryClientTransportConst */ public final class SentryPolicyClientTransportConfig implements SentryClientTransportConfigInterface { - public SentryPolicyClientTransportConfig() { } @Override - public boolean isKerberosEnabled(Configuration conf) throws MissingConfigurationException { + public boolean isKerberosEnabled(Configuration conf) { return (conf.get(SECURITY_MODE, SentryConstants.KERBEROS_MODE).trim() .equalsIgnoreCase((SentryConstants.KERBEROS_MODE))); } @Override - public int getSentryFullRetryTotal(Configuration conf) throws MissingConfigurationException { - return conf.getInt(SENTRY_FULL_RETRY_TOTAL, SENTRY_FULL_RETRY_TOTAL_DEFAULT); - } - - @Override public int getSentryRpcRetryTotal(Configuration conf) { return conf.getInt(SENTRY_RPC_RETRY_TOTAL, SENTRY_RPC_RETRY_TOTAL_DEFAULT); } @Override - public boolean useUserGroupInformation(Configuration conf) - throws MissingConfigurationException { + public boolean useUserGroupInformation(Configuration conf) { return Boolean.valueOf(conf.get(SECURITY_USE_UGI_TRANSPORT, "true")); } + /** + * @throws MissingConfigurationException + */ @Override - public String 
getSentryPrincipal(Configuration conf) throws MissingConfigurationException { + public String getSentryPrincipal(Configuration conf) { String principle = conf.get(PRINCIPAL); - if (principle != null && !principle.isEmpty()) { + if ((principle != null) && !principle.isEmpty()) { return principle; } throw new MissingConfigurationException(PRINCIPAL); } + /** + * @throws MissingConfigurationException + */ @Override - public String getSentryServerRpcAddress(Configuration conf) - throws MissingConfigurationException { + public String getSentryServerRpcAddress(Configuration conf) { String serverAddress = conf.get(SERVER_RPC_ADDRESS); - if (serverAddress != null && !serverAddress.isEmpty()) { + if ((serverAddress != null) && !serverAddress.isEmpty()) { return serverAddress; } throw new MissingConfigurationException(SERVER_RPC_ADDRESS); } @Override - public int getServerRpcPort(Configuration conf) throws MissingConfigurationException { + public int getServerRpcPort(Configuration conf) { return conf.getInt(SERVER_RPC_PORT, SentryClientTransportConstants.RPC_PORT_DEFAULT); } @Override - public int getServerRpcConnTimeoutInMs(Configuration conf) - throws MissingConfigurationException { + public int getServerRpcConnTimeoutInMs(Configuration conf) { return conf.getInt(SERVER_RPC_CONN_TIMEOUT, SERVER_RPC_CONN_TIMEOUT_DEFAULT); } @Override - public boolean isLoadBalancingEnabled(Configuration conf) - throws MissingConfigurationException { + public int getPoolMaxTotal(Configuration conf) { + return conf.getInt(SENTRY_POOL_MAX_TOTAL, SENTRY_POOL_MAX_TOTAL_DEFAULT); + } + + @Override + public int getPoolMinIdle(Configuration conf) { + return conf.getInt(SENTRY_POOL_MIN_IDLE, SENTRY_POOL_MIN_IDLE_DEFAULT); + } + + @Override + public int getPoolMaxIdle(Configuration conf) { + return conf.getInt(SENTRY_POOL_MAX_IDLE, SENTRY_POOL_MAX_IDLE_DEFAULT); + } + + @Override + public long getMinEvictableTimeSec(Configuration conf) { + return conf.getLong(SENTRY_POOL_MIN_EVICTION_TIME_SEC, + 
SENTRY_POOL_MIN_EVICTION_TIME_SEC_DEFAULT); + } + + @Override + public long getTimeBetweenEvictionRunsSec(Configuration conf) { + return conf.getLong(SENTRY_POOL_EVICTION_INTERVAL_SEC, + SENTRY_POOL_EVICTION_INTERVAL_SEC_DEFAULT); + } + + @Override + public boolean isLoadBalancingEnabled(Configuration conf) { return conf.getBoolean(SENTRY_CLIENT_LOAD_BALANCING, SENTRY_CLIENT_LOAD_BALANCING_DEFAULT); } + + @Override + public boolean isTransportPoolEnabled(Configuration conf) { + return conf.getBoolean(SENTRY_POOL_ENABLE, SENTRY_POOL_ENABLE_DEFAULT); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java deleted file mode 100644 index 9a10ca5..0000000 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryServiceClient.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.core.common.transport; - -/** - * Client interface for Proxy Invocation handlers - * <p> - * Defines interface that Sentry client's should expose to the Invocation handlers like - * <code>RetryClientInvocationHandler</code> used to proxy the method invocation on sentry - * client instances . - * <p> - * All the sentry clients that need retrying and failover capabilities should implement - * this interface. - */ -public interface SentryServiceClient { - /** - * Connect to Sentry server. - * Either creates a new connection or reuses an existing one. - * @throws Exception on failure to acquire a transport towards server. - */ - void connect() throws Exception; - - /** - * Disconnect from the server. May close connection or return it to a - * pool for reuse. - */ - void disconnect(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java index 74aced2..d299113 100644 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java +++ b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportFactory.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,215 +22,72 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.net.HostAndPort; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.sentry.core.common.exception.MissingConfigurationException; import org.apache.thrift.transport.TSaslClientTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; -import org.apache.sentry.core.common.utils.ThriftUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.ThreadSafe; import javax.security.sasl.Sasl; -import javax.security.sasl.SaslException; - import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collections; /** - * Create Thrift transports suitable for talking to Sentry + * Factory for producing connected Thrift transports. + * It can produce regular transports as well as Kerberos-enabled transports. + * <p> + * This class is immutable and thus thread-safe. 
*/ +@ThreadSafe +public final class SentryTransportFactory implements TransportFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class); -public class SentryTransportFactory { - protected final Configuration conf; - private String[] serverPrincipalParts; - protected TTransport thriftTransport; + private final Configuration conf; + private final boolean useUgi; + private final String serverPrincipal; private final int connectionTimeout; - private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportFactory.class); - // configs for connection retry - private final int connectionFullRetryTotal; - private final ArrayList<InetSocketAddress> endpoints; - private final SentryClientTransportConfigInterface transportConfig; + private final boolean isKerberosEnabled; private static final ImmutableMap<String, String> SASL_PROPERTIES = ImmutableMap.of(Sasl.SERVER_AUTH, "true", Sasl.QOP, "auth-conf"); /** - * This transport wraps the Sasl transports to set up the right UGI context for open(). 
- */ - public static class UgiSaslClientTransport extends TSaslClientTransport { - UserGroupInformation ugi = null; - - public UgiSaslClientTransport(String mechanism, String protocol, - String serverName, TTransport transport, - boolean wrapUgi, Configuration conf) - throws IOException, SaslException { - super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null, - transport); - if (wrapUgi) { - ugi = UserGroupInformation.getLoginUser(); - } - } - - // open the SASL transport with using the current UserGroupInformation - // This is needed to get the current login context stored - @Override - public void open() throws TTransportException { - if (ugi == null) { - baseOpen(); - } else { - try { - if (ugi.isFromKeytab()) { - ugi.checkTGTAndReloginFromKeytab(); - } - ugi.doAs(new PrivilegedExceptionAction<Void>() { - public Void run() throws TTransportException { - baseOpen(); - return null; - } - }); - } catch (IOException e) { - throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e); - } catch (InterruptedException e) { - throw new TTransportException( - "Interrupted while opening underlying transport: " + e.getMessage(), e); - } - } - } - - private void baseOpen() throws TTransportException { - super.open(); - } - } - - /** * Initialize the object based on the sentry configuration provided. - * List of configured servers are reordered randomly preventing all - * clients connecting to the same server. 
* * @param conf Sentry configuration * @param transportConfig transport configuration to use */ public SentryTransportFactory(Configuration conf, - SentryClientTransportConfigInterface transportConfig) throws IOException { + SentryClientTransportConfigInterface transportConfig) { this.conf = conf; Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - serverPrincipalParts = null; - this.transportConfig = transportConfig; - - try { - this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); - this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf); - if(transportConfig.isKerberosEnabled(conf) && - transportConfig.useUserGroupInformation(conf)) { - // Re-initializing UserGroupInformation, if needed - UserGroupInformationInitializer.initialize(conf); - } - String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf); - - int serverPort = transportConfig.getServerRpcPort(conf); - - String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); - HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, serverPort); - - this.endpoints = new ArrayList<>(hostsAndPortsStrArr.length); - for (HostAndPort endpoint : hostsAndPorts) { - this.endpoints.add( - new InetSocketAddress(endpoint.getHostText(), endpoint.getPort())); - LOGGER.debug("Added server endpoint: " + endpoint.toString()); - } - - if((endpoints.size() > 1) && (transportConfig.isLoadBalancingEnabled(conf))) { - // Reorder endpoints randomly to prevent all clients connecting to the same endpoint - // and load balance the connections towards sentry servers - Collections.shuffle(endpoints); - } - } catch (MissingConfigurationException e) { - throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e); - } - } - - /** - * Initialize object based on the parameters provided provided. 
- * - * @param addr Host address which the client needs to connect - * @param port Host Port which the client needs to connect - * @param conf Sentry configuration - * @param transportConfig transport configuration to use - */ - public SentryTransportFactory(String addr, int port, Configuration conf, - SentryClientTransportConfigInterface transportConfig) throws IOException { - // copy the configuration because we may make modifications to it. - this.conf = new Configuration(conf); - serverPrincipalParts = null; - Preconditions.checkNotNull(this.conf, "Configuration object cannot be null"); - this.transportConfig = transportConfig; - - try { - this.endpoints = new ArrayList<>(1); - this.endpoints.add(NetUtils.createSocketAddr(addr, port)); - this.connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); - this.connectionFullRetryTotal = transportConfig.getSentryFullRetryTotal(conf); - } catch (MissingConfigurationException e) { - throw new RuntimeException("Sentry Thrift Client Creation Failed: " + e.getMessage(), e); - } - } - - - /** - * On connection error, Iterates through all the configured servers and tries to connect. - * On successful connection, control returns - * On connection failure, continues iterating through all the configured sentry servers, - * and then retries the whole server list no more than connectionFullRetryTotal times. - * In this case, it won't introduce more latency when some server fails. - * <p> - * TODO: Add metrics for the number of successful connects and errors per client, and total number of retries. 
- */ - public TTransport getTransport() throws IOException { - IOException currentException = null; - for (int retryCount = 0; retryCount < connectionFullRetryTotal; retryCount++) { - try { - return connectToAvailableServer(); - } catch (IOException e) { - currentException = e; - LOGGER.error( - "Failed to connect to all the configured sentry servers, Retrying again"); - } + connectionTimeout = transportConfig.getServerRpcConnTimeoutInMs(conf); + isKerberosEnabled = transportConfig.isKerberosEnabled(conf); + if (isKerberosEnabled) { + useUgi = transportConfig.useUserGroupInformation(conf); + serverPrincipal = transportConfig.getSentryPrincipal(conf); + } else { + serverPrincipal = null; + useUgi = false; } - // Throws exception on reaching the connectionFullRetryTotal. - LOGGER.error( - String.format("Reach the max connection retry num %d ", connectionFullRetryTotal), - currentException); - throw currentException; } /** - * Iterates through all the configured servers and tries to connect. - * On connection error, tries to connect to next server. - * Control returns on successful connection OR it's done trying to all the - * configured servers. - * + * Connect to the endpoint and return a connected Thrift transport. 
+ * @return Connection to the endpoint * @throws IOException */ - private TTransport connectToAvailableServer() throws IOException { - IOException currentException = null; - for (InetSocketAddress addr : endpoints) { - try { - return connectToServer(addr); - } catch (IOException e) { - LOGGER.error(String.format("Failed connection to %s: %s", - addr.toString(), e.getMessage()), e); - currentException = e; - } - } - throw currentException; + @Override + public TTransportWrapper getTransport(HostAndPort endpoint) throws IOException { + return new TTransportWrapper(connectToServer(new InetSocketAddress(endpoint.getHostText(), + endpoint.getPort())), + endpoint); } /** @@ -241,70 +98,94 @@ public class SentryTransportFactory { */ private TTransport connectToServer(InetSocketAddress serverAddress) throws IOException { try { - thriftTransport = createTransport(serverAddress); + TTransport thriftTransport = createTransport(serverAddress); thriftTransport.open(); + return thriftTransport; } catch (TTransportException e) { throw new IOException("Failed to open transport: " + e.getMessage(), e); - } catch (MissingConfigurationException e) { - throw new RuntimeException(e.getMessage(), e); } - - LOGGER.debug("Successfully opened transport: " + thriftTransport + " to " + serverAddress); - return thriftTransport; } /** - * New socket is is created - * - * @param serverAddress - * @return + * Create transport given InetSocketAddress + * @param serverAddress - endpoint address + * @return unconnected transport * @throws TTransportException - * @throws MissingConfigurationException * @throws IOException */ + @SuppressWarnings("squid:S2095") private TTransport createTransport(InetSocketAddress serverAddress) - throws TTransportException, MissingConfigurationException, IOException { - TTransport socket = new TSocket(serverAddress.getHostName(), - serverAddress.getPort(), connectionTimeout); + throws IOException { + String hostName = serverAddress.getHostName(); + int port = 
serverAddress.getPort(); + TTransport socket = new TSocket(hostName, port, connectionTimeout); - if (!transportConfig.isKerberosEnabled(conf)) { + if (!isKerberosEnabled) { + LOGGER.debug("created unprotected connection to {}:{} ", hostName, port); return socket; - } else { - String serverPrincipal = transportConfig.getSentryPrincipal(conf); - serverPrincipal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); - LOGGER.debug("Using server kerberos principal: " + serverPrincipal); - if (serverPrincipalParts == null) { - serverPrincipalParts = SaslRpcServer.splitKerberosName(serverPrincipal); - Preconditions.checkArgument(serverPrincipalParts.length == 3, - "Kerberos principal should have 3 parts: " + serverPrincipal); - } + } - boolean wrapUgi = transportConfig.useUserGroupInformation(conf); - return new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), - serverPrincipalParts[0], serverPrincipalParts[1], - socket, wrapUgi, conf); + String principal = SecurityUtil.getServerPrincipal(serverPrincipal, serverAddress.getAddress()); + String[] serverPrincipalParts = SaslRpcServer.splitKerberosName(principal); + if (serverPrincipalParts.length != 3) { + throw new IOException("Kerberos principal should have 3 parts: " + principal); } - } - private boolean isConnected() { - return thriftTransport != null && thriftTransport.isOpen(); - } + UgiSaslClientTransport connection = + new UgiSaslClientTransport(SaslRpcServer.AuthMethod.KERBEROS.getMechanismName(), + serverPrincipalParts[0], serverPrincipalParts[1], + socket, useUgi); - /** - * Method currently closes the transport - * TODO (Kalyan) Plan is to hold the transport and resuse it accross multiple client's - * That way, new connection need not be created for each new client - */ - public void releaseTransport() { - close(); + LOGGER.debug("creating secured connection to {}:{} ", hostName, port); + return connection; } /** - * Method closes the transport + * This 
transport wraps the Sasl transports to set up the right UGI context for open(). */ - public void close() { - if (isConnected()) { - thriftTransport.close(); + private static class UgiSaslClientTransport extends TSaslClientTransport { + private UserGroupInformation ugi = null; + + UgiSaslClientTransport(String mechanism, String protocol, + String serverName, TTransport transport, + boolean wrapUgi) + throws IOException { + super(mechanism, null, protocol, serverName, SASL_PROPERTIES, null, + transport); + if (wrapUgi) { + ugi = UserGroupInformation.getLoginUser(); + } + } + + // open the SASL transport with using the current UserGroupInformation + // This is needed to get the current login context stored + @Override + public void open() throws TTransportException { + if (ugi == null) { + baseOpen(); + } else { + try { + if (ugi.isFromKeytab()) { + ugi.checkTGTAndReloginFromKeytab(); + } + ugi.doAs(new PrivilegedExceptionAction<Void>() { + public Void run() throws TTransportException { + baseOpen(); + return null; + } + }); + } catch (IOException e) { + throw new TTransportException("Failed to open SASL transport: " + e.getMessage(), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TTransportException( + "Interrupted while opening underlying transport: " + e.getMessage(), e); + } + } + } + + private void baseOpen() throws TTransportException { + super.open(); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/5dc6b285/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java new file mode 100644 index 0000000..04a515a --- /dev/null +++ 
b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/transport/SentryTransportPool.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.core.common.transport; + +import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; +import org.apache.commons.pool2.BaseKeyedPooledObjectFactory; +import org.apache.commons.pool2.KeyedObjectPool; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.commons.pool2.impl.GenericKeyedObjectPool; +import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.utils.ThriftUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Pool of transport connections to Sentry servers. + * The pool caches open connections to multiple Sentry servers, + * specified in the configuration. 
+ * + * When transport pooling is disabled in configuration, + * creates transports directly and doesn't cache connections. + */ +@ThreadSafe +public final class SentryTransportPool implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(SentryTransportPool.class); + + // Used for logging to identify pool instances. This is only useful for test debugging + // so we do not preserve thread safety for this field. + private static int poolId = 0; + private final int id; + + // True if using Object pool + private final boolean isPoolEnabled; + + // Load balance between servers if true + private final boolean doLoadBalancing; + + // List of all known servers + private final ArrayList<HostAndPort> endpoints; + + // Transport pool which keeps connected transports + private final KeyedObjectPool<HostAndPort, TTransportWrapper> pool; + // Source of connected transports + private final TransportFactory transportFactory; + + // Set when we are closed + private final AtomicBoolean closed = new AtomicBoolean(); + + /** + * Configure transport pool. 
+ * <p> + * The pool accepts the following configuration: + * <ul> + * <li>Maximum total number of objects in the pool</li> + * <li>Minimum number of idle objects</li> + * <li>Maximum number of idle objects</li> + * <li>Minimum time before the object is evicted</li> + * <li>Interval between evictions</li> + * </ul> + * @param conf Configuration + * @param transportConfig Configuration interface + * @param transportFactory Transport factory used to produce transports + */ + public SentryTransportPool(Configuration conf, + SentryClientTransportConfigInterface transportConfig, + TransportFactory transportFactory) { + + // This isn't thread-safe, but we don't care - it is only used + // for debugging when running tests - normal apps use a single pool + poolId++; + id = poolId; + + this.transportFactory = transportFactory; + doLoadBalancing = transportConfig.isLoadBalancingEnabled(conf); + isPoolEnabled = transportConfig.isTransportPoolEnabled(conf); + + // Get list of server addresses + String hostsAndPortsStr = transportConfig.getSentryServerRpcAddress(conf); + int serverPort = transportConfig.getServerRpcPort(conf); + LOGGER.info("Creating pool for {} with default port {}", + hostsAndPortsStr, serverPort); + String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); + Preconditions.checkArgument(hostsAndPortsStrArr.length > 0, + "At least one server should be specified"); + + endpoints = new ArrayList<>(hostsAndPortsStrArr.length); + for(String addr: hostsAndPortsStrArr) { + HostAndPort endpoint = ThriftUtil.parseAddress(addr, serverPort); + LOGGER.info("Adding endpoint {}", endpoint); + endpoints.add(endpoint); + } + + if (!isPoolEnabled) { + pool = null; + LOGGER.info("Connection pooling is disabled"); + return; + } + + LOGGER.info("Connection pooling is enabled"); + // Set pool configuration based on Configuration settings + GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig(); + + // Don't limit maximum number of objects in the pool + 
poolConfig.setMaxTotal(-1); + + poolConfig.setMinIdlePerKey(transportConfig.getPoolMinIdle(conf)); + poolConfig.setMaxIdlePerKey(transportConfig.getPoolMaxIdle(conf)); + + // Do not block when pool is exhausted, throw exception instead + poolConfig.setBlockWhenExhausted(false); + poolConfig.setTestOnReturn(true); + + // No limit for total objects in the pool + poolConfig.setMaxTotalPerKey(transportConfig.getPoolMaxTotal(conf)); + poolConfig.setMinEvictableIdleTimeMillis(transportConfig.getMinEvictableTimeSec(conf)); + poolConfig.setTimeBetweenEvictionRunsMillis(transportConfig.getTimeBetweenEvictionRunsSec(conf)); + + // Create object pool + pool = new GenericKeyedObjectPool<>(new PoolFactory(this.transportFactory, id), + poolConfig); + } + + public TTransportWrapper getTransport() throws Exception { + List<HostAndPort> servers; + // If we are doing load balancing and there is more then one server, + // shuffle them before obtaining connection + if (doLoadBalancing && (endpoints.size() > 1)) { + servers = new ArrayList<>(endpoints); + Collections.shuffle(servers); + } else { + servers = endpoints; + } + + // Try to get a connection from one of the pools + Exception failure = null; + for(HostAndPort addr: servers) { + try { + TTransportWrapper transport = + isPoolEnabled ? 
+ pool.borrowObject(addr) : + transportFactory.getTransport(addr); + LOGGER.debug("[{}] obtained transport {}", id, transport); + if (LOGGER.isDebugEnabled() && isPoolEnabled) { + LOGGER.debug("Currently {} active connections, {} idle connections", + pool.getNumActive(), pool.getNumIdle()); + } + return transport; + } catch (IllegalStateException e) { + // Should not happen + LOGGER.error("Unexpected error from pool {}", id, e); + failure = e; + } catch (Exception e) { + LOGGER.error("Failed to obtain transport for {}: {}", + addr, e.getMessage()); + failure = e; + } + } + // Failed to borrow connect to any endpoint + assert failure != null; + throw failure; + } + + /** + * Return transport to the pool + * @param transport Open transport + */ + public void returnTransport(TTransportWrapper transport) { + if (closed.get()) { + LOGGER.debug("Returned {} to closed pool", transport); + transport.close(); + return; + } + try { + if (isPoolEnabled) { + LOGGER.debug("[{}] returning {}", id, transport); + pool.returnObject(transport.getAddress(), transport); + } else { + LOGGER.debug("Closing {}", transport); + transport.close(); + } + } catch (Exception e) { + LOGGER.error("Failed to return {}", transport, e); + } + } + + public void invalidateTransport(TTransportWrapper transport) { + if (closed.get()) { + LOGGER.debug("invalidated {} for closed pool", transport); + transport.close(); + return; + } + try { + LOGGER.debug("[{}] Invalidating address {}", id, transport); + if (!isPoolEnabled) { + transport.close(); + } else { + pool.invalidateObject(transport.getAddress(), transport); + // Invalidate the whole pool associated with the given address + // It is a bit brutal since a single bad connection may + // cause an invalidation, but otherwise we may have a lot of bad + // connections in the pool and try to return them. 
+ pool.clear(transport.getAddress()); + } + } catch (Exception e) { + LOGGER.error("Failed to invalidate {}", transport, e); + } + } + + @Override + public void close() throws Exception { + if (closed.get()) { + // already closed + return; + } + LOGGER.debug("[{}] closing", id); + if (pool != null) { + LOGGER.debug("Closing pool of {}/{} endpoints", + pool.getNumIdle(), pool.getNumActive()); + pool.close(); + } + } + + /** + * Factory that creates and destroys pool objects + */ + private static final class PoolFactory + extends BaseKeyedPooledObjectFactory<HostAndPort, TTransportWrapper> { + private final TransportFactory transportFactory; + private final int id; + + /** + * Create a pool factory associated with the given transport factory + * @param transportFactory - factory producing transports + * @param id pool id (for debugging) + */ + private PoolFactory(TransportFactory transportFactory, int id) { + this.transportFactory = transportFactory; + this.id = id; + } + + @Override + public boolean validateObject(HostAndPort key, PooledObject<TTransportWrapper> p) { + TTransportWrapper transport = p.getObject(); + if (transport == null) { + LOGGER.error("No transport to validate"); + return false; + } + if (transport.getAddress() != key) { + LOGGER.error("Invalid endpoint {}: does not match {}", transport, key); + return false; + } + return true; + } + + @Override + public TTransportWrapper create(HostAndPort key) throws Exception { + TTransportWrapper transportWrapper = transportFactory.getTransport(key); + LOGGER.debug("[{}] created {}", id, transportWrapper); + return transportWrapper; + } + + @Override + public void destroyObject(HostAndPort key, PooledObject<TTransportWrapper> p) throws Exception { + TTransportWrapper transport = p.getObject(); + if (transport != null) { + LOGGER.debug("[{}] Destroying endpoint {}", id, transport); + try { + transport.close(); + } catch (RuntimeException e) { + LOGGER.error("fail to destroy endpoint {}", transport, e); + } + } 
+ } + + @Override + public PooledObject<TTransportWrapper> wrap(TTransportWrapper value) { + return new DefaultPooledObject<>(value); + } + } + +}
