Repository: incubator-sentry Updated Branches: refs/heads/master 0693dfb21 -> 36aae7ad7
SENTRY-658: Connection leak in Hive binding with Sentry service (Prasad Mujumdar, reviewed by Lenni Kuff) Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/36aae7ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/36aae7ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/36aae7ad Branch: refs/heads/master Commit: 36aae7ad78456a950a4ec65d00c958ec446d9708 Parents: 0693dfb Author: Prasad Mujumdar <pras...@apache.org> Authored: Tue Feb 24 08:44:04 2015 -0800 Committer: Prasad Mujumdar <pras...@apache.org> Committed: Tue Feb 24 08:44:04 2015 -0800 ---------------------------------------------------------------------- .../hive/ql/exec/SentryGrantRevokeTask.java | 3 + .../binding/hive/HiveAuthzBindingHook.java | 14 +- .../binding/hive/authz/HiveAuthzBinding.java | 49 +------ .../metastore/SentryMetaStoreFilterHook.java | 10 ++ .../sentry/service/thrift/SentryService.java | 19 +++ .../tests/e2e/dbprovider/TestDbConnections.java | 144 +++++++++++++++++++ .../tests/e2e/minisentry/InternalSentrySrv.java | 109 ++++++++++++++ .../sentry/tests/e2e/minisentry/SentrySrv.java | 21 +++ 8 files changed, 317 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/36aae7ad/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java index 0cbb250..2a60a23 100644 --- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java +++ 
b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/hadoop/hive/ql/exec/SentryGrantRevokeTask.java @@ -179,6 +179,9 @@ public class SentryGrantRevokeTask extends Task<DDLWork> implements Serializable if (sentryClient != null) { sentryClient.close(); } + if (hiveAuthzBinding != null) { + hiveAuthzBinding.close(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/36aae7ad/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java index e311398..48afa08 100644 --- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java +++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/HiveAuthzBindingHook.java @@ -330,11 +330,11 @@ public class HiveAuthzBindingHook extends AbstractSemanticAnalyzerHook { } } - if (stmtAuthObject == null) { - // We don't handle authorizing this statement - return; - } try { + if (stmtAuthObject == null) { + // We don't handle authorizing this statement + return; + } authorizeWithHiveBindings(context, stmtAuthObject, stmtOperation); } catch (AuthorizationException e) { executeOnFailureHooks(context, stmtOperation, e); @@ -346,13 +346,15 @@ public class HiveAuthzBindingHook extends AbstractSemanticAnalyzerHook { String msg = HiveAuthzConf.HIVE_SENTRY_PRIVILEGE_ERROR_MESSAGE + "\n Required privileges for this query: " + permsRequired; throw new SemanticException(msg, e); + } finally { + hiveAuthzBinding.close(); } + if ("true".equalsIgnoreCase(context.getConf(). 
get(HiveAuthzConf.HIVE_SENTRY_MOCK_COMPILATION))) { throw new SemanticException(HiveAuthzConf.HIVE_SENTRY_MOCK_ERROR + " Mock query compilation aborted. Set " + HiveAuthzConf.HIVE_SENTRY_MOCK_COMPILATION + " to 'false' for normal query processing"); } - hiveAuthzBinding.set(context.getConf()); } private void executeOnFailureHooks(HiveSemanticAnalyzerHookContext context, @@ -513,8 +515,6 @@ public class HiveAuthzBindingHook extends AbstractSemanticAnalyzerHook { // validate permission hiveAuthzBinding.authorize(stmtOperation, stmtAuthObject, getCurrentSubject(context), inputHierarchy, outputHierarchy); - - hiveAuthzBinding.set(context.getConf()); } private boolean isUDF(ReadEntity readEntity) { http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/36aae7ad/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java index b4b69e1..3071475 100644 --- a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java +++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/hive/authz/HiveAuthzBinding.java @@ -57,8 +57,6 @@ import com.google.common.collect.Sets; public class HiveAuthzBinding { private static final Logger LOG = LoggerFactory .getLogger(HiveAuthzBinding.class); - private static final Map<String, HiveAuthzBinding> authzBindingMap = - new ConcurrentHashMap<String, HiveAuthzBinding>(); private static final AtomicInteger queryID = new AtomicInteger(); private static final Splitter ROLE_SET_SPLITTER = Splitter.on(",").trimResults() .omitEmptyStrings(); @@ -129,49 +127,6 @@ public class HiveAuthzBinding { } } - /** - * Retrieve the 
HiveAuthzBinding if the tag is saved in the given configuration - * @param conf - * @return HiveAuthzBinding or null - */ - public static HiveAuthzBinding get(Configuration conf) { - String tagName = conf.get(HIVE_BINDING_TAG); - if (tagName == null) { - return null; - } else { - return authzBindingMap.get(tagName); - } - } - - /** - * store the HiveAuthzBinding in the authzBindingMap and save a tag in the given configuration - * @param conf - */ - public void set (Configuration conf) { - if (!open) { - throw new IllegalStateException("Binding has been closed"); - } - String tagName = SessionState.get().getSessionId() + "_" + queryID.incrementAndGet(); - authzBindingMap.put(tagName, this); - conf.set(HIVE_BINDING_TAG, tagName); - } - - /** - * remove the authzBindingMap entry for given tag - * @param conf - */ - public void clear(Configuration conf) { - if (!open) { - throw new IllegalStateException("Binding has been closed"); - } - String tagName = conf.get(HIVE_BINDING_TAG); - if (tagName != null) { - authzBindingMap.remove(tagName); - } - open = false; - authProvider.close(); - } - private void validateHiveConfig(HiveHook hiveHook, HiveConf hiveConf, HiveAuthzConf authzConf) throws InvalidConfigurationException{ if(hiveHook.equals(HiveHook.HiveMetaStore)) { @@ -393,4 +348,8 @@ public class HiveAuthzBinding { } return authProvider.getLastFailedPrivileges(); } + + public void close() { + authProvider.close(); + } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/36aae7ad/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java index 5e26e83..2ae4fbd 100644 --- 
a/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java +++ b/sentry-binding/sentry-binding-hive/src/main/java/org/apache/sentry/binding/metastore/SentryMetaStoreFilterHook.java @@ -133,6 +133,8 @@ public class SentryMetaStoreFilterHook implements MetaStoreFilterHook { } catch (Exception e) { LOG.warn("Error getting DB list ", e); return new ArrayList<String>(); + } finally { + close(); } } @@ -150,6 +152,8 @@ public class SentryMetaStoreFilterHook implements MetaStoreFilterHook { } catch (Exception e) { LOG.warn("Error getting Table list ", e); return new ArrayList<String>(); + } finally { + close(); } } @@ -193,4 +197,10 @@ public class SentryMetaStoreFilterHook implements MetaStoreFilterHook { return SessionState.get().getConf(); } + private void close() { + if (hiveAuthzBinding != null) { + hiveAuthzBinding.close(); + hiveAuthzBinding = null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/36aae7ad/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 0b4b39a..d48fe5b 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -56,6 +56,7 @@ import org.apache.thrift.TMultiplexedProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; import 
org.apache.thrift.transport.TSaslServerTransport; import org.apache.thrift.transport.TServerSocket; @@ -399,4 +400,22 @@ public class SentryService implements Callable { public Configuration getConf() { return conf; } + + /** + * Add Thrift event handler to underlying thrift threadpool server + * @param eventHandler + */ + public void setThriftEventHandler(TServerEventHandler eventHandler) throws IllegalStateException { + if (thriftServer == null) { + throw new IllegalStateException("Server is not initialized or stopped"); + } + thriftServer.setServerEventHandler(eventHandler); + } + + public TServerEventHandler getThriftEventHandler() throws IllegalStateException { + if (thriftServer == null) { + throw new IllegalStateException("Server is not initialized or stopped"); + } + return thriftServer.getEventHandler(); + } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/36aae7ad/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbConnections.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbConnections.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbConnections.java new file mode 100644 index 0000000..7024263 --- /dev/null +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestDbConnections.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.tests.e2e.dbprovider; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.sql.Connection; +import java.sql.Statement; + +import org.apache.sentry.provider.db.SentryAccessDeniedException; +import org.apache.sentry.provider.db.SentryAlreadyExistsException; +import org.apache.sentry.provider.file.PolicyFile; +import org.apache.sentry.tests.e2e.hive.AbstractTestWithStaticConfiguration; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.io.Resources; + +public class TestDbConnections extends AbstractTestWithStaticConfiguration { + private PolicyFile policyFile; + + @BeforeClass + public static void setupTestStaticConfiguration() throws Exception { + useSentryService = true; + AbstractTestWithStaticConfiguration.setupTestStaticConfiguration(); + } + + @Override + @Before + public void setup() throws Exception { + super.setupAdmin(); + super.setup(); + policyFile = PolicyFile.setAdminOnServer1(ADMINGROUP); + } + + /** + * Currently the hive binding opens a new server connection for each + * statement. This test verifies that the client connection is closed properly + * at the end. 
Test Queries, DDLs, Auth DDLs and metdata filtering (eg show + * tables/databases) + * @throws Exception + */ + @Test + public void testClientConnections() throws Exception { + String roleName = "connectionTest"; + long preConnectionClientId; + Connection connection = context.createConnection(ADMIN1); + Statement statement = context.createStatement(connection); + + assertEquals(0, getSentrySrv().getNumActiveClients()); + + preConnectionClientId = getSentrySrv().getTotalClients(); + statement.execute("DROP DATABASE IF EXISTS DB_1 CASCADE"); + statement.execute("CREATE DATABASE DB_1"); + statement.execute("USE DB_1"); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + // client connection is closed after DDLs + preConnectionClientId = getSentrySrv().getTotalClients(); + statement.execute("CREATE TABLE t1 (c1 string)"); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + // client connection is closed after queries + preConnectionClientId = getSentrySrv().getTotalClients(); + statement.execute("SELECT * FROM t1"); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + preConnectionClientId = getSentrySrv().getTotalClients(); + statement.execute("DROP TABLE t1"); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + // client connection is closed after auth DDL + preConnectionClientId = getSentrySrv().getTotalClients(); + statement.execute("CREATE ROLE " + roleName); + assertEquals(0, getSentrySrv().getNumActiveClients()); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + context.assertSentryException(statement, "CREATE ROLE " + roleName, + SentryAlreadyExistsException.class.getSimpleName()); + assertEquals(0, 
getSentrySrv().getNumActiveClients()); + statement.execute("DROP ROLE " + roleName); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + // client invocation via metastore filter + preConnectionClientId = getSentrySrv().getTotalClients(); + statement.executeQuery("show tables"); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + statement.close(); + connection.close(); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + connection = context.createConnection(USER1_1); + statement = context.createStatement(connection); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + // verify client connection is closed after statement auth error + preConnectionClientId = getSentrySrv().getTotalClients(); + context.assertAuthzException(statement, "USE DB_1"); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + // verify client connection is closed after auth DDL error + preConnectionClientId = getSentrySrv().getTotalClients(); + context.assertSentryException(statement, "CREATE ROLE " + roleName, + SentryAccessDeniedException.class.getSimpleName()); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + // client invocation via metastore filter + preConnectionClientId = getSentrySrv().getTotalClients(); + statement.executeQuery("show databases"); + assertTrue(preConnectionClientId < getSentrySrv().getTotalClients()); + assertEquals(0, getSentrySrv().getNumActiveClients()); + + statement.close(); + connection.close(); + + assertEquals(0, getSentrySrv().getNumActiveClients()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/36aae7ad/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/InternalSentrySrv.java 
---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/InternalSentrySrv.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/InternalSentrySrv.java index 68bc9ee..603aa38 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/InternalSentrySrv.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/InternalSentrySrv.java @@ -20,13 +20,20 @@ package org.apache.sentry.tests.e2e.minisentry; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.curator.test.TestingServer; import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.provider.db.service.thrift.SentryProcessorWrapper; import org.apache.sentry.service.thrift.SentryService; import org.apache.sentry.service.thrift.SentryServiceFactory; import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +41,71 @@ import com.google.common.collect.Lists; public class InternalSentrySrv implements SentrySrv { + public static class SentryServerContext implements ServerContext { + private long contextId; + + public SentryServerContext(long contextId) { + this.contextId = contextId; + } + + public long getContextId() { + return contextId; + } + } + + /** + * Thrift even handler class to track client connections to Sentry service + */ + public static class SentryThriftEvenHandler implements 
TServerEventHandler { + // unique id for each client connection. We could see multiple simultaneous + // client connections, some make it thread safe. + private AtomicLong clientId = new AtomicLong(); + // Lists of clientId currently connected + private List<Long> clientList = Lists.newArrayList(); + + /** + * Thrift callback when a new client is connecting + */ + @Override + public ServerContext createContext(TProtocol inputProto, + TProtocol outputProto) { + clientList.add(clientId.incrementAndGet()); + LOGGER.info("Client Connected: " + clientId.get()); + return new SentryServerContext(clientId.get()); + } + + /** + * Thrift callback when a client is disconnecting + */ + @Override + public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) { + clientList.remove(((SentryServerContext) arg0).getContextId()); + LOGGER.info("Client Disonnected: " + + ((SentryServerContext) arg0).getContextId()); + } + + @Override + public void preServe() { + } + + @Override + public void processContext(ServerContext arg0, TTransport arg1, + TTransport arg2) { + } + + public long getClientCount() { + return clientList.size(); + } + + public List<Long> getClienList() { + return clientList; + } + + public long getClientId() { + return clientId.get(); + } + } + private List<SentryService> sentryServers = Lists.newArrayList(); private static TestingServer zkServer; // created only if in case of HA private static final Logger LOGGER = LoggerFactory @@ -91,6 +163,7 @@ public class InternalSentrySrv implements SentrySrv { throw new TimeoutException("Server did not start after 60 seconds"); } } + sentryServer.setThriftEventHandler(new SentryThriftEvenHandler()); } @Override @@ -161,4 +234,40 @@ public class InternalSentrySrv implements SentrySrv { return zkServer != null; } + @Override + public long getNumActiveClients(int serverNum) { + SentryThriftEvenHandler thriftHandler = (SentryThriftEvenHandler) get( + serverNum).getThriftEventHandler(); + LOGGER.warn("Total 
clients: " + thriftHandler.getClientId()); + for (Long clientId: thriftHandler.getClienList()) { + LOGGER.warn("Got clients: " + clientId); + } + return thriftHandler.getClientCount(); + } + + @Override + public long getNumActiveClients() { + long numClients = 0; + for (int sentryServerNum = 0; sentryServerNum < sentryServers.size(); sentryServerNum++) { + numClients += getNumActiveClients(sentryServerNum); + } + return numClients; + + } + + @Override + public long getTotalClients() { + long totalClients = 0; + for (int sentryServerNum = 0; sentryServerNum < sentryServers.size(); sentryServerNum++) { + totalClients += getTotalClients(sentryServerNum); + } + return totalClients; + } + + @Override + public long getTotalClients(int serverNum) { + SentryThriftEvenHandler thriftHandler = (SentryThriftEvenHandler) get( + serverNum).getThriftEventHandler(); + return thriftHandler.getClientId(); + } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/36aae7ad/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/SentrySrv.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/SentrySrv.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/SentrySrv.java index e9ae5fa..b8cf894 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/SentrySrv.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/minisentry/SentrySrv.java @@ -76,4 +76,25 @@ public interface SentrySrv { * @return True - HA is enabled False - HA is not enabled */ public boolean isHaEnabled(); + + /** + * Get the number of active clients connections across servers + */ + public long getNumActiveClients(); + + /** + * Get the number of active clients connections for the given server + */ + public long getNumActiveClients(int serverNum); + + /** + * 
Get the total number of clients connected so far + */ + public long getTotalClients(); + + /** + * Get the total number of clients connected so far + */ + public long getTotalClients(int serverNum); + }