Repository: sentry Updated Branches: refs/heads/sentry-ha-redesign 2d5ed9984 -> 908072d66
http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java index a35bf1d..353d461 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/PoolClientInvocationHandler.java @@ -20,11 +20,13 @@ package org.apache.sentry.service.thrift; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.AbandonedConfig; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.common.exception.SentryStandbyException; import org.apache.sentry.core.common.exception.SentryUserException; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; @@ -33,63 +35,273 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The PoolClientInvocationHandler is a proxy class for handling thrift call. For every thrift call, - * get the instance of SentryPolicyServiceBaseClient from the commons-pool, and return the instance - * to the commons-pool after complete the call. For any exception with the call, discard the - * instance and create a new one added to the commons-pool. Then, get the instance and do the call - * again. For the thread safe, the commons-pool will manage the connection pool, and every thread - * can get the connection by borrowObject() and return the connection to the pool by returnObject(). + * The PoolClientInvocationHandler is a proxy class for handling thrift + * call. For every thrift call, get the instance of + * SentryPolicyServiceBaseClient from the commons-pool, and return the instance + * to the commons-pool after complete the call. For any exception with the call, + * discard the instance and create a new one added to the commons-pool. Then, + * get the instance and do the call again. For the thread safe, the commons-pool + * will manage the connection pool, and every thread can get the connection by + * borrowObject() and return the connection to the pool by returnObject(). */ public class PoolClientInvocationHandler extends SentryClientInvocationHandler { + private static final Logger LOGGER = + LoggerFactory.getLogger(PoolClientInvocationHandler.class); - private static final Logger LOGGER = LoggerFactory.getLogger(PoolClientInvocationHandler.class); + private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occurred "; private final Configuration conf; - private PooledObjectFactory<SentryPolicyServiceClient> poolFactory; - private GenericObjectPool<SentryPolicyServiceClient> pool; - private GenericObjectPoolConfig poolConfig; - private int connectionRetryTotal; - private static final String POOL_EXCEPTION_MESSAGE = "Pool exception occured "; + /** + * The configuration to use for our object pools. + * Null if we are not using object pools. + */ + private final GenericObjectPoolConfig poolConfig; + + /** + * The total number of connection retries to attempt per endpoint. + */ + private final int connectionRetryTotal; + + /** + * The configured sentry servers. + */ + private final Endpoint[] endpoints; + + /** + * The endpoint which we are currently using. This can be read without any locks. + * It must be written while holding the endpoints lock. + */ + private volatile int freshestEndpointIdx = 0; + + private class Endpoint { + /** + * The server address or hostname. + */ + private final String addr; + + /** + * The server port. + */ + private final int port; + + /** + * The server's poolFactory used to create new clients. + */ + private final PooledObjectFactory<SentryPolicyServiceClient> poolFactory; + + /** + * The server's pool of cached clients. + */ + private final GenericObjectPool<SentryPolicyServiceClient> pool; + + Endpoint(String addr, int port) { + this.addr = addr; + this.port = port; + this.poolFactory = new SentryServiceClientPoolFactory(addr, port, conf); + this.pool = new GenericObjectPool<SentryPolicyServiceClient>( + this.poolFactory, poolConfig, new AbandonedConfig()); + } + + GenericObjectPool<SentryPolicyServiceClient> getPool() { + return pool; + } + + String getEndPointStr() { + return new String("endpoint at [address " + addr + ", port " + port + "]"); + } + } public PoolClientInvocationHandler(Configuration conf) throws Exception { this.conf = conf; - readConfiguration(); - poolFactory = new SentryServiceClientPoolFactory(conf); - pool = new GenericObjectPool<SentryPolicyServiceClient>(poolFactory, poolConfig, new AbandonedConfig()); + + this.poolConfig = new GenericObjectPoolConfig(); + // config the pool size for commons-pool + this.poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL, + ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT)); + this.poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE, + ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT)); + this.poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE, + ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT)); + + // get the retry number for reconnecting service + this.connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL, + ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT); + + String hostsAndPortsStr = conf.get(ClientConfig.SERVER_RPC_ADDRESS); + if (hostsAndPortsStr == null) { + throw new RuntimeException("Config key " + + ClientConfig.SERVER_RPC_ADDRESS + " is required"); + } + int defaultPort = conf.getInt(ClientConfig.SERVER_RPC_PORT, + ClientConfig.SERVER_RPC_PORT_DEFAULT); + String[] hostsAndPorts = hostsAndPortsStr.split(","); + String[] hosts = new String[hostsAndPorts.length]; + int[] ports = new int[hostsAndPorts.length]; + parseHostPortStrings(hostsAndPortsStr, hostsAndPorts, hosts, + ports, defaultPort); + this.endpoints = new Endpoint[hostsAndPorts.length]; + for (int i = 0; i < this.endpoints.length; i++) { + this.endpoints[i] = new Endpoint(hosts[i], ports[i]); + LOGGER.info("Initiate sentry sever endpoint: hostname " + hosts[i] + ", port " + ports[i]); + } + } + + @VisibleForTesting + /** + * Utility function for parsing host and port strings. Expected form should be + * (host:port). The hostname could be in ipv6 style. Port number can be empty + * and will be default to defaultPort. + */ + static protected void parseHostPortStrings(String hostsAndPortsStr, + String[] hostsAndPorts, String[] hosts, int[] ports, + int defaultPort) { + int i = -1; + for (String hostAndPort: hostsAndPorts) { + i++; + hostAndPort = hostAndPort.trim(); + if (hostAndPort.isEmpty()) { + throw new RuntimeException("Cannot handle empty server RPC address " + + "in component " + (i + 1) + " of " + hostsAndPortsStr); + } + int colonIdx = hostAndPort.lastIndexOf(":"); + if (colonIdx == -1) { + // There is no colon in the host+port string. + // That means the port is left unspecified, and should be set to + // the default. + hosts[i] = hostAndPort; + ports[i] = defaultPort; + continue; + } + int rightBracketIdx = hostAndPort.indexOf(']', colonIdx); + if (rightBracketIdx != -1) { + // If there is a right bracket that occurs after the colon, the + // colon we found is part of an ipv6 address like this: + // [::1]. That means we only have the host part, not the port part. + hosts[i] = hostAndPort.substring(0, rightBracketIdx); + ports[i] = defaultPort; + continue; + } + // We have a host:port string, where the part after colon should be + // the port. + hosts[i] = hostAndPort.substring(0, colonIdx); + String portStr = hostAndPort.substring(colonIdx+1); + try { + ports[i] = Integer.valueOf(portStr); + } catch (NumberFormatException e) { + throw new RuntimeException("Cannot parse port string " + portStr + + "in component " + (i + 1) + " of " + hostsAndPortsStr); + } + if ((ports[i] < 0) || (ports[i] > 65535)) { + throw new RuntimeException("Invalid port number given for " + portStr + + "in component " + (i + 1) + " of " + hostsAndPortsStr); + } + } + // Strip the brackets off of hostnames and ip addresses enclosed in square + // brackets. This is to support ipv6-style [address]:port addressing. + for (int j = 0; j < hosts.length; j++) { + if ((hosts[j].startsWith("[")) && (hosts[j].endsWith("]"))) { + hosts[j] = hosts[j].substring(1, hosts[j].length() - 1); + } + } } @Override - public Object invokeImpl(Object proxy, Method method, Object[] args) throws Exception { + public Object invokeImpl(Object proxy, Method method, Object[] args) + throws Exception { int retryCount = 0; - Object result = null; - while (retryCount < connectionRetryTotal) { + /** + * The maximum number of retries that we will do. Each endpoint gets its + * own set of retries. + */ + int retryLimit = connectionRetryTotal * endpoints.length; + + /** + * The index of the endpoint to use. + */ + int endpointIdx = freshestEndpointIdx; + + /** + * A list of exceptions from each endpoint. This starts as null to avoid + * memory allocation in the common case where there is no error. + */ + Exception exc[] = null; + + Object ret = null; + + while (retryCount < retryLimit) { + GenericObjectPool<SentryPolicyServiceClient> pool = + endpoints[endpointIdx].getPool(); try { - // The wapper here is for the retry of thrift call, the default retry number is 3. - result = invokeFromPool(method, args); + if ((exc != null) && + (exc[endpointIdx] instanceof TTransportException)) { + // If there was a TTransportException last time we tried to contact + // this endpoint, attempt to create a new connection before we try + // again. + synchronized (endpoints) { + // If there has room, create new instance and add it to the + // commons-pool. This instance will be returned first from the + // commons-pool, because the configuration is LIFO. + if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) { + pool.addObject(); + } + } + } + // Try to make the RPC. + ret = invokeFromPool(method, args, pool); break; - } catch (TTransportException e) { - // TTransportException means there has connection problem, create a new connection and try - // again. Get the lock of pool and add new connection. - synchronized (pool) { - // If there has room, create new instance and add it to the commons-pool, this instance - // will be back first from the commons-pool because the configuration is LIFO. - if (pool.getNumIdle() + pool.getNumActive() < pool.getMaxTotal()) { - pool.addObject(); + } catch (SentryStandbyException | TTransportException e) { + if (exc == null) { + exc = new Exception[endpoints.length]; + } + exc[endpointIdx] = e; + } + + Exception lastExc = exc[endpointIdx]; + synchronized (endpoints) { + int curFreshestEndpointIdx = freshestEndpointIdx; + if (curFreshestEndpointIdx == endpointIdx) { + curFreshestEndpointIdx = + (curFreshestEndpointIdx + 1) % endpoints.length; + freshestEndpointIdx = curFreshestEndpointIdx; + } + endpointIdx = curFreshestEndpointIdx; + } + // Increase the retry num, and throw the exception if can't retry again. + retryCount++; + if (retryCount == connectionRetryTotal) { + boolean allStandby = true, allUnreachable = true; + for (int i = 0; i < exc.length; i++) { + if (exc[i] instanceof SentryStandbyException) { + allUnreachable = false; + LOGGER.error("Sentry server " + endpoints[endpointIdx].getEndPointStr() + + " is in standby mode"); + } else { + allStandby = false; + LOGGER.error("Sentry server " + endpoints[endpointIdx].getEndPointStr() + + " is in unreachable."); } } - // Increase the retry num, and throw the exception if can't retry again. - retryCount++; - if (retryCount == connectionRetryTotal) { - throw new SentryUserException(e.getMessage(), e); + if (allStandby) { + throw new SentryStandbyException("All sentry servers are in " + + "standby mode.", lastExc); + } else if (allUnreachable) { + throw new SentryUserException("All sentry servers are unreachable. " + + "Diagnostics is needed for unreachable servers.", + lastExc); + } else { + throw new SentryUserException("All reachable servers are standby. " + + "Diagnostics is needed for unreachable servers.", + lastExc); } } } - return result; + return ret; } - private Object invokeFromPool(Method method, Object[] args) throws Exception { + private Object invokeFromPool(Method method, Object[] args, + GenericObjectPool<SentryPolicyServiceClient> pool) throws Exception { Object result = null; SentryPolicyServiceClient client; try { @@ -106,7 +318,9 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler { // Get the target exception, check if SentryUserException or TTransportException is wrapped. // TTransportException means there has connection problem with the pool. Throwable targetException = e.getCause(); - if (targetException instanceof SentryUserException) { + if (targetException instanceof SentryStandbyException) { + throw (SentryStandbyException)targetException; + } else if (targetException instanceof SentryUserException) { Throwable sentryTargetException = targetException.getCause(); // If there has connection problem, eg, invalid connection if the service restarted, // sentryTargetException instanceof TTransportException = true. @@ -134,21 +348,12 @@ public class PoolClientInvocationHandler extends SentryClientInvocationHandler { @Override public void close() { - try { - pool.close(); - } catch (Exception e) { - LOGGER.debug(POOL_EXCEPTION_MESSAGE, e); + for (int i = 0; i < endpoints.length; i++) { + try { + endpoints[i].getPool().close(); + } catch (Exception e) { + LOGGER.debug(POOL_EXCEPTION_MESSAGE, e); + } } } - - private void readConfiguration() { - poolConfig = new GenericObjectPoolConfig(); - // config the pool size for commons-pool - poolConfig.setMaxTotal(conf.getInt(ClientConfig.SENTRY_POOL_MAX_TOTAL, ClientConfig.SENTRY_POOL_MAX_TOTAL_DEFAULT)); - poolConfig.setMinIdle(conf.getInt(ClientConfig.SENTRY_POOL_MIN_IDLE, ClientConfig.SENTRY_POOL_MIN_IDLE_DEFAULT)); - poolConfig.setMaxIdle(conf.getInt(ClientConfig.SENTRY_POOL_MAX_IDLE, ClientConfig.SENTRY_POOL_MAX_IDLE_DEFAULT)); - // get the retry number for reconnecting service - connectionRetryTotal = conf.getInt(ClientConfig.SENTRY_POOL_RETRY_TOTAL, - ClientConfig.SENTRY_POOL_RETRY_TOTAL_DEFAULT); - } } http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java index 56d774b..9e90af8 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientFactory.java @@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl; -import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; public final class SentryServiceClientFactory { @@ -32,15 +31,9 @@ public final class SentryServiceClientFactory { } public static SentryPolicyServiceClient create(Configuration conf) throws Exception { - boolean pooled = conf.getBoolean(ClientConfig.SENTRY_POOL_ENABLED, false); - if (pooled) { return (SentryPolicyServiceClient) Proxy .newProxyInstance(SentryPolicyServiceClientDefaultImpl.class.getClassLoader(), - SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), - new PoolClientInvocationHandler(conf)); - } else { - return new SentryPolicyServiceClientDefaultImpl(conf); - } + SentryPolicyServiceClientDefaultImpl.class.getInterfaces(), + new PoolClientInvocationHandler(conf)); } - } http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java index afea78a..0164fa6 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryServiceClientPoolFactory.java @@ -36,16 +36,21 @@ public class SentryServiceClientPoolFactory extends BasePooledObjectFactory<Sent private static final Logger LOGGER = LoggerFactory.getLogger(SentryServiceClientPoolFactory.class); - private Configuration conf; + private final String addr; + private final int port; + private final Configuration conf; - public SentryServiceClientPoolFactory(Configuration conf) { + public SentryServiceClientPoolFactory(String addr, int port, + Configuration conf) { + this.addr = addr; + this.port = port; this.conf = conf; } @Override public SentryPolicyServiceClient create() throws Exception { LOGGER.debug("Creating Sentry Service Client..."); - return new SentryPolicyServiceClientDefaultImpl(conf); + return new SentryPolicyServiceClientDefaultImpl(addr, port, conf); } @Override http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java index 51bba31..d1ac447 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceFailureCase.java @@ -58,10 +58,13 @@ public class TestSentryServiceFailureCase extends SentryServiceIntegrationBase { public void testClientServerConnectionFailure() throws Exception { try { connectToSentryService(); + String requestorUserName = ADMIN_USER; + client.listRoles(requestorUserName); Assert.fail("Failed to receive Exception"); } catch(Exception e) { LOGGER.info("Excepted exception", e); - Throwable cause = e.getCause(); + // peer callback exception is nested inside SentryUserException. + Throwable cause = e.getCause().getCause(); if (cause == null) { throw e; } http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java index 15eab15..a4dd8a6 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/thrift/TestSentryServiceWithInvalidMsgSize.java @@ -104,7 +104,7 @@ public class TestSentryServiceWithInvalidMsgSize extends SentryServiceIntegratio client.grantServerPrivilege(ADMIN_USER, ROLE_NAME, "server", false); } catch (SentryUserException e) { exceptionThrown = true; - Assert.assertTrue(e.getMessage().contains("org.apache.thrift.transport.TTransportException")); + Assert.assertTrue(e.getCause().getMessage().contains("org.apache.thrift.transport.TTransportException")); } finally { Assert.assertEquals(true, exceptionThrown); } http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java new file mode 100644 index 0000000..d601b1e --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHMSFollower.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.service.thrift; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hive.hcatalog.messaging.HCatEventMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory; +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestHMSFollower { + SentryJSONMessageFactory messageFactory = new SentryJSONMessageFactory(); + SentryStore sentryStore = Mockito.mock(SentryStore.class); + final static String hiveInstance = "server2"; + + @Test + public void testCreateDatabase() throws Exception { + String dbName = "db1"; + + // Create notification events + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_DATABASE.toString(), + messageFactory.buildCreateDatabaseMessage(new Database(dbName, null, "hdfs://db1", null)).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + + verify(sentryStore, times(1)).dropPrivilege(authorizable); + } + @Test + public void testDropDatabase() throws Exception { + String dbName = "db1"; + + // Create notification events + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_DATABASE.toString(), + messageFactory.buildDropDatabaseMessage(new Database(dbName, null, "hdfs://db1", null)).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + + verify(sentryStore, times(1)).dropPrivilege(authorizable); + } + @Test + public void testCreateTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + // Create notification events + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.CREATE_TABLE.toString(), + messageFactory.buildCreateTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + authorizable.setTable(tableName); + + verify(sentryStore, times(1)).dropPrivilege(authorizable); + } + @Test + public void testDropTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + // Create notification events + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.DROP_TABLE.toString(), + messageFactory.buildDropTableMessage(new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb("db1"); + authorizable.setTable(tableName); + + verify(sentryStore, times(1)).dropPrivilege(authorizable); + } + @Test + public void testRenameTable() throws Exception { + String dbName = "db1"; + String tableName = "table1"; + + String newDbName = "db1"; + String newTableName = "table2"; + + // Create notification events + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation("hdfs://db1.db/table1"); + NotificationEvent notificationEvent = new NotificationEvent(1, 0, HCatEventMessage.EventType.ALTER_TABLE.toString(), + messageFactory.buildAlterTableMessage( + new Table(tableName, dbName, null, 0, 0, 0, sd, null, null, null, null, null), + new Table(newTableName, newDbName, null, 0, 0, 0, sd, null, null, null, null, null)).toString()); + notificationEvent.setDbName(newDbName); + notificationEvent.setTableName(newTableName); + List<NotificationEvent> events = new ArrayList<>(); + events.add(notificationEvent); + + Configuration configuration = new Configuration(); + HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, hiveInstance); + hmsFollower.processNotificationEvents(events); + + TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(hiveInstance); + authorizable.setServer(hiveInstance); + newAuthorizable.setDb(newDbName); + newAuthorizable.setTable(newTableName); + + verify(sentryStore, times(1)).renamePrivilege(authorizable, newAuthorizable); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/908072d6/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java new file mode 100644 index 0000000..5b0e12b --- /dev/null +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestPoolClientInvocationHandler.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.service.thrift; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestPoolClientInvocationHandler { + private static final Logger LOGGER = + LoggerFactory.getLogger(TestPoolClientInvocationHandler.class); + + private void expectParseHostPortStrings(String hostsAndPortsStr, + String[] expectedHosts, int[] expectedPorts) throws Exception { + boolean success = false; + String[] hostsAndPorts = hostsAndPortsStr.split(","); + String[] hosts = new String[hostsAndPorts.length]; + int[] ports = new int[hostsAndPorts.length]; + try { + PoolClientInvocationHandler.parseHostPortStrings(hostsAndPortsStr, + hostsAndPorts, hosts, ports, 8038); + success = true; + } finally { + if (!success) { + LOGGER.error("Caught exception while parsing hosts/ports string " + + hostsAndPortsStr); + } + } + Assert.assertArrayEquals("Got unexpected hosts results while " + + "parsing " + hostsAndPortsStr, expectedHosts, hosts); + Assert.assertArrayEquals("Got unexpected ports results while " + + "parsing " + hostsAndPortsStr, expectedPorts, ports); + } + + @SuppressWarnings("PMD.AvoidUsingHardCodedIP") + @Test + public void testParseHostPortStrings() throws Exception { + expectParseHostPortStrings("foo", new String[] {"foo"}, new int[] {8038}); + expectParseHostPortStrings("foo,bar", + new String[] {"foo", "bar"}, + new int[] {8038, 8038}); + expectParseHostPortStrings("foo:2020,bar:2021", + new String[] {"foo", "bar"}, + new int[] {2020, 2021}); + expectParseHostPortStrings("127.0.0.1:2020,127.1.0.1", + new String[] {"127.0.0.1", "127.1.0.1"}, + new int[] {2020, 8038}); + expectParseHostPortStrings("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:433", + new String[] {"2001:db8:85a3:8d3:1319:8a2e:370:7348"}, + new int[] {433}); + } +}
