Repository: incubator-sentry Updated Branches: refs/heads/master 67031139f -> 5a827f6db
SENTRY-906: Add concurrency sentry client tests. (Anne Yu, reviewed by Hao Hao) (to run it: -Dsentry.scaletest.oncluster=true, -Dsentry.host=${SENTRY_HOST}, -Dhive.server2.thrift.bind.host=${HS2_HOST}) Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/5a827f6d Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/5a827f6d Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/5a827f6d Branch: refs/heads/master Commit: 5a827f6db54b1c0d3e310e98c1a35298c23ec114 Parents: 6703113 Author: Anne Yu <ann...@cloudera.com> Authored: Fri Jan 15 14:49:12 2016 -0800 Committer: Anne Yu <ann...@cloudera.com> Committed: Fri Jan 15 14:53:30 2016 -0800 ---------------------------------------------------------------------- sentry-tests/sentry-tests-hive/pom.xml | 1 + .../e2e/dbprovider/TestConcurrentClients.java | 344 +++++++++++++++++++ .../AbstractTestWithStaticConfiguration.java | 86 ++++- 3 files changed, 430 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/pom.xml b/sentry-tests/sentry-tests-hive/pom.xml index 98e4752..472cce7 100644 --- a/sentry-tests/sentry-tests-hive/pom.xml +++ b/sentry-tests/sentry-tests-hive/pom.xml @@ -462,6 +462,7 @@ limitations under the License. <include>**/TestDbPrivilegesAtColumnScope.java</include> <include>**/TestColumnEndToEnd.java</include> <include>**/TestDbComplexView.java</include> + <include>**/TestConcurrentClients</include> </includes> <argLine>-Dsentry.e2etest.hiveServer2Type=UnmanagedHiveServer2 -Dsentry.e2etest.DFSType=ClusterDFS -Dsentry.e2etest.external.sentry=true</argLine> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java new file mode 100644 index 0000000..d926797 --- /dev/null +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.tests.e2e.dbprovider; + +import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; +import org.apache.sentry.provider.file.PolicyFile; +import org.apache.sentry.tests.e2e.hive.AbstractTestWithStaticConfiguration; + +import org.apache.sentry.tests.e2e.hive.StaticUserGroup; +import static org.junit.Assume.assumeTrue; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.RandomStringUtils; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; + +/** + * The test class implements concurrency tests to test: + * Sentry client, HS2 jdbc client etc. + */ +public class TestConcurrentClients extends AbstractTestWithStaticConfiguration { + private static final Logger LOGGER = LoggerFactory + .getLogger(TestConcurrentClients.class); + + private PolicyFile policyFile; + + // define scale for tests + private final int NUM_OF_TABLES = Integer.parseInt(System.getProperty( + "sentry.e2e.concurrency.test.tables-per-db", "1")); + private final int NUM_OF_PAR = Integer.parseInt(System.getProperty( + "sentry.e2e.concurrency.test.partitions-per-tb", "3")); + private final int NUM_OF_THREADS = Integer.parseInt(System.getProperty( + "sentry.e2e.concurrency.test.threads", "30")); + private final int NUM_OF_TASKS = Integer.parseInt(System.getProperty( + "sentry.e2e.concurrency.test.tasks", "100")); + private final Long HS2_CLIENT_TEST_DURATION_MS = Long.parseLong(System.getProperty( + "sentry.e2e.concurrency.test.hs2client.test.time.ms", "10000")); //millis + private final Long SENTRY_CLIENT_TEST_DURATION_MS = Long.parseLong(System.getProperty( + "sentry.e2e.concurrency.test.sentryclient.test.time.ms", "10000")); //millis + + private static Map<String, String> privileges = new HashMap<String, String>(); + static { + privileges.put("all_db1", "server=server1->db=" + DB1 + "->action=all"); + } + + @Override + @Before + public void setup() throws Exception { + super.setupAdmin(); + policyFile = PolicyFile.setAdminOnServer1(ADMINGROUP) + .setUserGroupMapping(StaticUserGroup.getStaticMapping()); + writePolicyFile(policyFile); + } + + @BeforeClass + public static void setupTestStaticConfiguration() throws Exception { + assumeTrue(Boolean.parseBoolean(System.getProperty("sentry.scaletest.oncluster", "false"))); + useSentryService = true; // configure sentry client + clientKerberos = true; // need to get client configuration from testing environments + AbstractTestWithStaticConfiguration.setupTestStaticConfiguration(); + } + + static String randomString( int len ){ + return RandomStringUtils.random(len, true, false); + } + + private void execStmt(Statement stmt, String sql) throws Exception { + LOGGER.info("Running [" + sql + "]"); + stmt.execute(sql); + } + + private void createDbTb(String user, String db, String tb) throws Exception{ + Connection connection = context.createConnection(user); + Statement statement = context.createStatement(connection); + try { + execStmt(statement, "DROP DATABASE IF EXISTS " + db + " CASCADE"); + execStmt(statement, "CREATE DATABASE " + db); + execStmt(statement, "USE " + db); + for (int i = 0; i < NUM_OF_TABLES; i++) { + String tbName = tb + "_" + Integer.toString(i); + execStmt(statement, "CREATE TABLE " + tbName + " (a string) PARTITIONED BY (b string)"); + } + } catch (Exception ex) { + LOGGER.error("caught exception: " + ex); + } finally { + statement.close(); + connection.close(); + } + } + + private void createPartition(String user, String db, String tb) throws Exception{ + Connection connection = context.createConnection(user); + Statement statement = context.createStatement(connection); + try { + execStmt(statement, "USE " + db); + for (int j = 0; j < NUM_OF_TABLES; j++) { + String tbName = tb + "_" + Integer.toString(j); + for (int i = 0; i < NUM_OF_PAR; i++) { + String randStr = randomString(4); + String sql = "ALTER TABLE " + tbName + " ADD IF NOT EXISTS PARTITION (b = '" + randStr + "') "; + LOGGER.info("[" + i + "] " + sql); + execStmt(statement, sql); + } + } + } catch (Exception ex) { + LOGGER.error("caught exception: " + ex); + } finally { + statement.close(); + connection.close(); + } + } + + private void adminCreateRole(String roleName) throws Exception { + Connection connection = context.createConnection(ADMIN1); + Statement stmt = context.createStatement(connection); + try { + execStmt(stmt, "DROP ROLE " + roleName); + } catch (Exception ex) { + LOGGER.warn("Role does not exist " + roleName); + } finally { + try { + execStmt(stmt, "CREATE ROLE " + roleName); + } catch (Exception ex) { + LOGGER.error("caught exception when create new role: " + ex); + } finally { + stmt.close(); + connection.close(); + } + } + } + + private void adminCleanUp(String db, String roleName) throws Exception { + Connection connection = context.createConnection(ADMIN1); + Statement stmt = context.createStatement(connection); + try { + execStmt(stmt, "DROP DATABASE IF EXISTS " + db + " CASCADE"); + execStmt(stmt, "DROP ROLE " + roleName); + } catch (Exception ex) { + LOGGER.warn("Failed to clean up ", ex); + } finally { + stmt.close(); + connection.close(); + } + } + + private void adminShowRole(String roleName) throws Exception { + Connection connection = context.createConnection(ADMIN1); + Statement stmt = context.createStatement(connection); + boolean found = false; + try { + ResultSet rs = stmt.executeQuery("SHOW ROLES "); + while (rs.next()) { + if (rs.getString("role").equalsIgnoreCase(roleName)) { + LOGGER.info("Found role " + roleName); + found = true; + } + } + } catch (Exception ex) { + LOGGER.error("caught exception when show roles: " + ex); + } finally { + stmt.close(); + connection.close(); + } + assertTrue("failed to detect " + roleName, found); + } + + private void adminGrant(String test_db, String test_tb, + String roleName, String group) throws Exception { + Connection connection = context.createConnection(ADMIN1); + Statement stmt = context.createStatement(connection); + try { + execStmt(stmt, "USE " + test_db); + for (int i = 0; i < NUM_OF_TABLES; i++) { + String tbName = test_tb + "_" + Integer.toString(i); + execStmt(stmt, "GRANT ALL ON TABLE " + tbName + " TO ROLE " + roleName); + } + execStmt(stmt, "GRANT ROLE " + roleName + " TO GROUP " + group); + } catch (Exception ex) { + LOGGER.error("caught exception when grant permission and role: " + ex); + } finally { + stmt.close(); + connection.close(); + } + } + + /** + * A synchronized state class to track concurrency test status from each thread + */ + private final static class TestRuntimeState { + private int numSuccess = 0; + private boolean failed = false; + private Throwable firstException = null; + + public synchronized void setFirstException(Throwable e) { + failed = true; + if (firstException == null) { + firstException = e; + } + } + public synchronized void setNumSuccess() { + numSuccess += 1; + } + public synchronized int getNumSuccess() { + return numSuccess; + } + public synchronized Throwable getFirstException() { + return firstException; + } + public synchronized boolean isFailed() { + return failed; + } + } + + /** + * Test when concurrent HS2 clients talking to server, + * Privileges are correctly created and updated. + * @throws Exception + */ + @Test + public void testConccurentHS2Client() throws Exception { + ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS); + final TestRuntimeState state = new TestRuntimeState(); + + for (int i = 0; i < NUM_OF_TASKS; i ++) { + executor.execute(new Runnable() { + @Override + public void run() { + LOGGER.info("Starting tests: create role, show role, create db and tbl, and create partitions"); + if (state.failed) return; + try { + Long startTime = System.currentTimeMillis(); + Long elapsedTime = 0L; + while (Long.compare(elapsedTime, HS2_CLIENT_TEST_DURATION_MS) <= 0) { + String randStr = randomString(5); + String test_role = "test_role_" + randStr; + String test_db = "test_db_" + randStr; + String test_tb = "test_tb_" + randStr; + LOGGER.info("Start to test sentry with hs2 client with role " + test_role); + adminCreateRole(test_role); + adminShowRole(test_role); + createDbTb(ADMIN1, test_db, test_tb); + adminGrant(test_db, test_tb, test_role, USERGROUP1); + createPartition(USER1_1, test_db, test_tb); + adminCleanUp(test_db, test_role); + elapsedTime = System.currentTimeMillis() - startTime; + LOGGER.info("elapsedTime = " + elapsedTime); + } + state.setNumSuccess(); + } catch (Exception e) { + LOGGER.error("Exception: " + e); + state.setFirstException(e); + } + } + }); + } + executor.shutdown(); + while (!executor.isTerminated()) { + Thread.sleep(1000); //millisecond + } + Throwable ex = state.getFirstException(); + assertFalse( ex == null ? "Test failed" : ex.toString(), state.failed); + assertEquals(NUM_OF_TASKS, state.getNumSuccess()); + } + + /** + * Test when concurrent sentry clients talking to sentry server, threads data are synchronized + * @throws Exception + */ + @Test + public void testConcurrentSentryClient() throws Exception { + final String HIVE_KEYTAB_PATH = + System.getProperty("sentry.e2etest.hive.policyOwnerKeytab"); + final SentryPolicyServiceClient client = getSentryClient("hive", HIVE_KEYTAB_PATH); + ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS); + + final TestRuntimeState state = new TestRuntimeState(); + for (int i = 0; i < NUM_OF_TASKS; i ++) { + LOGGER.info("Start to test sentry client with task id [" + i + "]"); + executor.execute(new Runnable() { + @Override + public void run() { + if (state.failed) { + LOGGER.error("found one failed state, abort test from here."); + return; + } + try { + String randStr = randomString(5); + String test_role = "test_role_" + randStr; + LOGGER.info("Start to test role: " + test_role); + Long startTime = System.currentTimeMillis(); + Long elapsedTime = 0L; + while (Long.compare(elapsedTime, SENTRY_CLIENT_TEST_DURATION_MS) <= 0) { + LOGGER.info("Test role " + test_role + " runs " + elapsedTime + " ms."); + client.createRole(ADMIN1, test_role); + client.listRoles(ADMIN1); + client.grantServerPrivilege(ADMIN1, test_role, "server1", false); + client.listAllPrivilegesByRoleName(ADMIN1, test_role); + client.dropRole(ADMIN1, test_role); + elapsedTime = System.currentTimeMillis() - startTime; + } + state.setNumSuccess(); + } catch (Exception e) { + LOGGER.error("Sentry Client Testing Exception: ", e); + state.setFirstException(e); + } + } + }); + } + executor.shutdown(); + while (!executor.isTerminated()) { + Thread.sleep(1000); //millisecond + } + Throwable ex = state.getFirstException(); + assertFalse( ex == null ? "Test failed" : ex.toString(), state.failed); + assertEquals(NUM_OF_TASKS, state.getNumSuccess()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java index dc8c1eb..614856f 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -32,7 +33,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.HashSet; +import com.google.common.collect.Sets; import junit.framework.Assert; import org.apache.commons.io.FileUtils; @@ -51,6 +54,7 @@ import org.apache.sentry.policy.db.DBModelAuthorizables; import org.apache.sentry.provider.db.SimpleDBProviderBackend; import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient; import org.apache.sentry.provider.file.PolicyFile; +import org.apache.sentry.service.thrift.KerberosConfiguration; import org.apache.sentry.service.thrift.SentryServiceClientFactory; import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; @@ -72,6 +76,10 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; import com.google.common.io.Files; +import javax.security.auth.Subject; +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.login.LoginContext; + public abstract class AbstractTestWithStaticConfiguration { private static final Logger LOGGER = LoggerFactory .getLogger(AbstractTestWithStaticConfiguration.class); @@ -137,6 +145,38 @@ public abstract class AbstractTestWithStaticConfiguration { protected static Context context; protected final String semanticException = "SemanticException No valid privileges"; + protected static boolean clientKerberos = false; + protected static String REALM = System.getProperty("sentry.service.realm", "EXAMPLE.COM"); + protected static final String SERVER_KERBEROS_NAME = "sentry/" + SERVER_HOST + "@" + REALM; + protected static final String SERVER_KEY_TAB = System.getProperty("sentry.service.server.keytab"); + + private static LoginContext clientLoginContext; + protected static SentryPolicyServiceClient client; + + /** + * Get sentry client with authenticated Subject + * (its security-related attributes(for example, kerberos principal and key) + * @param clientShortName + * @param clientKeyTabDir + * @return client's Subject + */ + public static Subject getClientSubject(String clientShortName, String clientKeyTabDir) { + String clientKerberosPrincipal = clientShortName + "@" + REALM; + File clientKeyTabFile = new File(clientKeyTabDir); + Subject clientSubject = new Subject(false, Sets.newHashSet( + new KerberosPrincipal(clientKerberosPrincipal)), new HashSet<Object>(), + new HashSet<Object>()); + try { + clientLoginContext = new LoginContext("", clientSubject, null, + KerberosConfiguration.createClientConfig(clientKerberosPrincipal, clientKeyTabFile)); + clientLoginContext.login(); + } catch (Exception ex) { + LOGGER.error("Exception: " + ex); + } + clientSubject = clientLoginContext.getSubject(); + return clientSubject; + } + public static void createContext() throws Exception { context = new Context(hiveServer, fileSystem, baseDir, confDir, dataDir, policyFileLocation); @@ -445,6 +485,51 @@ public abstract class AbstractTestWithStaticConfiguration { return SentryServiceClientFactory.create(sentryServer.get(0).getConf()); } + /** + * Get Sentry authorized client to communicate with sentry server, + * the client can be for a minicluster, real distributed cluster, + * sentry server can use policy file or it's a service. + * @param clientShortName: principal prefix string + * @param clientKeyTabDir: authorization key path + * @return sentry client to talk to sentry server + * @throws Exception + */ + public static SentryPolicyServiceClient getSentryClient(String clientShortName, + String clientKeyTabDir) throws Exception { + if (!useSentryService) { + LOGGER.info("Running on a minicluser env."); + return getSentryClient(); + } + + if (clientKerberos) { + if (sentryConf == null ) { + sentryConf = new Configuration(false); + } + final String SENTRY_HOST = System.getProperty("sentry.host", SERVER_HOST); + final String SERVER_KERBEROS_PRINCIPAL = "sentry/" + SENTRY_HOST + "@" + REALM; + sentryConf.set(ServerConfig.PRINCIPAL, SERVER_KERBEROS_PRINCIPAL); + sentryConf.set(ServerConfig.KEY_TAB, SERVER_KEY_TAB); + sentryConf.set(ServerConfig.ALLOW_CONNECT, "hive"); + sentryConf.set(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "false"); + sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS, + System.getProperty("sentry.service.server.rpc.address")); + sentryConf.set(ClientConfig.SERVER_RPC_PORT, + System.getProperty("sentry.service.server.rpc.port", "8038")); + sentryConf.set(ClientConfig.SERVER_RPC_CONN_TIMEOUT, "720000"); //millis + Subject clientSubject = getClientSubject(clientShortName, clientKeyTabDir); + client = Subject.doAs(clientSubject, + new PrivilegedExceptionAction<SentryPolicyServiceClient>() { + @Override + public SentryPolicyServiceClient run() throws Exception { + return SentryServiceClientFactory.create(sentryConf); + } + }); + } else { + client = getSentryClient(); + } + return client; + } + @Before public void setup() throws Exception{ LOGGER.info("AbstractTestStaticConfiguration setup"); @@ -627,5 +712,4 @@ public abstract class AbstractTestWithStaticConfiguration { LOGGER.info("Running [" + sql + "]"); stmt.execute(sql); } - }