Repository: sentry Updated Branches: refs/heads/SENTRY-1580 [created] 95d073f06
http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java index 1d09846..09f17ed 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/tools/SentryShellHive.java @@ -40,36 +40,39 @@ public class SentryShellHive extends SentryShellCommon { public void run() throws Exception { Command command = null; - SentryPolicyServiceClient client = SentryServiceClientFactory.create(getSentryConf()); - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); - String requestorName = ugi.getShortUserName(); - if (isCreateRole) { - command = new CreateRoleCmd(roleName); - } else if (isDropRole) { - command = new DropRoleCmd(roleName); - } else if (isAddRoleGroup) { - command = new GrantRoleToGroupsCmd(roleName, groupName); - } else if (isDeleteRoleGroup) { - command = new RevokeRoleFromGroupsCmd(roleName, groupName); - } else if (isGrantPrivilegeRole) { - command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr); - } else if (isRevokePrivilegeRole) { - command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr); - } else if (isListRole) { - command = new ListRolesCmd(groupName); - } else if (isListPrivilege) { - command = new ListPrivilegesCmd(roleName); - } + try(SentryPolicyServiceClient client = + SentryServiceClientFactory.create(getSentryConf())) { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + String requestorName = ugi.getShortUserName(); - // check the requestor name - if (StringUtils.isEmpty(requestorName)) { - // The exception message will be recoreded in log file. - throw new Exception("The requestor name is empty."); - } + if (isCreateRole) { + command = new CreateRoleCmd(roleName); + } else if (isDropRole) { + command = new DropRoleCmd(roleName); + } else if (isAddRoleGroup) { + command = new GrantRoleToGroupsCmd(roleName, groupName); + } else if (isDeleteRoleGroup) { + command = new RevokeRoleFromGroupsCmd(roleName, groupName); + } else if (isGrantPrivilegeRole) { + command = new GrantPrivilegeToRoleCmd(roleName, privilegeStr); + } else if (isRevokePrivilegeRole) { + command = new RevokePrivilegeFromRoleCmd(roleName, privilegeStr); + } else if (isListRole) { + command = new ListRolesCmd(groupName); + } else if (isListPrivilege) { + command = new ListPrivilegesCmd(roleName); + } - if (command != null) { - command.execute(client, requestorName); + // check the requestor name + if (StringUtils.isEmpty(requestorName)) { + // The exception message will be recoreded in log file. + throw new Exception("The requestor name is empty."); + } + + if (command != null) { + command.execute(client, requestorName); + } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java deleted file mode 100644 index acf9b05..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java +++ /dev/null @@ -1,294 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.service.thrift; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; - -import com.google.common.net.HostAndPort; -import org.apache.commons.pool2.PooledObjectFactory; -import org.apache.commons.pool2.impl.AbandonedConfig; -import org.apache.commons.pool2.impl.GenericObjectPool; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.core.common.exception.SentryUserException; -import org.apache.sentry.core.common.transport.SentryClientInvocationHandler; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; -import org.apache.sentry.core.common.utils.ThriftUtil; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The PoolClientInvocationHandler is a proxy class for handling thrift - * call. For every thrift call, get the instance of - * SentryPolicyServiceBaseClient from the commons-pool, and return the instance - * to the commons-pool after complete the call. For any exception with the call, - * discard the instance and create a new one added to the commons-pool. Then, - * get the instance and do the call again. For the thread safe, the commons-pool - * will manage the connection pool, and every thread can get the connection by - * borrowObject() and return the connection to the pool by returnObject(). - * - * TODO: Current pool model does not manage the opening connections very well, - * e.g. opening connections with failed servers should be closed promptly. - */ - -public class PoolClientInvocationHandler extends SentryClientInvocationHandler { - private static final Logger LOGGER = - LoggerFactory.getLogger(PoolClientInvocationHandler.class); - - private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred "; - - private final Configuration conf; - - /** - * The configuration to use for our object pools. - * Null if we are not using object pools. - */ - private final GenericObjectPoolConfig poolConfig; - - /** - * The total number of connection retries to attempt per endpoint. - */ - private final int connectionRetryTotal; - - /** - * The configured sentry servers. - */ - private final Endpoint[] endpoints; - - /** - * The endpoint which we are currently using. This can be read without any locks. - * It must be written while holding the endpoints lock. - */ - private volatile int freshestEndpointIdx = 0; - - private class Endpoint { - /** - * The server address or hostname. - */ - private final String addr; - - /** - * The server port. - */ - private final int port; - - /** - * The server's poolFactory used to create new clients. - */ - private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory; - - /** - * The server's pool of cached clients. - */ - private final GenericObjectPool<SentryPolicyServiceClient> pool; - - Endpoint(String addr, int port) { - this.addr = addr; - this.port = port; - this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf); - this.pool = new GenericObjectPool<SentryPolicyServiceClient>( - this.poolFactory, poolConfig, new AbandonedConfig()); - } - - GenericObjectPool<SentryPolicyServiceClient> getPool() { - return pool; - } - - String getEndPointStr() { - return new String("endpoint at [address " + addr + ", port " + port + "]"); - } - } - - public PoolClientInvocationHandler(Configuration conf) throws Exception { - this.conf = conf; - - this.poolConfig = new GenericObjectPoolConfig(); - // config the pool size for commons-pool - this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL, - ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT)); - this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE, - ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT)); - this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE, - ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT)); - - // get the retry number for reconnecting service - this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL, - ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT); - - String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS); - if (hostsAndPortsStr == null) { - throw new RuntimeException("Config key " + - ClientConfig.SERVER_RPC_ADDRESS + " is required"); - } - int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT, - ClientConfig.SERVER_RPC_PORT_DEFAULT); - String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); - HostAndPort[] hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, defaultPort); - this.endpoints = new Endpoint[hostsAndPorts.length]; - for (int i = 0; i < this.endpoints.length; i++) { - this.endpoints[i] = new Endpoint(hostsAndPorts[i].getHostText(),hostsAndPorts[i].getPort()); - LOGGER.info("Initiate sentry sever endpoint: hostname " + - hostsAndPorts[i].getHostText() + ", port " + hostsAndPorts[i].getPort()); - } - } - - @Override - public Object invokeImpl(Object proxy, Method method, Object[] args) - throws Exception { - int retryCount = 0; - /** - * The maximum number of retries that we will do. Each endpoint gets its - * own set of retries. - */ - int retryLimit = connectionRetryTotal * endpoints.length; - - /** - * The index of the endpoint to use. - */ - int endpointIdx = freshestEndpointIdx; - - /** - * A list of exceptions from each endpoint. This starts as null to avoid - * memory allocation in the common case where there is no error. - */ - Exception exc[] = null; - - Object ret = null; - - while (retryCount < retryLimit) { - GenericObjectPool<SentryPolicyServiceClient> pool = - endpoints[endpointIdx].getPool(); - try { - if ((exc != null) && - (exc[endpointIdx] instanceof TTransportException)) { - // If there was a TTransportException last time we tried to contact - // this endpoint, attempt to create a new connection before we try - // again. - synchronized (endpoints) { - // If there has room, create new instance and add it to the - // commons-pool. This instance will be returned first from the - // commons-pool, because the configuration is LIFO. - if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) { - pool.addObject(); - } - } - } - // Try to make the RPC. - ret = invokeFromPool(method, args, pool); - break; - } catch (TTransportException e) { - if (exc == null) { - exc = new Exception[endpoints.length]; - } - exc[endpointIdx] = e; - } - - Exception lastExc = exc[endpointIdx]; - synchronized (endpoints) { - int curFreshestEndpointIdx = freshestEndpointIdx; - if (curFreshestEndpointIdx == endpointIdx) { - curFreshestEndpointIdx = - (curFreshestEndpointIdx + 1) % endpoints.length; - freshestEndpointIdx = curFreshestEndpointIdx; - } - endpointIdx = curFreshestEndpointIdx; - } - // Increase the retry num, and throw the exception if can't retry again. - retryCount++; - if (retryCount == connectionRetryTotal) { - for (int i = 0; i < exc.length; i++) { - // Since freshestEndpointIdx is shared by multiple threads, it is possible that - // the ith endpoint has been tried in another thread and skipped in the current - // thread. - if (exc[i] != null) { - LOGGER.error("Sentry server " + endpoints[i].getEndPointStr() - + " is in unreachable."); - } - } - throw new SentryUserException("Sentry servers are unreachable. " + - "Diagnostics is needed for unreachable servers.", lastExc); - } - } - return ret; - } - - private Object invokeFromPool(Method method, Object[] args, - GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception { - Object result = null; - SentryPolicyServiceClient client; - try { - // get the connection from the pool, don't know if the connection is broken. - client = pool.borrowObject(); - } catch (Exception e) { - LOGGER.debug(POOL_EXCEPTION_MESSAGE, e); - // If the exception is caused by connection problem, throw the TTransportException - // for reconnect. - if (e instanceof IOException) { - throw new TTransportException(e); - } - throw new SentryUserException(e.getMessage(), e); - } - try { - // do the thrift call - result = method.invoke(client, args); - } catch (InvocationTargetException e) { - // Get the target exception, check if SentryUserException or TTransportException is wrapped. - // TTransportException or IOException means there has connection problem with the pool. - Throwable targetException = e.getCause(); - if (targetException instanceof SentryUserException) { - Throwable sentryTargetException = targetException.getCause(); - // If there has connection problem, eg, invalid connection if the service restarted, - // sentryTargetException instanceof TTransportException or IOException = true. - if (sentryTargetException instanceof TTransportException - || sentryTargetException instanceof IOException) { - // If the exception is caused by connection problem, destroy the instance and - // remove it from the commons-pool. Throw the TTransportException for reconnect. - pool.invalidateObject(client); - throw new TTransportException(sentryTargetException); - } - // The exception is thrown by thrift call, eg, SentryAccessDeniedException. - throw (SentryUserException) targetException; - } - throw e; - } finally{ - try { - // return the instance to commons-pool - pool.returnObject(client); - } catch (Exception e) { - LOGGER.error(POOL_EXCEPTION_MESSAGE, e); - throw e; - } - } - return result; - } - - @Override - public void close() { - for (int i = 0; i < endpoints.length; i++) { - try { - endpoints[i].getPool().close(); - } catch (Exception e) { - LOGGER.debug(POOL_EXCEPTION_MESSAGE, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 9beb07b..ec938da 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -313,7 +313,8 @@ public class SentryService implements Callable, SigUtils.SigListener { hmsFollowerExecutor.scheduleAtFixedRate(hmsFollower, initDelay, period, TimeUnit.MILLISECONDS); } catch (IllegalArgumentException e) { - LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms", period), e); + LOGGER.error(String.format("Could not start HMSFollower due to illegal argument. period is %s ms", + period), e); throw e; } } @@ -381,7 +382,7 @@ public class SentryService implements Callable, SigUtils.SigListener { sentryStoreCleanService.scheduleWithFixedDelay( storeCleaner, 0, storeCleanPeriodSecs, TimeUnit.SECONDS); - LOGGER.info("sentry store cleaner is scheduled with interval %d seconds", storeCleanPeriodSecs); + LOGGER.info("sentry store cleaner is scheduled with interval {} seconds", storeCleanPeriodSecs); } catch(IllegalArgumentException e){ LOGGER.error("Could not start SentryStoreCleaner due to illegal argument", e); http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java index 7db9310..f3aa587 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java @@ -19,36 +19,120 @@ package org.apache.sentry.service.thrift; import java.lang.reflect.Proxy; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.sentry.core.common.transport.RetryClientInvocationHandler; import org.apache.sentry.core.common.transport.SentryPolicyClientTransportConfig; +import org.apache.sentry.core.common.transport.SentryTransportFactory; +import org.apache.sentry.core.common.transport.SentryTransportPool; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.ThreadSafe; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.apache.sentry.core.common.utils.SentryConstants.KERBEROS_MODE; + +@ThreadSafe public final class SentryServiceClientFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientFactory.class); + private static final SentryPolicyClientTransportConfig transportConfig = new SentryPolicyClientTransportConfig(); + private final Configuration conf; + private final SentryTransportPool transportPool; - private SentryServiceClientFactory() { - } + private static final AtomicReference<SentryServiceClientFactory> clientFactory = + new AtomicReference<>(); + /** + * Create a client instance. The supplied configuration is only used the first time and + * ignored afterwords. Tests that want to supply different configurations + * should call {@link #factoryReset(SentryServiceClientFactory)} to force new configuration + * read. + * @param conf Configuration + * @return client instance + * @throws Exception + */ public static SentryPolicyServiceClient create(Configuration conf) throws Exception { - boolean pooled = conf.getBoolean( - ClientConfig.SENTRY_POOL_ENABLED, ClientConfig.SENTRY_POOL_ENABLED_DEFAULT); - if (pooled) { - return (SentryPolicyServiceClient) Proxy - .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new PoolClientInvocationHandler(conf)); - } else { - return (SentryPolicyServiceClient) Proxy - .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new RetryClientInvocationHandler(conf, - new SentryPolicyServiceClientDefaultImpl(conf,transportConfig), transportConfig)); + SentryServiceClientFactory factory = clientFactory.get(); + if (factory != null) { + return factory.create(); + } + factory = new SentryServiceClientFactory(conf); + boolean ok = clientFactory.compareAndSet(null, factory); + if (ok) { + return factory.create(); + } + // Close old factory + factory.close(); + return clientFactory.get().create(); + } + + private SentryServiceClientFactory(Configuration conf) { + Configuration clientConf = conf; + + // When kerberos is enabled, UserGroupInformation should have been initialized with + // HADOOP_SECURITY_AUTHENTICATION property. There are instances where this is not done. + // Instead of depending on the callers to update this configuration and to be + // sure that UserGroupInformation is properly initialized, sentry client is explicitly + // doing it. + // + // This whole piece of code is a bit ugly but we want to avoid doing this in the transport + // code during connection establishment, so we are doing it upfront here instead. + boolean useKerberos = transportConfig.isKerberosEnabled(conf); + + if (useKerberos) { + LOGGER.info("Using Kerberos authentication"); + String authMode = conf.get(HADOOP_SECURITY_AUTHENTICATION, ""); + if (authMode != KERBEROS_MODE) { + // Force auth mode to be Kerberos + clientConf = new Configuration(conf); + clientConf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS_MODE); + } + } + + this.conf = clientConf; + + boolean useUGI = transportConfig.useUserGroupInformation(conf); + + if (useUGI) { + LOGGER.info("Using UserGroupInformation authentication"); + UserGroupInformation.setConfiguration(this.conf); + } + + transportPool = new SentryTransportPool(conf, transportConfig, + new SentryTransportFactory(conf, transportConfig)); + } + + private SentryPolicyServiceClient create() throws Exception { + return (SentryPolicyServiceClient) Proxy + .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), + SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), + new RetryClientInvocationHandler(conf, + new SentryPolicyServiceClientDefaultImpl(conf, transportPool), transportConfig)); + } + + /** + * Reset existing factory and return the old one. + * Only used by tests. + * @param factory new factory to use. May be null. + * @return + */ + public static SentryServiceClientFactory factoryReset(SentryServiceClientFactory factory) { + return clientFactory.getAndSet(factory); + } + + void close() { + try { + transportPool.close(); + } catch (Exception e) { + LOGGER.error("failed to close transport pool", e); } } } http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java deleted file mode 100644 index 0164fa6..0000000 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import org.apache.commons.pool2.BasePooledObjectFactory; -import org.apache.commons.pool2.PooledObject; -import org.apache.commons.pool2.impl.DefaultPooledObject; -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; -import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * SentryServiceClientPoolFactory is for connection pool to manage the object. Implement the related - * method to create object, destroy object and wrap object. - */ - -public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<SentryPolicyServiceClient> { - - private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class); - - private final String addr; - private final int port; - private final Configuration conf; - - public SentryServiceClientPoolFactory(String addr, int port, - Configuration conf) { - this.addr = addr; - this.port = port; - this.conf = conf; - } - - @Override - public SentryPolicyServiceClient create() throws Exception { - LOGGER.debug("Creating Sentry Service Client..."); - return new SentryPolicyServiceClientDefaultImpl(addr, port, conf); - } - - @Override - public PooledObject<SentryPolicyServiceClient> wrap(SentryPolicyServiceClient client) { - return new DefaultPooledObject<SentryPolicyServiceClient>(client); - } - - @Override - public void destroyObject(PooledObject<SentryPolicyServiceClient> pooledObject) { - SentryPolicyServiceClient client = pooledObject.getObject(); - LOGGER.debug("Destroying Sentry Service Client: " + client); - if (client != null) { - // The close() of TSocket or TSaslClientTransport is called actually, and there has no - // exception even there has some problems, eg, the client is closed already. - // The close here is just try to close the socket and the client will be destroyed soon. - client.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java index a4dd8a6..32e67b9 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java @@ -44,6 +44,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio runTestAsSubject(new TestOperation() { @Override public void runTestAsSubject() throws Exception { + SentryServiceClientFactory oldFactory = SentryServiceClientFactory.factoryReset(null); Configuration confWithSmallMaxMsgSize = new Configuration(conf); confWithSmallMaxMsgSize.setLong(ServiceConstants.ClientConfig.SENTRY_POLICY_CLIENT_THRIFT_MAX_MESSAGE_SIZE, 20); // create a client with a small thrift max message size @@ -63,6 +64,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio } finally { Assert.assertEquals(true, exceptionThrown); clientWithSmallMaxMsgSize.close(); + SentryServiceClientFactory.factoryReset(oldFactory); } // client can still talk with sentry server when message size is smaller. http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java deleted file mode 100644 index a202775..0000000 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import com.google.common.net.HostAndPort; -import org.apache.sentry.core.common.utils.ThriftUtil; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestPoolClientInvocationHandler { - private static final Logger LOGGER = - LoggerFactory.getLogger(TestPoolClientInvocationHandler.class); - - private void expectParseHostPortStrings(String hostsAndPortsStr, - String[] expectedHosts, int[] expectedPorts) throws Exception { - boolean success = false; - String[] hostsAndPortsStrArr = hostsAndPortsStr.split(","); - HostAndPort[] hostsAndPorts; - try { - hostsAndPorts = ThriftUtil.parseHostPortStrings(hostsAndPortsStrArr, 8038); - success = true; - } finally { - if (!success) { - LOGGER.error("Caught exception while parsing hosts/ports string " + - hostsAndPortsStr); - } - } - String[] hosts = new String[hostsAndPortsStrArr.length]; - int[] ports = new int[hostsAndPortsStrArr.length]; - parseHostsAndPorts(hostsAndPorts, hosts, ports); - Assert.assertArrayEquals("Got unexpected hosts results while " + - "parsing " + hostsAndPortsStr, expectedHosts, hosts); - Assert.assertArrayEquals("Got unexpected ports results while " + - "parsing " + hostsAndPortsStr, expectedPorts, ports); - } - - private void parseHostsAndPorts(HostAndPort[] hostsAndPorts, String[] hosts, int[] ports) { - for (int i = 0; i < hostsAndPorts.length; i++) { - hosts[i] = hostsAndPorts[i].getHostText(); - ports[i] = hostsAndPorts[i].getPort(); - } - } - - @SuppressWarnings("PMD.AvoidUsingHardCodedIP") - @Test - public void testParseHostPortStrings() throws Exception { - expectParseHostPortStrings("foo", new String[] {"foo"}, new int[] {8038}); - expectParseHostPortStrings("foo,bar", - new String[] {"foo", "bar"}, - new int[] {8038, 8038}); - expectParseHostPortStrings("foo:2020,bar:2021", - new String[] {"foo", "bar"}, - new int[] {2020, 2021}); - expectParseHostPortStrings("127.0.0.1:2020,127.1.0.1", - new String[] {"127.0.0.1", "127.1.0.1"}, - new int[] {2020, 8038}); - expectParseHostPortStrings("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:433", - new String[] {"2001:db8:85a3:8d3:1319:8a2e:370:7348"}, - new int[] {433}); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java index bead003..7c45999 100644 --- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java +++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java @@ -28,6 +28,7 @@ import org.apache.sentry.core.model.kafka.KafkaActionConstant; import org.apache.sentry.core.model.kafka.Host; import org.apache.sentry.kafka.conf.KafkaAuthConf; import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; +import org.apache.sentry.provider.db.generic.UpdatableCache; import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory; import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; @@ -79,8 +80,12 @@ public class AbstractKafkaSentryTestBase { @BeforeClass public static void beforeTestEndToEnd() throws Exception { + // Stop background update thread + UpdatableCache.disable(); setupConf(); startSentryServer(); + // We started a new server, invalidate all connections to the old one + SentryGenericServiceClientFactory.factoryReset(); setUserGroups(); setAdminPrivilege(); startKafkaServer(); @@ -88,8 +93,10 @@ public class AbstractKafkaSentryTestBase { @AfterClass public static void afterTestEndToEnd() throws Exception { - stopSentryServer(); + // Stop background update thread + UpdatableCache.disable(); stopKafkaServer(); + stopSentryServer(); } private static void stopKafkaServer() { @@ -170,10 +177,8 @@ public class AbstractKafkaSentryTestBase { } public static void setAdminPrivilege() throws Exception { - SentryGenericServiceClient sentryClient = null; - try { - /** grant all privilege to admin user */ - sentryClient = getSentryClient(); + try (SentryGenericServiceClient sentryClient = getSentryClient()){ + // grant all privilege to admin user sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT); sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP)); final ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>(); @@ -184,14 +189,10 @@ public class AbstractKafkaSentryTestBase { sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT, new TSentryPrivilege(COMPONENT, "kafka", authorizables, KafkaActionConstant.ALL)); - } finally { - if (sentryClient != null) { - sentryClient.close(); - } } } - protected static SentryGenericServiceClient getSentryClient() throws Exception { + static SentryGenericServiceClient getSentryClient() throws Exception { return SentryGenericServiceClientFactory.create(getClientConfig()); } http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java index 0b1ef68..6d2cabf 100644 --- a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java +++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java @@ -34,6 +34,7 @@ import org.apache.sentry.core.model.kafka.KafkaActionConstant; import org.apache.sentry.core.model.kafka.Host; import org.apache.sentry.core.model.kafka.Topic; import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; +import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory; import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege; import org.junit.Assert; @@ -55,6 +56,8 @@ public class TestAuthorize extends AbstractKafkaSentryTestBase { @Test public void testProduceConsumeForSuperuser() { + LOGGER.debug("testProduceConsumeForSuperuser"); + SentryGenericServiceClientFactory.factoryReset(); try { final String SuperuserName = "test"; testProduce(SuperuserName); @@ -66,8 +69,11 @@ public class TestAuthorize extends AbstractKafkaSentryTestBase { @Test public void testProduceConsumeCycle() throws Exception { + LOGGER.debug("testProduceConsumeCycle"); final String localhost = InetAddress.getLocalHost().getHostAddress(); + // SentryGenericServiceClientFactory.factoryReset(); + // START TESTING PRODUCER try { testProduce("user1"); http://git-wip-us.apache.org/repos/asf/sentry/blob/95d073f0/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java index 8a01e1c..80f158a 100644 --- a/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java +++ b/sentry-tests/sentry-tests-sqoop/src/test/java/org/apache/sentry/tests/e2e/sqoop/AbstractSqoopSentryTestBase.java @@ -197,19 +197,14 @@ public class AbstractSqoopSentryTestBase { } public static void setAdminPrivilege() throws Exception { - SentryGenericServiceClient sentryClient = null; - try { - /** grant all privilege to admin user */ - sentryClient = SentryGenericServiceClientFactory.create(getClientConfig()); + try (SentryGenericServiceClient sentryClient = + SentryGenericServiceClientFactory.create(getClientConfig())){ + // grant all privilege to admin user sentryClient.createRoleIfNotExist(ADMIN_USER, ADMIN_ROLE, COMPONENT); sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP)); sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT, new TSentryPrivilege(COMPONENT, SQOOP_SERVER_NAME, new ArrayList<TAuthorizable>(), SqoopActionConstant.ALL)); - } finally { - if (sentryClient != null) { - sentryClient.close(); - } } }
