Repository: incubator-sentry Updated Branches: refs/heads/sentry-hdfs-plugin c059d3d76 -> 0152e3a3d
SENTRY-432: Synchronization of HDFS permissions with Sentry permissions: More restart tests ( Arun Suresh via Sravya Tirukkovalur) Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/0152e3a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/0152e3a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/0152e3a3 Branch: refs/heads/sentry-hdfs-plugin Commit: 0152e3a3daf34ce04de0ad39ffc3a6f857370205 Parents: c059d3d Author: Sravya Tirukkovalur <[email protected]> Authored: Fri Oct 17 16:10:05 2014 -0700 Committer: Sravya Tirukkovalur <[email protected]> Committed: Fri Oct 17 16:10:39 2014 -0700 ---------------------------------------------------------------------- pom.xml | 2 +- sentry-dist/src/main/assembly/sentry-hdfs.xml | 14 + .../sentry/hdfs/SentryAuthorizationInfo.java | 10 +- .../hdfs/SentryAuthorizationProvider.java | 8 +- .../org/apache/sentry/hdfs/SentryUpdater.java | 1 + .../provider/db/SimpleDBProviderBackend.java | 26 +- sentry-tests/sentry-tests-hive/pom.xml | 2 + .../tests/e2e/hdfs/TestHDFSIntegration.java | 497 +++++++++++++------ 8 files changed, 409 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 952d702..d901718 100644 --- a/pom.xml +++ b/pom.xml @@ -432,9 +432,9 @@ limitations under the License. <module>sentry-provider</module> <module>sentry-policy</module> <module>sentry-tests</module> - <module>sentry-dist</module> <module>sentry-service-client</module> <module>sentry-hdfs</module> + <module>sentry-dist</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-dist/src/main/assembly/sentry-hdfs.xml ---------------------------------------------------------------------- diff --git a/sentry-dist/src/main/assembly/sentry-hdfs.xml b/sentry-dist/src/main/assembly/sentry-hdfs.xml index 8d85d8f..22ced14 100644 --- a/sentry-dist/src/main/assembly/sentry-hdfs.xml +++ b/sentry-dist/src/main/assembly/sentry-hdfs.xml @@ -30,6 +30,20 @@ <baseDirectory>sentry-hdfs-${project.version}</baseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <unpack>false</unpack> + <useProjectArtifact>false</useProjectArtifact> + <useStrictFiltering>true</useStrictFiltering> + <useTransitiveFiltering>false</useTransitiveFiltering> + <includes> + <include>org.apache.thrift:libthrift</include> + <include>org.apache.thrift:libfb303</include> + </includes> + </dependencySet> + </dependencySets> + <fileSets> <fileSet> <directory>${project.parent.basedir}/sentry-hdfs/sentry-hdfs-dist/target</directory> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java index 23e06dd..3081ae1 100644 --- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationInfo.java @@ -112,9 +112,13 @@ public class SentryAuthorizationInfo implements Runnable { lock.writeLock().lock(); try { authzPaths = newAuthzPaths; - LOG.warn("##### FULL Updated paths seq Num [" + authzPaths.getLastUpdatedSeqNum() + "]"); + if (LOG.isDebugEnabled()) { + LOG.debug("FULL Updated paths seq Num [" + authzPaths.getLastUpdatedSeqNum() + "]"); + } authzPermissions = newAuthzPerms; - LOG.warn("##### FULL Updated perms seq Num [" + authzPermissions.getLastUpdatedSeqNum() + "]"); + if (LOG.isDebugEnabled()) { + LOG.debug("FULL Updated perms seq Num [" + authzPermissions.getLastUpdatedSeqNum() + "]"); + } } finally { lock.writeLock().unlock(); } @@ -192,7 +196,7 @@ public class SentryAuthorizationInfo implements Runnable { public boolean isStale() { long now = System.currentTimeMillis(); boolean stale = now - lastUpdate > staleThresholdMillisec; - if (stale && now - lastStaleReport > 30 * 1000) { + if (stale && now - lastStaleReport > retryWaitMillisec) { LOG.warn("Authorization information has been stale for [{}]s", (now - lastUpdate) / 1000); lastStaleReport = now; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java index 3edd5fa..cfd5862 100644 --- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryAuthorizationProvider.java @@ -309,7 +309,7 @@ public class SentryAuthorizationProvider String user = defaultAuthzProvider.getUser(node, snapshotId); String group = defaultAuthzProvider.getGroup(node, snapshotId); INodeAuthorizationInfo pNode = node.getParent(); - while (group == null || pNode != null) { + while (group == null && pNode != null) { group = defaultAuthzProvider.getGroup(pNode, snapshotId); pNode = pNode.getParent(); } @@ -334,9 +334,9 @@ public class SentryAuthorizationProvider if (LOG.isDebugEnabled()) { LOG.debug("### getAclEntry [" + (p == null ? "null" : p) + "] : [" + "isManaged=" + isManaged - + ",isStale=" + isStale - + ",hasAuthzObj=" + hasAuthzObj - + ",origAtuhzAsAcl=" + originalAuthzAsAcl + "]" + + ", isStale=" + isStale + + ", hasAuthzObj=" + hasAuthzObj + + ", origAuthzAsAcl=" + originalAuthzAsAcl + "]" + "[" + (f == null ? "null" : f.getEntries()) + "]"); } return f; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java index 905553e..9540397 100644 --- a/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java +++ b/sentry-hdfs/sentry-hdfs-namenode-plugin/src/main/java/org/apache/sentry/hdfs/SentryUpdater.java @@ -42,6 +42,7 @@ public class SentryUpdater { } catch (Exception e) { LOG.error("Error connecting to Sentry ['{}'] !!", e.getMessage()); + sentryClient = null; return null; } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java index b66037a..ff549fe 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SimpleDBProviderBackend.java @@ -39,9 +39,10 @@ public class SimpleDBProviderBackend implements ProviderBackend { private static final Logger LOGGER = LoggerFactory .getLogger(SimpleDBProviderBackend.class); - private final SentryPolicyServiceClient policyServiceClient; + private SentryPolicyServiceClient policyServiceClient; private volatile boolean initialized; + private Configuration conf; public SimpleDBProviderBackend(Configuration conf, String resourcePath) throws IOException { // DB Provider doesn't use policy file path @@ -50,6 +51,8 @@ public class SimpleDBProviderBackend implements ProviderBackend { public SimpleDBProviderBackend(Configuration conf) throws IOException { this(new SentryPolicyServiceClient(conf)); + this.initialized = false; + this.conf = conf; } @VisibleForTesting @@ -78,10 +81,16 @@ public class SimpleDBProviderBackend implements ProviderBackend { throw new IllegalStateException("Backend has not been properly initialized"); } try { - return ImmutableSet.copyOf(policyServiceClient.listPrivilegesForProvider(groups, roleSet, authorizableHierarchy)); + return ImmutableSet.copyOf(getSentryClient().listPrivilegesForProvider(groups, roleSet, authorizableHierarchy)); } catch (SentryUserException e) { String msg = "Unable to obtain privileges from server: " + e.getMessage(); LOGGER.error(msg, e); + try { + policyServiceClient.close(); + } catch (Exception ex) { + // Ignore + } + policyServiceClient = null; } return ImmutableSet.of(); } @@ -101,6 +110,19 @@ public class SimpleDBProviderBackend implements ProviderBackend { } } + private SentryPolicyServiceClient getSentryClient() { + if (policyServiceClient == null) { + try { + policyServiceClient = new SentryPolicyServiceClient(conf); + } catch (Exception e) { + LOGGER.error("Error connecting to Sentry ['{}'] !!", + e.getMessage()); + policyServiceClient = null; + return null; + } + } + return policyServiceClient; + } /** * SimpleDBProviderBackend does not implement validatePolicy() */ http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-tests/sentry-tests-hive/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/pom.xml b/sentry-tests/sentry-tests-hive/pom.xml index fde850f..a8a33eb 100644 --- a/sentry-tests/sentry-tests-hive/pom.xml +++ b/sentry-tests/sentry-tests-hive/pom.xml @@ -249,12 +249,14 @@ limitations under the License. <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> <scope>test</scope> +<!-- <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> </exclusion> </exclusions> +--> </dependency> <dependency> <groupId>org.hamcrest</groupId> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0152e3a3/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java index 41f8af8..26cf978 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java @@ -16,28 +16,37 @@ */ package org.apache.sentry.tests.e2e.hdfs; +import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStream; import java.net.ServerSocket; import java.net.URL; import java.security.PrivilegedExceptionAction; import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.StringTokenizer; import java.util.concurrent.TimeoutException; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntryType; import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -45,6 +54,21 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.MiniMRClientCluster; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.security.UserGroupInformation; import org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl; @@ -94,14 +118,48 @@ public class TestHDFSIntegration { } } + public static class WordCountMapper extends MapReduceBase implements + Mapper<LongWritable, Text, String, Long> { + + public void map(LongWritable key, Text value, + OutputCollector<String, Long> output, Reporter reporter) + throws IOException { + StringTokenizer st = new StringTokenizer(value.toString()); + while (st.hasMoreTokens()) { + output.collect(st.nextToken(), 1L); + } + } + + } + + public static class SumReducer extends MapReduceBase implements + Reducer<Text, Long, Text, Long> { + + public void reduce(Text key, Iterator<Long> values, + OutputCollector<Text, Long> output, Reporter reporter) + throws IOException { + + long sum = 0; + while (values.hasNext()) { + sum += values.next(); + } + output.collect(key, sum); + } + + } + private MiniDFSCluster miniDFS; + private MiniMRClientCluster miniMR; private InternalHiveServer hiveServer2; private InternalMetastoreServer metastore; + private SentryService sentryService; private String fsURI; private int hmsPort; - private int sentryPort; + private int sentryPort = -1; private File baseDir; - private UserGroupInformation admin; + private File policyFileLocation; + private UserGroupInformation adminUgi; + private UserGroupInformation hiveUgi; protected static File assertCreateDir(File dir) { if(!dir.isDirectory()) { @@ -117,10 +175,10 @@ public class TestHDFSIntegration { return port; } - private static void startSentryService(SentryService sentryServer) throws Exception { - sentryServer.start(); + private void waitOnSentryService() throws Exception { + sentryService.start(); final long start = System.currentTimeMillis(); - while (!sentryServer.isRunning()) { + while (!sentryService.isRunning()) { Thread.sleep(1000); if (System.currentTimeMillis() - start > 60000L) { throw new TimeoutException("Server did not start after 60 seconds"); @@ -132,112 +190,29 @@ public class TestHDFSIntegration { public void setup() throws Exception { Class.forName("org.apache.hive.jdbc.HiveDriver"); baseDir = Files.createTempDir(); - final File policyFileLocation = new File(baseDir, HiveServerFactory.AUTHZ_PROVIDER_FILENAME); + policyFileLocation = new File(baseDir, HiveServerFactory.AUTHZ_PROVIDER_FILENAME); PolicyFile policyFile = PolicyFile.setAdminOnServer1("hive") .setUserGroupMapping(StaticUserGroup.getStaticMapping()); policyFile.write(policyFileLocation); - admin = UserGroupInformation.createUserForTesting( + adminUgi = UserGroupInformation.createUserForTesting( System.getProperty("user.name"), new String[] { "supergroup" }); - UserGroupInformation hiveUgi = UserGroupInformation.createUserForTesting( + hiveUgi = UserGroupInformation.createUserForTesting( "hive", new String[] { "hive" }); - hiveUgi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - Configuration sentryConf = new Configuration(false); - Map<String, String> properties = Maps.newHashMap(); - properties.put(HiveServerFactory.AUTHZ_PROVIDER_BACKEND, - SimpleDBProviderBackend.class.getName()); - properties.put(ConfVars.HIVE_AUTHORIZATION_TASK_FACTORY.varname, - SentryHiveAuthorizationTaskFactoryImpl.class.getName()); - properties - .put(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS.varname, "2"); - properties.put("hive.metastore.uris", "thrift://localhost:" + hmsPort); - properties.put(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE); - properties.put("sentry.hive.testing.mode", "true"); - properties.put(ServerConfig.ADMIN_GROUPS, "hive,admin"); - properties.put(ServerConfig.RPC_ADDRESS, "localhost"); - properties.put(ServerConfig.RPC_PORT, String.valueOf(0)); - properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false"); + // Start Sentry + startSentry(); - properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); - properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath()); - properties.put(ServerConfig.SENTRY_STORE_JDBC_URL, - "jdbc:derby:;databaseName=" + baseDir.getPath() - + "/sentrystore_db;create=true"); - properties.put("sentry.service.processor.factories", - "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); - properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin"); - properties.put(ServerConfig.RPC_MIN_THREADS, "3"); - for (Map.Entry<String, String> entry : properties.entrySet()) { - sentryConf.set(entry.getKey(), entry.getValue()); - } - SentryService sentryServer = new SentryServiceFactory().create(sentryConf); - properties.put(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.getAddress() - .getHostName()); - sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.getAddress() - .getHostName()); - properties.put(ClientConfig.SERVER_RPC_PORT, - String.valueOf(sentryServer.getAddress().getPort())); - sentryConf.set(ClientConfig.SERVER_RPC_PORT, - String.valueOf(sentryServer.getAddress().getPort())); - startSentryService(sentryServer); - sentryPort = sentryServer.getAddress().getPort(); - System.out.println("\n\n Sentry port : " + sentryPort + "\n\n"); - return null; - } - }); - - admin.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data"); - Configuration conf = new HdfsConfiguration(); - conf.set(DFSConfigKeys.DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY, - SentryAuthorizationProvider.class.getName()); - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); - - File dfsDir = assertCreateDir(new File(baseDir, "dfs")); - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath()); - conf.set("hadoop.security.group.mapping", - MiniDFS.PseudoGroupMappingService.class.getName()); - Configuration.addDefaultResource("test.xml"); + // Start HDFS and MR + startDFSandYARN(); - conf.set("sentry.authorization-provider.hdfs-path-prefixes", "/user/hive/warehouse"); - conf.set("sentry.hdfs.service.security.mode", "none"); - conf.set("sentry.hdfs.service.client.server.rpc-address", "localhost"); - conf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort)); - EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); - miniDFS = new MiniDFSCluster.Builder(conf).build(); - Path tmpPath = new Path("/tmp"); - Path hivePath = new Path("/user/hive"); - Path warehousePath = new Path(hivePath, "warehouse"); - miniDFS.getFileSystem().mkdirs(warehousePath); - boolean directory = miniDFS.getFileSystem().isDirectory(warehousePath); - System.out.println("\n\n Is dir :" + directory + "\n\n"); - System.out.println("\n\n DefaultFS :" + miniDFS.getFileSystem().getUri() + "\n\n"); - fsURI = miniDFS.getFileSystem().getUri().toString(); - miniDFS.getFileSystem().mkdirs(tmpPath); - miniDFS.getFileSystem().setPermission(tmpPath, FsPermission.valueOf("drwxrwxrwx")); - miniDFS.getFileSystem().setOwner(hivePath, "hive", "hive"); - miniDFS.getFileSystem().setOwner(warehousePath, "hive", "hive"); - System.out.println("\n\n Owner :" - + miniDFS.getFileSystem().getFileStatus(warehousePath).getOwner() - + ", " - + miniDFS.getFileSystem().getFileStatus(warehousePath).getGroup() - + "\n\n"); - System.out.println("\n\n Owner tmp :" - + miniDFS.getFileSystem().getFileStatus(tmpPath).getOwner() + ", " - + miniDFS.getFileSystem().getFileStatus(tmpPath).getGroup() + ", " - + miniDFS.getFileSystem().getFileStatus(tmpPath).getPermission() + ", " - + "\n\n"); - return null; - } - }); + // Start HiveServer2 and Metastore + startHiveAndMetastore(); + } + private void startHiveAndMetastore() throws IOException, InterruptedException { hiveUgi.doAs(new PrivilegedExceptionAction<Void>() { @Override public Void run() throws Exception { @@ -286,7 +261,6 @@ public class TestHDFSIntegration { authzConf.writeXml(out); out.close(); -// hiveConf.set("hive.sentry.conf.url", "file://" + accessSite.getCanonicalPath()); hiveConf.set("hive.sentry.conf.url", accessSite.getPath()); System.out.println("Sentry client file : " + accessSite.getPath()); @@ -312,7 +286,114 @@ public class TestHDFSIntegration { return null; } }); + } + + private void startDFSandYARN() throws IOException, + InterruptedException { + adminUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA, "target/test/data"); + Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_NAMENODE_AUTHORIZATION_PROVIDER_KEY, + SentryAuthorizationProvider.class.getName()); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + File dfsDir = assertCreateDir(new File(baseDir, "dfs")); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath()); + conf.set("hadoop.security.group.mapping", + MiniDFS.PseudoGroupMappingService.class.getName()); + Configuration.addDefaultResource("test.xml"); + + conf.set("sentry.authorization-provider.hdfs-path-prefixes", "/user/hive/warehouse"); + conf.set("sentry.authorization-provider.cache-refresh-retry-wait.ms", "5000"); + conf.set("sentry.authorization-provider.cache-stale-threshold.ms", "3000"); + + conf.set("sentry.hdfs.service.security.mode", "none"); + conf.set("sentry.hdfs.service.client.server.rpc-address", "localhost"); + conf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort)); + EditLogFileOutputStream.setShouldSkipFsyncForTesting(true); + miniDFS = new MiniDFSCluster.Builder(conf).build(); + Path tmpPath = new Path("/tmp"); + Path hivePath = new Path("/user/hive"); + Path warehousePath = new Path(hivePath, "warehouse"); + miniDFS.getFileSystem().mkdirs(warehousePath); + boolean directory = miniDFS.getFileSystem().isDirectory(warehousePath); + System.out.println("\n\n Is dir :" + directory + "\n\n"); + System.out.println("\n\n DefaultFS :" + miniDFS.getFileSystem().getUri() + "\n\n"); + fsURI = miniDFS.getFileSystem().getUri().toString(); + conf.set("fs.defaultFS", fsURI); + + // Create Yarn cluster + // miniMR = MiniMRClientClusterFactory.create(this.getClass(), 1, conf); + + miniDFS.getFileSystem().mkdirs(tmpPath); + miniDFS.getFileSystem().setPermission(tmpPath, FsPermission.valueOf("drwxrwxrwx")); + miniDFS.getFileSystem().setOwner(hivePath, "hive", "hive"); + miniDFS.getFileSystem().setOwner(warehousePath, "hive", "hive"); + System.out.println("\n\n Owner :" + + miniDFS.getFileSystem().getFileStatus(warehousePath).getOwner() + + ", " + + miniDFS.getFileSystem().getFileStatus(warehousePath).getGroup() + + "\n\n"); + System.out.println("\n\n Owner tmp :" + + miniDFS.getFileSystem().getFileStatus(tmpPath).getOwner() + ", " + + miniDFS.getFileSystem().getFileStatus(tmpPath).getGroup() + ", " + + miniDFS.getFileSystem().getFileStatus(tmpPath).getPermission() + ", " + + "\n\n"); + return null; + } + }); + } + private void startSentry() throws IOException, + InterruptedException { + hiveUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Configuration sentryConf = new Configuration(false); + Map<String, String> properties = Maps.newHashMap(); + properties.put(HiveServerFactory.AUTHZ_PROVIDER_BACKEND, + SimpleDBProviderBackend.class.getName()); + properties.put(ConfVars.HIVE_AUTHORIZATION_TASK_FACTORY.varname, + SentryHiveAuthorizationTaskFactoryImpl.class.getName()); + properties + .put(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS.varname, "2"); + properties.put("hive.metastore.uris", "thrift://localhost:" + hmsPort); + properties.put(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE); + properties.put("sentry.hive.testing.mode", "true"); + properties.put(ServerConfig.ADMIN_GROUPS, "hive,admin"); + properties.put(ServerConfig.RPC_ADDRESS, "localhost"); + properties.put(ServerConfig.RPC_PORT, String.valueOf(sentryPort < 0 ? 0 : sentryPort)); + properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false"); + + properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); + properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath()); + properties.put(ServerConfig.SENTRY_STORE_JDBC_URL, + "jdbc:derby:;databaseName=" + baseDir.getPath() + + "/sentrystore_db;create=true"); + properties.put("sentry.service.processor.factories", + "org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessorFactory,org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); + properties.put("sentry.policy.store.plugins", "org.apache.sentry.hdfs.SentryPlugin"); + properties.put(ServerConfig.RPC_MIN_THREADS, "3"); + for (Map.Entry<String, String> entry : properties.entrySet()) { + sentryConf.set(entry.getKey(), entry.getValue()); + } + sentryService = new SentryServiceFactory().create(sentryConf); + properties.put(ClientConfig.SERVER_RPC_ADDRESS, sentryService.getAddress() + .getHostName()); + sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryService.getAddress() + .getHostName()); + properties.put(ClientConfig.SERVER_RPC_PORT, + String.valueOf(sentryService.getAddress().getPort())); + sentryConf.set(ClientConfig.SERVER_RPC_PORT, + String.valueOf(sentryService.getAddress().getPort())); + waitOnSentryService(); + sentryPort = sentryService.getAddress().getPort(); + System.out.println("\n\n Sentry port : " + sentryPort + "\n\n"); + return null; + } + }); } @After @@ -334,25 +415,8 @@ public class TestHDFSIntegration { } } -// public Connection createConnection(String username) throws Exception { -// String password = username; -// Connection connection = hiveServer2.createConnection(username, password); -// assertNotNull("Connection is null", connection); -// assertFalse("Connection should not be closed", connection.isClosed()); -// Statement statement = connection.createStatement(); -// statement.close(); -// return connection; -// } -// -// public Statement createStatement(Connection connection) -// throws Exception { -// Statement statement = connection.createStatement(); -// assertNotNull("Statement is null", statement); -// return statement; -// } - @Test - public void testSimple() throws Exception { + public void testEnd2End() throws Exception { Connection conn = hiveServer2.createConnection("hive", "hive"); Statement stmt = conn.createStatement(); stmt.execute("create role admin_role"); @@ -363,38 +427,189 @@ public class TestHDFSIntegration { stmt.execute("alter table p1 add partition (month=1, day=2)"); stmt.execute("alter table p1 add partition (month=2, day=1)"); stmt.execute("alter table p1 add partition (month=2, day=2)"); - AclStatus aclStatus = miniDFS.getFileSystem().getAclStatus(new Path("/user/hive/warehouse/p1")); - Set<String> groups = new HashSet<String>(); - for (AclEntry ent : aclStatus.getEntries()) { - if (ent.getType().equals(AclEntryType.GROUP)) { - groups.add(ent.getName()); - } - } - System.out.println("Final acls [" + aclStatus + "]"); - Assert.assertEquals(false, groups.contains("hbase")); stmt.execute("create role p1_admin"); stmt.execute("grant role p1_admin to group hbase"); + + verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false); + + loadData(stmt); + + verifyHDFSandMR(stmt); + + stmt.execute("revoke select on table p1 from role p1_admin"); + Thread.sleep(1000); + verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false); + + stmt.execute("grant all on table p1 to role p1_admin"); + Thread.sleep(1000); + verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.ALL, "hbase", true); + + stmt.execute("revoke select on table p1 from role p1_admin"); + Thread.sleep(1000); + verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.WRITE_EXECUTE, "hbase", true); + + sentryService.stop(); + // Verify that Sentry permission are still enforced for the "stale" period + Thread.sleep(500); + verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.WRITE_EXECUTE, "hbase", true); + + // Verify that Sentry permission are NOT enforced AFTER "stale" period + Thread.sleep(3000); + verifyOnAllSubDirs("/user/hive/warehouse/p1", null, "hbase", false); + + startSentry(); + // Verify that After Sentry restart permissions are re-enforced + Thread.sleep(5000); + verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.WRITE_EXECUTE, "hbase", true); + + // Create new table and verify everything is fine after restart... + stmt.execute("create table p2 (s string) partitioned by (month int, day int)"); + try { + stmt.execute("alter table p2 add partition (month=1, day=1)"); + } catch (Exception e) { + // Metastore throws and exception first time after sentry restart + stmt.execute("alter table p2 add partition (month=1, day=1)"); + } + stmt.execute("alter table p2 add partition (month=1, day=2)"); + stmt.execute("alter table p2 add partition (month=2, day=1)"); + stmt.execute("alter table p2 add partition (month=2, day=2)"); + + Thread.sleep(1000); + verifyOnAllSubDirs("/user/hive/warehouse/p2", null, "hbase", false); + + stmt.execute("grant select on table p2 to role p1_admin"); + Thread.sleep(1000); + verifyOnAllSubDirs("/user/hive/warehouse/p2", FsAction.READ_EXECUTE, "hbase", true); + + stmt.close(); + conn.close(); + } + + private void loadData(Statement stmt) throws IOException, SQLException { + FSDataOutputStream f1 = miniDFS.getFileSystem().create(new Path("/tmp/f1.txt")); + f1.writeChars("m1d1_t1\n"); + f1.writeChars("m1d1_t2\n"); + f1.writeChars("m1d1_t3\n"); + f1.flush(); + f1.close(); + stmt.execute("load data inpath \'/tmp/f1.txt\' overwrite into table p1 partition (month=1, day=1)"); + FSDataOutputStream f2 = miniDFS.getFileSystem().create(new Path("/tmp/f2.txt")); + f2.writeChars("m2d2_t4\n"); + f2.writeChars("m2d2_t5\n"); + f2.writeChars("m2d2_t6\n"); + f2.flush(); + f2.close(); + stmt.execute("load data inpath \'/tmp/f2.txt\' overwrite into table p1 partition (month=2, day=2)"); + ResultSet rs = stmt.executeQuery("select * from p1"); + List<String> vals = new ArrayList<String>(); + while (rs.next()) { + vals.add(rs.getString(1)); + } + Assert.assertEquals(6, vals.size()); + rs.close(); + } + + private void verifyHDFSandMR(Statement stmt) throws IOException, + InterruptedException, SQLException, Exception { + // hbase user should not be allowed to read... + UserGroupInformation hbaseUgi = UserGroupInformation.createUserForTesting("hbase", new String[] {"hbase"}); + hbaseUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + try { + miniDFS.getFileSystem().open(new Path("/user/hive/warehouse/p1/month=1/day=1/f1.txt")); + Assert.fail("Should not be allowed !!"); + } catch (Exception e) { + Assert.assertEquals("Wrong Error : " + e.getMessage(), true, e.getMessage().contains("Permission denied: user=hbase")); + } + return null; + } + }); + + // WordCount should fail.. + // runWordCount(new JobConf(miniMR.getConfig()), "/user/hive/warehouse/p1/month=1/day=1", "/tmp/wc_out"); + stmt.execute("grant select on table p1 to role p1_admin"); + Thread.sleep(1000); - aclStatus = miniDFS.getFileSystem().getAclStatus(new Path("/user/hive/warehouse/p1")); - groups = new HashSet<String>(); - for (AclEntry ent : aclStatus.getEntries()) { - if (ent.getType().equals(AclEntryType.GROUP)) { - groups.add(ent.getName()); + verifyOnAllSubDirs("/user/hive/warehouse/p1", FsAction.READ_EXECUTE, "hbase", true); + // hbase user should now be allowed to read... + hbaseUgi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Path p = new Path("/user/hive/warehouse/p1/month=2/day=2/f2.txt"); + BufferedReader in = new BufferedReader(new InputStreamReader(miniDFS.getFileSystem().open(p))); + String line = null; + List<String> lines = new ArrayList<String>(); + do { + line = in.readLine(); + if (line != null) lines.add(line); + } while (line != null); + Assert.assertEquals(3, lines.size()); + in.close(); + return null; } + }); + + } + + private void verifyOnAllSubDirs(String path, FsAction fsAction, String group, boolean groupShouldExist) throws Exception { + verifyOnAllSubDirs(new Path(path), fsAction, group, groupShouldExist); + } + + private void verifyOnAllSubDirs(Path p, FsAction fsAction, String group, boolean groupShouldExist) throws Exception { + FileStatus fStatus = miniDFS.getFileSystem().getFileStatus(p); + if (groupShouldExist) { + Assert.assertEquals(fsAction, getAcls(p).get(group)); + } else { + Assert.assertFalse(getAcls(p).containsKey(group)); } - Assert.assertEquals(true, groups.contains("hbase")); + if (fStatus.isDirectory()) { + FileStatus[] children = miniDFS.getFileSystem().listStatus(p); + for (FileStatus fs : children) { + verifyOnAllSubDirs(fs.getPath(), fsAction, group, groupShouldExist); + } + } + } - stmt.execute("revoke select on table p1 from role p1_admin"); - Thread.sleep(1000); - aclStatus = miniDFS.getFileSystem().getAclStatus(new Path("/user/hive/warehouse/p1")); - groups = new HashSet<String>(); + private Map<String, FsAction> getAcls(Path path) throws Exception { + AclStatus aclStatus = miniDFS.getFileSystem().getAclStatus(path); + Map<String, FsAction> acls = new HashMap<String, FsAction>(); for (AclEntry ent : aclStatus.getEntries()) { if (ent.getType().equals(AclEntryType.GROUP)) { - groups.add(ent.getName()); + acls.put(ent.getName(), ent.getPermission()); } } - Assert.assertEquals(false, groups.contains("hbase")); + return acls; + } + + private void runWordCount(JobConf job, String inPath, String outPath) throws Exception { + Path in = new Path(inPath); + Path out = new Path(outPath); + miniDFS.getFileSystem().delete(out, true); + job.setJobName("TestWC"); + JobClient jobClient = new JobClient(job); + RunningJob submittedJob = null; + FileInputFormat.setInputPaths(job, in); + FileOutputFormat.setOutputPath(job, out); + job.set("mapreduce.output.textoutputformat.separator", " "); + job.setInputFormat(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + job.setMapperClass(WordCountMapper.class); + job.setReducerClass(SumReducer.class); + job.setOutputFormat(TextOutputFormat.class); + job.setNumReduceTasks(1); + job.setInt("mapreduce.map.maxattempts", 1); + job.setInt("mapreduce.reduce.maxattempts", 1); + + submittedJob = jobClient.submitJob(job); + if (!jobClient.monitorAndPrintJob(job, submittedJob)) { + throw new IOException("job Failed !!"); + } + } }
