Repository: incubator-sentry Updated Branches: refs/heads/kafka ab1093bef -> e6a2f13a0
SENTRY-1057: Add implementations for acls' CRUD (Ashish K Singh, reviewed by: Dapeng Sun and Hao Hao) Change-Id: Iff5f23cee47bef256db387ceb032c1a6ea5c9124 Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/e6a2f13a Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/e6a2f13a Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/e6a2f13a Branch: refs/heads/kafka Commit: e6a2f13a0ac589e25b3a42eecacc4cbf8200f9b7 Parents: ab1093b Author: hahao <hao....@cloudera.com> Authored: Sat Mar 5 15:42:45 2016 -0800 Committer: hahao <hao....@cloudera.com> Committed: Sat Mar 5 15:42:45 2016 -0800 ---------------------------------------------------------------------- .gitignore | 1 + pom.xml | 3 + .../kafka/authorizer/SentryKafkaAuthorizer.java | 56 ++- .../sentry/kafka/binding/KafkaAuthBinding.java | 369 ++++++++++++++++++- .../binding/KafkaAuthBindingSingleton.java | 4 +- .../apache/sentry/kafka/conf/KafkaAuthConf.java | 4 +- .../authorizer/SentryKafkaAuthorizerTest.java | 1 - .../core/model/kafka/KafkaActionFactory.java | 3 + .../core/model/kafka/TestKafkaAuthorizable.java | 4 - sentry-provider/sentry-provider-db/pom.xml | 4 + .../persistent/PrivilegeOperatePersistence.java | 2 + sentry-tests/pom.xml | 1 + sentry-tests/sentry-tests-kafka/pom.xml | 64 ++++ .../tests/e2e/kafka/CustomPrincipalBuilder.java | 47 +++ .../tests/e2e/kafka/EmbeddedZkServer.java | 71 ++++ .../sentry/tests/e2e/kafka/KafkaTestServer.java | 124 +++++++ .../sentry/tests/e2e/kafka/TestUtils.java | 29 ++ .../e2e/kafka/AbstractKafkaSentryTestBase.java | 227 ++++++++++++ .../tests/e2e/kafka/StaticUserGroupRole.java | 57 +++ .../sentry/tests/e2e/kafka/TestAclsCrud.java | 328 +++++++++++++++++ .../src/test/resources/log4j.properties | 38 ++ .../src/test/resources/test.crt | 15 + .../src/test/resources/user1.crt | 15 + .../src/test/resources/user2.crt | 15 + 24 files changed, 1449 insertions(+), 33 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index a89bad8..08edd26 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.class +classes/ target/ .classpath .project http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ca2c92a..7932b99 100644 --- a/pom.xml +++ b/pom.xml @@ -842,6 +842,9 @@ limitations under the License. <exclude>**/metastore_db/</exclude> <exclude>**/*.rej</exclude> <exclude>**/thirdparty/</exclude> + <!-- Exclude SSL .crtand .jks files --> + <exclude>**/*.crt</exclude> + <exclude>**/*.jks</exclude> </excludes> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java index 5bf520b..3bce6cc 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java @@ -27,6 +27,7 @@ import org.apache.sentry.kafka.binding.KafkaAuthBindingSingleton; import org.apache.sentry.kafka.conf.KafkaAuthConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.collection.immutable.Map; import scala.collection.immutable.Set; @@ -36,15 +37,15 @@ import 
java.util.List; public class SentryKafkaAuthorizer implements Authorizer { - private static Logger LOG = - LoggerFactory.getLogger(SentryKafkaAuthorizer.class); + private final static Logger LOG = LoggerFactory.getLogger(SentryKafkaAuthorizer.class); + private final static String INSTANCE_NAME = KafkaAuthConf.AuthzConfVars.getDefault(KafkaAuthConf.KAFKA_SERVICE_INSTANCE_NAME); - KafkaAuthBinding binding; - KafkaAuthConf kafkaAuthConf; + private KafkaAuthBinding binding; + private String kafkaServiceInstanceName = INSTANCE_NAME; + private String requestorName = KafkaAuthConf.AuthzConfVars.getDefault(KafkaAuthConf.KAFKA_SERVICE_USER_NAME); String sentry_site = null; List<KafkaPrincipal> super_users = null; - String kafkaServiceInstanceName = KafkaAuthConf.AuthzConfVars.getDefault(KafkaAuthConf.KAFKA_SERVICE_INSTANCE_NAME); public SentryKafkaAuthorizer() { } @@ -60,36 +61,36 @@ public class SentryKafkaAuthorizer implements Authorizer { } LOG.debug("User: " + user + " is not a SuperUser"); return binding.authorize(session, operation, resource); - } +} @Override public void addAcls(Set<Acl> acls, final Resource resource) { - throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + binding.addAcls(acls, resource); } @Override public boolean removeAcls(Set<Acl> acls, final Resource resource) { - throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + return binding.removeAcls(acls, resource); } @Override public boolean removeAcls(final Resource resource) { - throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + return binding.removeAcls(resource); } @Override public Set<Acl> getAcls(Resource resource) { - throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + return binding.getAcls(resource); } @Override public Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) { - throw new UnsupportedOperationException("Please use 
Sentry CLI to perform this action."); + return binding.getAcls(principal); } @Override public Map<Resource, Set<Acl>> getAcls() { - throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + return binding.getAcls(); } @Override @@ -110,11 +111,14 @@ public class SentryKafkaAuthorizer implements Authorizer { if (kafkaServiceInstanceName != null) { this.kafkaServiceInstanceName = kafkaServiceInstanceName.toString(); } + final Object kafkaServiceUserName = configs.get(KafkaAuthConf.KAFKA_SERVICE_USER_NAME); + if (kafkaServiceUserName != null) { + this.requestorName = kafkaServiceUserName.toString(); + } LOG.info("Configuring Sentry KafkaAuthorizer: " + sentry_site); final KafkaAuthBindingSingleton instance = KafkaAuthBindingSingleton.getInstance(); - instance.configure(this.kafkaServiceInstanceName, sentry_site); + instance.configure(this.kafkaServiceInstanceName, this.requestorName, sentry_site); this.binding = instance.getAuthBinding(); - this.kafkaAuthConf = instance.getKafkaAuthConf(); } private void getSuperUsers(String kafkaSuperUsers) { @@ -139,4 +143,28 @@ public class SentryKafkaAuthorizer implements Authorizer { } return false; } + + /** + * This is not used by Kafka, however as role is a Sentry centric entity having some mean to perform role CRUD will be required. + * This method will be used by a Sentry-Kafka cli that will allow users to perform CRUD of roles and adding roles to groups. + */ + public void addRole(String role) { + binding.addRole(role); + } + + /** + * This is not used by Kafka, however as role is a Sentry centric entity having some mean to add role to groups will be required. + * This method will be used by a Sentry-Kafka cli that will allow users to perform CRUD of roles and adding roles to groups. 
+ */ + public void addRoleToGroups(String role, java.util.Set<String> groups) { + binding.addRoleToGroups(role, groups); + } + + /** + * This is not used by Kafka, however as role is a Sentry centric entity having some mean to perform role CRUD will be required. + * This method will be used by a Sentry-Kafka cli that will allow users to perform CRUD of roles and adding roles to groups. + */ + public void dropAllRoles() { + binding.dropAllRoles(); + } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java index a54eb8f..8f4a8c4 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java @@ -17,20 +17,32 @@ package org.apache.sentry.kafka.binding; import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Set; +import kafka.security.auth.Acl; +import kafka.security.auth.Allow; +import kafka.security.auth.Allow$; +import kafka.security.auth.Operation$; +import kafka.security.auth.ResourceType$; import org.apache.hadoop.conf.Configuration; import com.google.common.collect.Sets; import kafka.network.RequestChannel; import kafka.security.auth.Operation; import kafka.security.auth.Resource; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.sentry.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; 
import org.apache.sentry.core.common.Authorizable; import org.apache.sentry.core.common.Subject; import org.apache.sentry.core.model.kafka.KafkaActionFactory; import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction; +import org.apache.sentry.core.model.kafka.KafkaAuthorizable; import org.apache.sentry.kafka.ConvertUtil; import org.apache.sentry.kafka.conf.KafkaAuthConf.AuthzConfVars; import org.apache.sentry.policy.common.PolicyEngine; @@ -38,23 +50,40 @@ import org.apache.sentry.provider.common.AuthorizationComponent; import org.apache.sentry.provider.common.AuthorizationProvider; import org.apache.sentry.provider.common.ProviderBackend; import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; +import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; +import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory; +import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; +import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege; +import org.apache.sentry.provider.db.generic.service.thrift.TSentryRole; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Predef; +import scala.Tuple2; +import scala.collection.Iterator; +import scala.collection.JavaConversions; +import scala.collection.immutable.Map; public class KafkaAuthBinding { private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class); private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA; + private static final String COMPONENT_NAME = COMPONENT_TYPE; private final Configuration authConf; private final AuthorizationProvider authProvider; + private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance(); + private ProviderBackend providerBackend; + private String instanceName; + private String requestorName; - private final KafkaActionFactory actionFactory = 
KafkaActionFactory.getInstance(); - public KafkaAuthBinding(String instanceName, Configuration authConf) throws Exception { + public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf) throws Exception { + this.instanceName = instanceName; + this.requestorName = requestorName; this.authConf = authConf; - this.authProvider = createAuthProvider(instanceName); + this.authProvider = createAuthProvider(); } /** @@ -62,7 +91,7 @@ public class KafkaAuthBinding { * * @return {@link AuthorizationProvider} */ - private AuthorizationProvider createAuthProvider(String instanceName) throws Exception { + private AuthorizationProvider createAuthProvider() throws Exception { /** * get the authProvider class, policyEngine class, providerBackend class and resources from the * kafkaAuthConf config @@ -127,6 +156,324 @@ public class KafkaAuthBinding { return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL); } + public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) { + verifyAcls(acls); + LOG.info("Adding Acl: acl->" + acls + " resource->" + resource); + + final Iterator<Acl> iterator = acls.iterator(); + while (iterator.hasNext()) { + final Acl acl = iterator.next(); + final String role = getRole(acl); + if (!roleExists(role)) { + throw new KafkaException("Can not add Acl for non-existent Role: " + role); + } + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + client.grantPrivilege( + requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource)); + return null; + } + }); + } + } + + public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) { + verifyAcls(acls); + LOG.info("Removing Acl: acl->" + acls + " resource->" + resource); + final Iterator<Acl> iterator = acls.iterator(); + while (iterator.hasNext()) { + final Acl acl = iterator.next(); + final String role = 
getRole(acl); + try { + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + client.dropPrivilege( + requestorName, role, toTSentryPrivilege(acl, resource)); + return null; + } + }); + } catch (KafkaException kex) { + LOG.error("Failed to remove acls.", kex); + return false; + } + } + + return true; + } + + public void addRole(final String role) { + if (roleExists(role)) { + throw new KafkaException("Can not create an existing role, " + role + ", again."); + } + + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + client.createRole( + requestorName, role, COMPONENT_NAME); + return null; + } + }); + } + + public void addRoleToGroups(final String role, final java.util.Set<String> groups) { + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + client.addRoleToGroups( + requestorName, role, COMPONENT_NAME, groups); + return null; + } + }); + } + + public void dropAllRoles() { + final List<String> roles = getAllRoles(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (String role : roles) { + client.dropRole(requestorName, role, COMPONENT_NAME); + } + return null; + } + }); + } + + private List<String> getRolesforGroup(final String groupName) { + final List<String> roles = new ArrayList<>(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (TSentryRole tSentryRole : client.listRolesByGroupName(requestorName, groupName, COMPONENT_NAME)) { + roles.add(tSentryRole.getRoleName()); + } + return null; + } + }); + + return roles; + } + + private SentryGenericServiceClient getClient() throws Exception { + return SentryGenericServiceClientFactory.create(this.authConf); + } + + public boolean removeAcls(final Resource resource) { + 
LOG.info("Removing Acls for Resource: resource->" + resource); + List<String> roles = getAllRoles(); + final List<TSentryPrivilege> tSentryPrivileges = getAllPrivileges(roles); + try { + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (TSentryPrivilege tSentryPrivilege : tSentryPrivileges) { + if (isPrivilegeForResource(tSentryPrivilege, resource)) { + client.dropPrivilege( + requestorName, COMPONENT_NAME, tSentryPrivilege); + } + } + return null; + } + }); + } catch (KafkaException kex) { + LOG.error("Failed to remove acls.", kex); + return false; + } + + return true; + } + + public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) { + final Option<scala.collection.immutable.Set<Acl>> acls = getAcls().get(resource); + if (acls.nonEmpty()) + return acls.get(); + return new scala.collection.immutable.HashSet<Acl>(); + } + + public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) { + if (principal.getPrincipalType().toLowerCase().equals("group")) { + List<String> roles = getRolesforGroup(principal.getName()); + return getAclsForRoles(roles); + } else { + LOG.info("Did not recognize Principal type: " + principal.getPrincipalType() + ". Returning Acls for all principals."); + return getAcls(); + } + } + + public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() { + final List<String> roles = getAllRoles(); + return getAclsForRoles(roles); + } + + /** + * A Command is a closure used to pass a block of code from individual + * functions to execute, which centralizes connection error + * handling. Command is parameterized on the return type of the function. 
+ */ + private interface Command<T> { + T run(SentryGenericServiceClient client) throws Exception; + } + + private <T> T execute(Command<T> cmd) throws KafkaException { + SentryGenericServiceClient client = null; + try { + client = getClient(); + return cmd.run(client); + } catch (SentryUserException ex) { + String msg = "Unable to excute command on sentry server: " + ex.getMessage(); + LOG.error(msg, ex); + throw new KafkaException(msg, ex); + } catch (Exception ex) { + String msg = "Unable to obtain client:" + ex.getMessage(); + LOG.error(msg, ex); + throw new KafkaException(msg, ex); + } finally { + if (client != null) { + client.close(); + } + } + } + + private TSentryPrivilege toTSentryPrivilege(Acl acl, Resource resource) { + final List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(acl.host(), resource); + final List<TAuthorizable> tAuthorizables = new ArrayList<>(); + for (Authorizable authorizable : authorizables) { + tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName())); + } + TSentryPrivilege tSentryPrivilege = new TSentryPrivilege(COMPONENT_NAME, instanceName, tAuthorizables, acl.operation().name()); + return tSentryPrivilege; + } + + private String getRole(Acl acl) { + return acl.principal().getName(); + } + + private boolean isPrivilegeForResource(TSentryPrivilege tSentryPrivilege, Resource resource) { + final java.util.Iterator<TAuthorizable> authorizablesIterator = tSentryPrivilege.getAuthorizablesIterator(); + while (authorizablesIterator.hasNext()) { + TAuthorizable tAuthorizable = authorizablesIterator.next(); + if (tAuthorizable.getType().equals(resource.resourceType().name())) { + return true; + } + } + return false; + } + + private List<TSentryPrivilege> getAllPrivileges(final List<String> roles) { + final List<TSentryPrivilege> tSentryPrivileges = new ArrayList<>(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + 
for (String role : roles) { + tSentryPrivileges.addAll(client.listPrivilegesByRoleName( + requestorName, role, COMPONENT_NAME, instanceName)); + } + return null; + } + }); + + return tSentryPrivileges; + } + + private List<String> getAllRoles() { + final List<String> roles = new ArrayList<>(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (TSentryRole tSentryRole : client.listAllRoles(requestorName, COMPONENT_NAME)) { + roles.add(tSentryRole.getRoleName()); + } + return null; + } + }); + + return roles; + } + + private Map<Resource, scala.collection.immutable.Set<Acl>> getAclsForRoles(final List<String> roles) { + return scala.collection.JavaConverters.mapAsScalaMapConverter( + rolePrivilegesToResourceAcls(getRoleToPrivileges(roles))) + .asScala().toMap(Predef.<Tuple2<Resource, scala.collection.immutable.Set<Acl>>>conforms()); + } + + private java.util.Map<Resource, scala.collection.immutable.Set<Acl>> rolePrivilegesToResourceAcls(java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap) { + final java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap = new HashMap<>(); + for (String role : rolePrivilegesMap.keySet()) { + scala.collection.immutable.Set<TSentryPrivilege> privileges = rolePrivilegesMap.get(role); + final Iterator<TSentryPrivilege> iterator = privileges.iterator(); + while (iterator.hasNext()) { + TSentryPrivilege privilege = iterator.next(); + final List<TAuthorizable> authorizables = privilege.getAuthorizables(); + String host = null; + String operation = privilege.getAction(); + for (TAuthorizable tAuthorizable : authorizables) { + if (tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name())) { + host = tAuthorizable.getName(); + } else { + Resource resource = new Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), tAuthorizable.getName()); + if (operation.equals("*")) { + operation 
= "All"; + } + Acl acl = new Acl(new KafkaPrincipal("role", role), Allow$.MODULE$, host, Operation$.MODULE$.fromString(operation)); + Set<Acl> newAclsJava = new HashSet<Acl>(); + newAclsJava.add(acl); + addExistingAclsForResource(resourceAclsMap, resource, newAclsJava); + final scala.collection.mutable.Set<Acl> aclScala = JavaConversions.asScalaSet(newAclsJava); + resourceAclsMap.put(resource, aclScala.<Acl>toSet()); + } + } + } + } + + return resourceAclsMap; + } + + private java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> getRoleToPrivileges(final List<String> roles) { + final java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap = new HashMap<>(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (String role : roles) { + final Set<TSentryPrivilege> rolePrivileges = client.listPrivilegesByRoleName( + requestorName, role, COMPONENT_NAME, instanceName); + final scala.collection.immutable.Set<TSentryPrivilege> rolePrivilegesScala = + scala.collection.JavaConverters.asScalaSetConverter(rolePrivileges).asScala().toSet(); + rolePrivilegesMap.put(role, rolePrivilegesScala); + } + return null; + } + }); + + return rolePrivilegesMap; + } + + private void addExistingAclsForResource(java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap, Resource resource, java.util.Set<Acl> newAclsJava) { + final scala.collection.immutable.Set<Acl> existingAcls = resourceAclsMap.get(resource); + if (existingAcls != null) { + final Iterator<Acl> aclsIter = existingAcls.iterator(); + while (aclsIter.hasNext()) { + Acl curAcl = aclsIter.next(); + newAclsJava.add(curAcl); + } + } + } + + private boolean roleExists(String role) { + return getAllRoles().contains(role); + } + + private void verifyAcls(scala.collection.immutable.Set<Acl> acls) { + final Iterator<Acl> iterator = acls.iterator(); + while (iterator.hasNext()) { + final Acl acl = 
iterator.next(); + assert acl.principal().getPrincipalType().toLowerCase().equals("role") : "Only Acls with KafkaPrincipal of type \"role;\" is supported."; + assert acl.permissionType().name().equals(Allow.name()) : "Only Acls with Permission of type \"Allow\" is supported."; + } + } + /* * For SSL session's Kafka creates user names with "CN=" prepended to the user name. * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=". @@ -136,13 +483,13 @@ public class KafkaAuthBinding { int start = principalName.indexOf("CN="); if (start >= 0) { String tmpName, name = ""; - tmpName = principalName.substring(start + 3); - int end = tmpName.indexOf(","); - if (end > 0) { - name = tmpName.substring(0, end); - } else { - name = tmpName; - } + tmpName = principalName.substring(start + 3); + int end = tmpName.indexOf(","); + if (end > 0) { + name = tmpName.substring(0, end); + } else { + name = tmpName; + } return name; } else { return principalName; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java index d7a5d1c..a0007a3 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java @@ -56,10 +56,10 @@ public class KafkaAuthBindingSingleton { return kafkaAuthConf; } - public void configure(String instanceName, String sentry_site) { + public void configure(String instanceName, String requestorName, String 
sentry_site) { try { kafkaAuthConf = loadAuthzConf(sentry_site); - binding = new KafkaAuthBinding(instanceName, kafkaAuthConf); + binding = new KafkaAuthBinding(instanceName, requestorName, kafkaAuthConf); log.info("KafkaAuthBinding created successfully"); } catch (Exception ex) { log.error("Unable to create KafkaAuthBinding", ex); http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java index cff9418..e0d767e 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java @@ -29,6 +29,7 @@ public class KafkaAuthConf extends Configuration { public static final String AUTHZ_SITE_FILE = "sentry-site.xml"; public static final String KAFKA_SUPER_USERS = "kafka.superusers"; public static final String KAFKA_SERVICE_INSTANCE_NAME = "sentry.kafka.service.instance"; + public static final String KAFKA_SERVICE_USER_NAME = "sentry.kafka.service.user.name"; /** * Config setting definitions @@ -38,7 +39,8 @@ public class KafkaAuthConf extends Configuration { AUTHZ_PROVIDER_RESOURCE("sentry.kafka.provider.resource", ""), AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()), AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()), - AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"); + AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"), + AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"); private final String varName; private final String 
defaultVal; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java index eafe0f0..f40d8c2 100644 --- a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java @@ -17,7 +17,6 @@ package org.apache.sentry.kafka.authorizer; import kafka.network.RequestChannel; -import kafka.security.auth.Operation; import kafka.security.auth.Operation$; import kafka.security.auth.Resource; import kafka.security.auth.Resource$; http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java index 7b8b518..fc3bf7a 100644 --- a/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java +++ b/sentry-core/sentry-core-model-kafka/src/main/java/org/apache/sentry/core/model/kafka/KafkaActionFactory.java @@ -185,6 +185,9 @@ public class KafkaActionFactory extends BitFieldActionFactory { */ @Override public KafkaAction getActionByName(String name) { + if (name.equalsIgnoreCase("*")) { + return new 
KafkaAction("ALL"); + } return KafkaActionType.hasActionType(name) ? new KafkaAction(name) : null; } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java index 81446a7..04316f2 100644 --- a/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java +++ b/sentry-core/sentry-core-model-kafka/src/test/java/org/apache/sentry/core/model/kafka/TestKafkaAuthorizable.java @@ -19,11 +19,7 @@ package org.apache.sentry.core.model.kafka; import junit.framework.Assert; -import org.apache.sentry.core.model.kafka.Cluster; -import org.apache.sentry.core.model.kafka.ConsumerGroup; import org.apache.sentry.core.model.kafka.KafkaAuthorizable.AuthorizableType; -import org.apache.sentry.core.model.kafka.Host; -import org.apache.sentry.core.model.kafka.Topic; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-provider/sentry-provider-db/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/pom.xml b/sentry-provider/sentry-provider-db/pom.xml index 7514a7c..9a10829 100644 --- a/sentry-provider/sentry-provider-db/pom.xml +++ b/sentry-provider/sentry-provider-db/pom.xml @@ -92,6 +92,10 @@ limitations under the License. 
</dependency> <dependency> <groupId>org.apache.sentry</groupId> + <artifactId>sentry-core-model-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> <artifactId>sentry-provider-common</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java index c3b0be8..3ccb3aa 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/generic/service/persistent/PrivilegeOperatePersistence.java @@ -31,6 +31,7 @@ import org.apache.sentry.core.common.Action; import org.apache.sentry.core.common.Authorizable; import org.apache.sentry.core.common.BitFieldAction; import org.apache.sentry.core.common.BitFieldActionFactory; +import org.apache.sentry.core.model.kafka.KafkaActionFactory; import org.apache.sentry.core.model.search.SearchActionFactory; import org.apache.sentry.core.model.sqoop.SqoopActionFactory; import org.apache.sentry.provider.db.generic.service.persistent.PrivilegeObject.Builder; @@ -51,6 +52,7 @@ public class PrivilegeOperatePersistence { static{ actionFactories.put("solr", new SearchActionFactory()); actionFactories.put("sqoop", new SqoopActionFactory()); + actionFactories.put("kafka", KafkaActionFactory.getInstance()); } public boolean checkPrivilegeOption(Set<MSentryRole> roles, PrivilegeObject privilege, 
PersistenceManager pm) { http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-tests/pom.xml b/sentry-tests/pom.xml index 3294335..88a28bb 100644 --- a/sentry-tests/pom.xml +++ b/sentry-tests/pom.xml @@ -31,6 +31,7 @@ limitations under the License. <module>sentry-tests-hive</module> <module>sentry-tests-solr</module> <module>sentry-tests-sqoop</module> + <module>sentry-tests-kafka</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/pom.xml b/sentry-tests/sentry-tests-kafka/pom.xml new file mode 100644 index 0000000..54c7205 --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/pom.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>sentry-tests</artifactId> + <groupId>org.apache.sentry</groupId> + <version>1.7.0-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>sentry-tests-kafka</artifactId> + <name>Sentry Kafka Tests</name> + <description>end to end tests for sentry-kafka integration</description> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-binding-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${kafka.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-provider-db</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/CustomPrincipalBuilder.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/CustomPrincipalBuilder.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/CustomPrincipalBuilder.java new file mode 100644 index 0000000..5531fcb --- /dev/null +++ 
b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/CustomPrincipalBuilder.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.tests.e2e.kafka; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.network.TransportLayer; +import org.apache.kafka.common.security.auth.PrincipalBuilder; + +import java.security.Principal; +import java.util.Map; + +public class CustomPrincipalBuilder implements PrincipalBuilder { + @Override + public void configure(Map<String, ?> map) { + + } + + @Override + public Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException { + try { + return transportLayer.peerPrincipal(); + } catch (Exception e) { + throw new KafkaException("Failed to build principal due to: ", e); + } + } + + @Override + public void close() throws KafkaException { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/EmbeddedZkServer.java ---------------------------------------------------------------------- 
diff --git a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/EmbeddedZkServer.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/EmbeddedZkServer.java new file mode 100644 index 0000000..442ddff --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/EmbeddedZkServer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.tests.e2e.kafka; + +import org.apache.commons.io.FileUtils; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; + +public class EmbeddedZkServer { + private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedZkServer.class); + + private Path snapshotDir = null; + private Path logDir = null; + private ZooKeeperServer zookeeper = null; + private NIOServerCnxnFactory factory = null; + + public EmbeddedZkServer(int port) throws Exception { + snapshotDir = Files.createTempDirectory("zookeeper-snapshot-"); + logDir = Files.createTempDirectory("zookeeper-log-"); + int tickTime = 500; + zookeeper = new ZooKeeperServer(snapshotDir.toFile(), logDir.toFile(), tickTime); + factory = new NIOServerCnxnFactory(); + InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port); + LOGGER.info("Starting Zookeeper at " + addr); + factory.configure(addr, 0); + factory.startup(zookeeper); + } + + public void shutdown() throws IOException { + try { + zookeeper.shutdown(); + } catch (Exception e) { + LOGGER.error("Failed to shutdown ZK server", e); + } + + try { + factory.shutdown(); + } catch (Exception e) { + LOGGER.error("Failed to shutdown Zk connection factory.", e); + } + + FileUtils.deleteDirectory(logDir.toFile()); + FileUtils.deleteDirectory(snapshotDir.toFile()); + } + + public ZooKeeperServer getZk() { + return zookeeper; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java ---------------------------------------------------------------------- diff --git 
a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java new file mode 100644 index 0000000..129191a --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.tests.e2e.kafka; + +import kafka.server.KafkaServerStartable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Properties; + +public class KafkaTestServer { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestServer.class); + + private int zkPort = -1; + private int kafkaPort = -1; + private EmbeddedZkServer zkServer = null; + private KafkaServerStartable kafkaServer = null; + private File sentrySitePath = null; + + public KafkaTestServer(File sentrySitePath) throws Exception { + this.zkPort = TestUtils.getFreePort(); + this.kafkaPort = TestUtils.getFreePort(); + this.sentrySitePath = sentrySitePath; + createZkServer(); + createKafkaServer(); + } + + public void start() throws Exception { + kafkaServer.startup(); + LOGGER.info("Started Kafka broker."); + } + + public void shutdown() { + if (kafkaServer != null) { + kafkaServer.shutdown(); + kafkaServer.awaitShutdown(); + LOGGER.info("Stopped Kafka server."); + } + + if (zkServer != null) { + try { + zkServer.shutdown(); + LOGGER.info("Stopped ZK server."); + } catch (IOException e) { + LOGGER.error("Failed to shutdown ZK server.", e); + } + } + } + + private Path getTempDirectory() { + Path tempDirectory = null; + try { + tempDirectory = Files.createTempDirectory("kafka-sentry-"); + } catch (IOException e) { + LOGGER.error("Failed to create temp dir for Kafka's log dir."); + throw new RuntimeException(e); + } + return tempDirectory; + } + + private void setupKafkaProps(Properties props) throws UnknownHostException { + props.put("listeners", "SSL://" + InetAddress.getLocalHost().getHostAddress() + ":" + kafkaPort); + props.put("log.dir", getTempDirectory().toAbsolutePath().toString()); + props.put("zookeeper.connect", 
InetAddress.getLocalHost().getHostAddress() + ":" + zkPort); + props.put("replica.socket.timeout.ms", "1500"); + props.put("controller.socket.timeout.ms", "1500"); + props.put("controlled.shutdown.enable", true); + props.put("delete.topic.enable", false); + props.put("controlled.shutdown.retry.backoff.ms", "100"); + props.put("port", kafkaPort); + props.put("authorizer.class.name", "org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer"); + props.put("sentry.kafka.site.url", "file://" + sentrySitePath.getAbsolutePath()); + props.put("allow.everyone.if.no.acl.found", "true"); + props.put("ssl.keystore.location", KafkaTestServer.class.getResource("/test.keystore.jks").getPath()); + props.put("ssl.keystore.password", "test-ks-passwd"); + props.put("ssl.key.password", "test-key-passwd"); + props.put("ssl.truststore.location", KafkaTestServer.class.getResource("/test.truststore.jks").getPath()); + props.put("ssl.truststore.password", "test-ts-passwd"); + props.put("security.inter.broker.protocol", "SSL"); + props.put("ssl.client.auth", "required"); + props.put("kafka.superusers", "User:CN=superuser;User:CN=superuser1; User:CN=Superuser2 "); + } + + private void createKafkaServer() throws UnknownHostException { + Properties props = new Properties(); + setupKafkaProps(props); + kafkaServer = KafkaServerStartable.fromProps(props); + } + + private void createZkServer() throws Exception { + try { + zkServer = new EmbeddedZkServer(zkPort); + zkPort = zkServer.getZk().getClientPort(); + } catch (Exception e) { + LOGGER.error("Failed to create testing zookeeper server."); + throw new RuntimeException(e); + } + } + + public String getBootstrapServers() { + return "localhost:" + kafkaPort; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/TestUtils.java ---------------------------------------------------------------------- diff --git 
a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/TestUtils.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/TestUtils.java new file mode 100644 index 0000000..dda4047 --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/TestUtils.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.tests.e2e.kafka; + +import java.io.IOException; +import java.net.ServerSocket; + +public class TestUtils { + /** + * Asks the OS for an ephemeral port and returns its number. + * The probe socket is closed before returning so that the caller can bind the port itself; + * nothing reserves the port afterwards, so another process may still claim it first. + */ + public static synchronized int getFreePort() throws IOException { + try (ServerSocket serverSocket = new ServerSocket(0)) { + return serverSocket.getLocalPort(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java new file mode 100644 index 0000000..a2cfa28 --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/AbstractKafkaSentryTestBase.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sentry.tests.e2e.kafka; + +import com.google.common.base.Joiner; +import com.google.common.collect.Sets; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.sentry.core.model.kafka.Cluster; +import org.apache.sentry.core.model.kafka.KafkaActionConstant; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.kafka.conf.KafkaAuthConf; +import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; +import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; +import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory; +import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; +import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege; +import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider; +import org.apache.sentry.provider.file.PolicyFile; +import org.apache.sentry.service.thrift.SentryService; +import org.apache.sentry.service.thrift.SentryServiceFactory; +import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig; +import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.File; +import java.io.FileOutputStream; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertTrue; + + +/** + * This class used to test the Kafka integration with Sentry. 
+ */ +public class AbstractKafkaSentryTestBase { + + protected static final String COMPONENT = "kafka"; + protected static final String ADMIN_USER = "kafka"; + protected static final String ADMIN_GROUP = "group_kafka"; + protected static final String ADMIN_ROLE = "role_kafka"; + + protected static SentryService sentryServer; + protected static File sentrySitePath; + + protected static File baseDir; + protected static File dbDir; + protected static File policyFilePath; + + protected static PolicyFile policyFile; + + protected static String bootstrapServers = null; + protected static KafkaTestServer kafkaServer = null; + + @BeforeClass + public static void beforeTestEndToEnd() throws Exception { + setupConf(); + startSentryServer(); + setUserGroups(); + setAdminPrivilege(); + startKafkaServer(); + } + + /** + * Tears the fixtures down in reverse start-up order. The Kafka broker is configured from + * sentry-site.xml under baseDir, which stopSentryServer() deletes, so the broker goes first; + * the Sentry service is still stopped if the broker shutdown throws. + */ + @AfterClass + public static void afterTestEndToEnd() throws Exception { + try { + stopKafkaServer(); + } finally { + stopSentryServer(); + } + } + + private static void stopKafkaServer() { + if (kafkaServer != null) { + kafkaServer.shutdown(); + kafkaServer = null; + } + } + + private static void stopSentryServer() throws Exception { + if (sentryServer != null) { + sentryServer.stop(); + sentryServer = null; + } + + FileUtils.deleteDirectory(baseDir); + } + + public static void setupConf() throws Exception { + baseDir = createTempDir(); + sentrySitePath = new File(baseDir, "sentry-site.xml"); + dbDir = new File(baseDir, "sentry_policy_db"); + policyFilePath = new File(baseDir, "local_policy_file.ini"); + policyFile = new PolicyFile(); + + /** set the configuration for Sentry Service */ + Configuration conf = new Configuration(); + + conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE); + conf.set(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false"); + conf.set(ServerConfig.ADMIN_GROUPS, Joiner.on(",").join(ADMIN_GROUP, + UserGroupInformation.getLoginUser().getPrimaryGroupName())); + conf.set(ServerConfig.RPC_PORT, String.valueOf(TestUtils.getFreePort())); + 
conf.set(ServerConfig.RPC_ADDRESS, NetUtils.createSocketAddr( + InetAddress.getLocalHost().getHostAddress() + ":" + conf.get(ServerConfig.RPC_PORT)) + .getAddress().getCanonicalHostName()); + conf.set(ServerConfig.SENTRY_STORE_JDBC_URL, + "jdbc:derby:;databaseName=" + dbDir.getPath() + ";create=true"); + conf.set(ServerConfig.SENTRY_STORE_JDBC_PASS, "dummy"); + conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING, + ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); + conf.set(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, + policyFilePath.getPath()); + sentryServer = new SentryServiceFactory().create(conf); + } + + public static File createTempDir() { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + String baseName = "kafka-e2e-"; + File tempDir = new File(baseDir, baseName + UUID.randomUUID().toString()); + if (tempDir.mkdir()) { + return tempDir; + } + throw new IllegalStateException("Failed to create temp directory"); + } + + public static void startSentryServer() throws Exception { + sentryServer.start(); + final long start = System.currentTimeMillis(); + while(!sentryServer.isRunning()) { + Thread.sleep(1000); + if(System.currentTimeMillis() - start > 60000L) { + throw new TimeoutException("Server did not start after 60 seconds"); + } + } + } + + public static void setUserGroups() throws Exception { + for (String user : StaticUserGroupRole.getUsers()) { + Set<String> groups = StaticUserGroupRole.getGroups(user); + policyFile.addGroupsToUser(user, + groups.toArray(new String[groups.size()])); + } + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + policyFile.addGroupsToUser(loginUser.getShortUserName(), loginUser.getGroupNames()); + + policyFile.write(policyFilePath); + } + + public static void setAdminPrivilege() throws Exception { + SentryGenericServiceClient sentryClient = null; + try { + /** grant all privilege to admin user */ + sentryClient = getSentryClient(); + sentryClient.createRoleIfNotExist(ADMIN_USER, 
ADMIN_ROLE, COMPONENT); + sentryClient.addRoleToGroups(ADMIN_USER, ADMIN_ROLE, COMPONENT, Sets.newHashSet(ADMIN_GROUP)); + final ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>(); + Host host = new Host(InetAddress.getLocalHost().getHostName()); + authorizables.add(new TAuthorizable(host.getTypeName(), host.getName())); + Cluster cluster = new Cluster(); + authorizables.add(new TAuthorizable(cluster.getTypeName(), cluster.getName())); + sentryClient.grantPrivilege(ADMIN_USER, ADMIN_ROLE, COMPONENT, + new TSentryPrivilege(COMPONENT, "kafka", authorizables, + KafkaActionConstant.ALL)); + } finally { + if (sentryClient != null) { + sentryClient.close(); + } + } + } + + protected static SentryGenericServiceClient getSentryClient() throws Exception { + return SentryGenericServiceClientFactory.create(getClientConfig()); + } + + public static void assertCausedMessage(Exception e, String message) { + if (e.getCause() != null) { + assertTrue("Expected message: " + message + ", but got: " + e.getCause().getMessage(), e.getCause().getMessage().contains(message)); + } else { + assertTrue("Expected message: " + message + ", but got: " + e.getMessage(), e.getMessage().contains(message)); + } + } + + private static Configuration getClientConfig() { + Configuration conf = new Configuration(); + /** set the Sentry client configuration for Kafka Service integration */ + conf.set(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_NONE); + conf.set(ClientConfig.SERVER_RPC_ADDRESS, sentryServer.getAddress().getHostName()); + conf.set(ClientConfig.SERVER_RPC_PORT, String.valueOf(sentryServer.getAddress().getPort())); + + conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER.getVar(), + LocalGroupResourceAuthorizationProvider.class.getName()); + conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(), + SentryGenericProviderBackend.class.getName()); + conf.set(KafkaAuthConf.AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(), policyFilePath.getPath()); + 
return conf; + } + + /** + * Writes the Sentry client configuration to sentry-site.xml, then creates and starts the + * embedded Kafka test server and records its bootstrap address. + */ + private static void startKafkaServer() throws Exception { + // Workaround for SentryKafkaAuthorizer to be added to classpath + Class.forName("org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer"); + // Close the stream so the site file is fully written and its descriptor released before the broker reads it. + try (FileOutputStream sentrySiteStream = new FileOutputStream(sentrySitePath)) { + getClientConfig().writeXml(sentrySiteStream); + } + + kafkaServer = new KafkaTestServer(sentrySitePath); + kafkaServer.start(); + bootstrapServers = kafkaServer.getBootstrapServers(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/StaticUserGroupRole.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/StaticUserGroupRole.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/StaticUserGroupRole.java new file mode 100644 index 0000000..96b7cf4 --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/StaticUserGroupRole.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.tests.e2e.kafka; + +import com.google.common.collect.Sets; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class StaticUserGroupRole { + public static final String SUPERUSER = "superuser"; + public static final String USER_1 = "user1"; + public static final String USER_2 = "user2"; + public static final String USER_KAFKA = "kafka"; + + public static final String GROUP_0 = "group0"; + public static final String GROUP_1 = "group1"; + public static final String GROUP_2 = "group2"; + public static final String GROUP_KAFKA = "group_kafka"; + + public static final String ROLE_0 = "role0"; + public static final String ROLE_1 = "role1"; + public static final String ROLE_2 = "role2"; + + private static Map<String, Set<String>> userToGroupsMapping = + new HashMap<String, Set<String>>(); + + static { + userToGroupsMapping.put(SUPERUSER, Sets.newHashSet(GROUP_0)); + userToGroupsMapping.put(USER_1, Sets.newHashSet(GROUP_1)); + userToGroupsMapping.put(USER_2, Sets.newHashSet(GROUP_2)); + userToGroupsMapping.put(USER_KAFKA, Sets.newHashSet(GROUP_KAFKA)); + } + + public static Set<String> getUsers() { + return userToGroupsMapping.keySet(); + } + + public static Set<String> getGroups(String user) { + return userToGroupsMapping.get(user); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java new file mode 100644 index 0000000..135d362 --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAclsCrud.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.tests.e2e.kafka; + +import junit.framework.Assert; +import kafka.security.auth.Acl; +import kafka.security.auth.Allow$; +import kafka.security.auth.Operation$; +import kafka.security.auth.Resource; +import kafka.security.auth.ResourceType$; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.sentry.kafka.authorizer.SentryKafkaAuthorizer; +import org.apache.sentry.kafka.conf.KafkaAuthConf; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.immutable.Map; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; + +public class TestAclsCrud extends AbstractKafkaSentryTestBase { + private static final Logger LOGGER = LoggerFactory.getLogger(TestAclsCrud.class); + private SentryKafkaAuthorizer sentryKafkaAuthorizer; + + /** + * Drops every role created by the test and closes the authorizer. + * Does nothing when the test never created an authorizer; the authorizer is closed even if dropping roles fails. + */ + @After + public void cleanUp() throws Exception { + if (sentryKafkaAuthorizer != null) { + try { + sentryKafkaAuthorizer.dropAllRoles(); + } finally { + sentryKafkaAuthorizer.close(); + sentryKafkaAuthorizer = null; + } + } + } + + + @Test + public void testAddAclsForNonExistentRole() { + sentryKafkaAuthorizer = new SentryKafkaAuthorizer(); + java.util.Map<String, String> 
configs = new HashMap<>(); + configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + sentrySitePath.getAbsolutePath()); + sentryKafkaAuthorizer.configure(configs); + + final String role1 = "role1"; + Set<Acl> acls = new HashSet<>(); + final Acl acl = new Acl(new KafkaPrincipal("role", role1), + Allow$.MODULE$, + "127.0.0.1", + Operation$.MODULE$.fromString("READ")); + acls.add(acl); + scala.collection.immutable.Set<Acl> aclsScala = scala.collection.JavaConversions.asScalaSet(acls).toSet(); + Resource resource = new Resource(ResourceType$.MODULE$.fromString("TOPIC"), "test-topic"); + try { + sentryKafkaAuthorizer.addAcls(aclsScala, resource); + // Reaching this line means no exception was raised; fail instead of passing vacuously. + // Assert.fail throws an Error, so the catch (Exception) below does not swallow it. + Assert.fail("Expected adding an Acl for a non-existent role to fail."); + } catch (Exception ex) { + assertCausedMessage(ex, "Can not add Acl for non-existent Role: role1"); + } + } + + @Test + public void testAddRole() { + sentryKafkaAuthorizer = new SentryKafkaAuthorizer(); + java.util.Map<String, String> configs = new HashMap<>(); + configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + sentrySitePath.getAbsolutePath()); + sentryKafkaAuthorizer.configure(configs); + + final String role1 = "role1"; + try { + sentryKafkaAuthorizer.addRole(role1); + } catch (Exception ex) { + Assert.fail("Failed to create role."); + } + } + + @Test + public void testAddExistingRole() { + sentryKafkaAuthorizer = new SentryKafkaAuthorizer(); + java.util.Map<String, String> configs = new HashMap<>(); + configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + sentrySitePath.getAbsolutePath()); + sentryKafkaAuthorizer.configure(configs); + + // Add role the first time + final String role1 = "role1"; + try { + sentryKafkaAuthorizer.addRole(role1); + } catch (Exception ex) { + Assert.fail("Failed to create role."); + } + + // Try adding same role again + try { + sentryKafkaAuthorizer.addRole(role1); + // Reaching this line means the duplicate was accepted; fail instead of passing vacuously. + Assert.fail("Expected re-creating an existing role to fail."); + } catch (Exception ex) { + assertCausedMessage(ex, "Can not create an existing role, role1, again."); + } + } + + @Test + public void testAddAcls() { + sentryKafkaAuthorizer = new SentryKafkaAuthorizer(); + 
java.util.Map<String, String> configs = new HashMap<>(); + configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + sentrySitePath.getAbsolutePath()); + sentryKafkaAuthorizer.configure(configs); + + final String role1 = "role1"; + Set<Acl> acls = new HashSet<>(); + final Acl acl = new Acl(new KafkaPrincipal("role", role1), + Allow$.MODULE$, + "127.0.0.1", + Operation$.MODULE$.fromString("READ")); + acls.add(acl); + scala.collection.immutable.Set<Acl> aclsScala = scala.collection.JavaConversions.asScalaSet(acls).toSet(); + Resource resource = new Resource(ResourceType$.MODULE$.fromString("TOPIC"), "test-topic"); + + // Add role + try { + sentryKafkaAuthorizer.addRole(role1); + } catch (Exception ex) { + Assert.fail("Failed to create role."); + } + + // Add acl + try { + sentryKafkaAuthorizer.addAcls(aclsScala, resource); + } catch (Exception ex) { + Assert.fail("Failed to add acl."); + } + + final scala.collection.immutable.Set<Acl> obtainedAcls = sentryKafkaAuthorizer.getAcls(resource); + Assert.assertTrue("Obtained acls did not match expected Acls", obtainedAcls.contains(acl)); + } + + @Test + public void testAddRoleToGroups() { + sentryKafkaAuthorizer = new SentryKafkaAuthorizer(); + java.util.Map<String, String> configs = new HashMap<>(); + configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + sentrySitePath.getAbsolutePath()); + sentryKafkaAuthorizer.configure(configs); + + final String role1 = "role1"; + Set<Acl> acls = new HashSet<>(); + final Acl acl = new Acl(new KafkaPrincipal("role", role1), + Allow$.MODULE$, + "127.0.0.1", + Operation$.MODULE$.fromString("READ")); + acls.add(acl); + scala.collection.immutable.Set<Acl> aclsScala = scala.collection.JavaConversions.asScalaSet(acls).toSet(); + Resource resource = new Resource(ResourceType$.MODULE$.fromString("TOPIC"), "test-topic"); + + // Add role + try { + sentryKafkaAuthorizer.addRole(role1); + } catch (Exception ex) { + Assert.fail("Failed to create role."); + } + + // Add acl + try { + 
sentryKafkaAuthorizer.addAcls(aclsScala, resource); + } catch (Exception ex) { + Assert.fail("Failed to add acl."); + } + + // Add role to group + Set<String> groups = new HashSet<>(); + String group1 = "group1"; + groups.add(group1); + try { + sentryKafkaAuthorizer.addRoleToGroups(role1, groups); + } catch (Exception ex) { + throw ex; + } + + final scala.collection.immutable.Set<Acl> obtainedAcls = sentryKafkaAuthorizer.getAcls(new KafkaPrincipal("group", group1)).get(resource).get(); + Assert.assertTrue("Obtained acls did not match expected Acls", obtainedAcls.contains(acl)); + } + + @Test + public void testRemoveAclsByResource() { + sentryKafkaAuthorizer = new SentryKafkaAuthorizer(); + java.util.Map<String, String> configs = new HashMap<>(); + configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + sentrySitePath.getAbsolutePath()); + sentryKafkaAuthorizer.configure(configs); + + final String role1 = "role1"; + Set<Acl> acls = new HashSet<>(); + final KafkaPrincipal principal1 = new KafkaPrincipal("role", role1); + final Acl acl = new Acl(principal1, + Allow$.MODULE$, + "127.0.0.1", + Operation$.MODULE$.fromString("READ")); + acls.add(acl); + scala.collection.immutable.Set<Acl> aclsScala = scala.collection.JavaConversions.asScalaSet(acls).toSet(); + Resource resource = new Resource(ResourceType$.MODULE$.fromString("TOPIC"), "test-topic"); + + // Add role + try { + sentryKafkaAuthorizer.addRole(role1); + } catch (Exception ex) { + Assert.fail("Failed to create role."); + } + + // Add acl + try { + sentryKafkaAuthorizer.addAcls(aclsScala, resource); + } catch (Exception ex) { + Assert.fail("Failed to add acl."); + } + + // Add acl for different resource + Set<Acl> acls2 = new HashSet<>(); + final Acl acl2 = new Acl(principal1, + Allow$.MODULE$, + "127.0.0.1", + Operation$.MODULE$.fromString("WRITE")); + acls2.add(acl2); + scala.collection.immutable.Set<Acl> aclsScala2 = scala.collection.JavaConversions.asScalaSet(acls2).toSet(); + Resource resource2 = new 
Resource(ResourceType$.MODULE$.fromString("CLUSTER"), "test-cluster"); + try { + sentryKafkaAuthorizer.addAcls(aclsScala2, resource2); + } catch (Exception ex) { + Assert.fail("Failed to add second acl."); + } + + try { + sentryKafkaAuthorizer.removeAcls(resource); + } catch (Exception ex) { + Assert.fail("Failed to remove acls for resource."); + } + + final Map<Resource, scala.collection.immutable.Set<Acl>> obtainedAcls = sentryKafkaAuthorizer.getAcls(principal1); + Assert.assertTrue("Obtained acls must not contain acl for removed resource's acls.", !obtainedAcls.keySet().contains(resource)); + Assert.assertTrue("Obtained acls must contain acl for resource2.", obtainedAcls.keySet().contains(resource2)); + Assert.assertTrue("Obtained acl does not match expected acl.", obtainedAcls.get(resource2).get().contains(acl2)); + } + + @Test + public void testRemoveAclsByAclsAndResource() { + sentryKafkaAuthorizer = new SentryKafkaAuthorizer(); + java.util.Map<String, String> configs = new HashMap<>(); + configs.put(KafkaAuthConf.SENTRY_KAFKA_SITE_URL, "file://" + sentrySitePath.getAbsolutePath()); + sentryKafkaAuthorizer.configure(configs); + + final String role1 = "role1"; + Set<Acl> acls = new HashSet<>(); + final KafkaPrincipal principal1 = new KafkaPrincipal("role", role1); + final Acl acl = new Acl(principal1, + Allow$.MODULE$, + "127.0.0.1", + Operation$.MODULE$.fromString("READ")); + acls.add(acl); + scala.collection.immutable.Set<Acl> aclsScala = scala.collection.JavaConversions.asScalaSet(acls).toSet(); + Resource resource = new Resource(ResourceType$.MODULE$.fromString("TOPIC"), "test-topic"); + + // Add role + try { + sentryKafkaAuthorizer.addRole(role1); + } catch (Exception ex) { + Assert.fail("Failed to create role."); + } + + // Add acl + try { + sentryKafkaAuthorizer.addAcls(aclsScala, resource); + } catch (Exception ex) { + Assert.fail("Failed to add acl."); + } + + // Add another acl to same resource + Set<Acl> acls01 = new HashSet<>(); + final Acl acl01 = 
new Acl(principal1, + Allow$.MODULE$, + "127.0.0.1", + Operation$.MODULE$.fromString("DESCRIBE")); + acls01.add(acl01); + scala.collection.immutable.Set<Acl> aclsScala01 = scala.collection.JavaConversions.asScalaSet(acls01).toSet(); + try { + sentryKafkaAuthorizer.addAcls(aclsScala01, resource); + } catch (Exception ex) { + Assert.fail("Failed to add acl."); + } + + + // Add acl for different resource + Set<Acl> acls2 = new HashSet<>(); + final Acl acl2 = new Acl(principal1, + Allow$.MODULE$, + "127.0.0.1", + Operation$.MODULE$.fromString("WRITE")); + acls2.add(acl2); + scala.collection.immutable.Set<Acl> aclsScala2 = scala.collection.JavaConversions.asScalaSet(acls2).toSet(); + Resource resource2 = new Resource(ResourceType$.MODULE$.fromString("CLUSTER"), "test-cluster"); + try { + sentryKafkaAuthorizer.addAcls(aclsScala2, resource2); + } catch (Exception ex) { + Assert.fail("Failed to add second acl."); + } + + // Remove acls + try { + sentryKafkaAuthorizer.removeAcls(aclsScala, resource); + } catch (Exception ex) { + Assert.fail("Failed to remove acls for resource."); + } + + final Map<Resource, scala.collection.immutable.Set<Acl>> obtainedAcls = sentryKafkaAuthorizer.getAcls(principal1); + Assert.assertTrue("Obtained acls must contain acl for resource.", obtainedAcls.keySet().contains(resource)); + Assert.assertTrue("Obtained acls must contain acl for resource2.", obtainedAcls.keySet().contains(resource2)); + Assert.assertTrue("Obtained acl must not contain removed acl for resource.", !obtainedAcls.get(resource).get().contains(acl)); + Assert.assertTrue("Obtained acl does not match expected acl for resource.", obtainedAcls.get(resource).get().contains(acl01)); + Assert.assertTrue("Obtained acl does not match expected acl for resource2.", obtainedAcls.get(resource2).get().contains(acl2)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/resources/log4j.properties 
---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/resources/log4j.properties b/sentry-tests/sentry-tests-kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000..5f52884 --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/test/resources/log4j.properties @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Define some default values that can be overridden by system properties. 
+# +# For testing, it may also be convenient to specify -Dsentry.root.logger=DEBUG,console + +sentry.root.logger=DEBUG,console +log4j.rootLogger=${sentry.root.logger} + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n + +log4j.logger.kafka.utils.Logging=WARN +log4j.logger.org.apache.kafka=WARN +log4j.logger.org.apache.sentry=DEBUG +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.org.I0Itec.zkclient=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.category.DataNucleus=OFF \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/resources/test.crt ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/resources/test.crt b/sentry-tests/sentry-tests-kafka/src/test/resources/test.crt new file mode 100644 index 0000000..fd6c902 --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/test/resources/test.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICxzCCAa+gAwIBAgIEK13qfTANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDEwlzdXBlcnVzZXIw +HhcNMTUxMjE1MjMzNTAzWhcNMTYwMzE0MjMzNTAzWjAUMRIwEAYDVQQDEwlzdXBlcnVzZXIwggEi +MA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQChGUnirhdFKW6OXbPBqQ1tWEFrxvCHr51uVU9H +V2aqO+Q02a+Vzyb24dzyqnbM5uOeGqAyTFXpCPOK0oxTCvf/0idmHIcgt40797I7rxWDJw9/wYos +UGkqizAb878LaFScIo6Phu6zjdj/J16vd5KiWN5pzOLnwO8DebzO5s+N34VuNZ8s45zemq2bES9Z +z8mMolTkZS4d8wGExC93n5oiNrPGUneKRZJYukv3SiDMajaOTqnI4Xo/LIs3dynq8dTBQPTtUwnA +UZz8kpew6PfxDYYHjg2eHli/6Dopmur/R27xuxn5VnJHnxgL5mbxrRgAidGN6CwJFA7ZxSBn67pr +AgMBAAGjITAfMB0GA1UdDgQWBBTxczVGKoS4NuNIPlS4yJfm8fSj3zANBgkqhkiG9w0BAQsFAAOC +AQEAC4PSVAzUVGqhESIGDpJ6kbHzw/wBUmrjceTDQv9cVPNrHlMWoG67nM45tECWud3osB57nunV +vcwSNXxhf4M+IPK1BoT2awUjEfWN+F7guxFXpU2lQpmHPj+015g9pGvvneRLZj8VfdFo8PuyDeRy 
+V0HuG7xJ2xZMM8XpgL9BHrgD/4CITzRkaHnyuYb+Yz5GUFYOpLn0ANNm3gfW+eMiE/38zc+o23wJ +V49hAKGqalJUATWVzq7iCqTqxeIQ2RQyJ9O5p82Y5CIG1Tp07zdCPVqkKz7NAbt2K0ZW5/5qc5V/ +y88rnXWj9nZPYwyVj5rxqB8h2WDLDmxr1JuwuMOlYw== +-----END CERTIFICATE----- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/resources/user1.crt ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/resources/user1.crt b/sentry-tests/sentry-tests-kafka/src/test/resources/user1.crt new file mode 100644 index 0000000..5cb6caa --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/test/resources/user1.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICvzCCAaegAwIBAgIEWaKEszANBgkqhkiG9w0BAQsFADAQMQ4wDAYDVQQDEwV1c2VyMTAeFw0x +NTEyMTUyMzQyNTlaFw0xNjAzMTQyMzQyNTlaMBAxDjAMBgNVBAMTBXVzZXIxMIIBIjANBgkqhkiG +9w0BAQEFAAOCAQ8AMIIBCgKCAQEAgDzGn4VvJnROVCC+CR77DfqmF1wkNUrOiaLL9qufoRi9DuZU +epmqebg0YyCQVyuIUe1p7qhnOGNnFN0nJC75C4MbCDX/s2+gxUBb6iaP7pwmdKzprvP3YGQrQXo/ +pv+zV9EH1P5JP+27B6NVGTGJPUP4UqZF2uyhNOHIcB9sMvZTnyfDLs+8o9dCv3bFPpwEGZnk3I1I +xD1cYSz+qb3E3M68L6cFVSo1qnK0QN8eBXXB/ljCHaQ47jLfZrJjjiRKA1YOnY+sRCbQDv4wU+dc +oOenLzLikrMdVyONokbkneS/LnwjmNev2i9I9NA0D3bZvJuN/DkuQ245iXgdnqOvJwIDAQABoyEw +HzAdBgNVHQ4EFgQUfzocV1Og4CsGte7Ux4luCVA3TTYwDQYJKoZIhvcNAQELBQADggEBAEeemqwJ +eY/GahjPesuyJKiIfH4MgMGvZ19441WnKG1CuHrtwMES8Znxc+iS4hutPX6I/Mvb9HMg8M3u9B7W +1dj4QOvi5iZuWqrV2bhBrFoUV7fXPjjMfu15i/CX5Lfu56cBeyKshq674rQ4AWn1k5saxa6Jhaao +6ceFfnTldgVSSS0rBFyz1fBj7dLXnS8MmxN0cmDO1jVXu2Tfjw0ofRmLxD1SCMEwrNEcERRUWudm +nIy1Q14xCYmTnGEf9uG8TmHO/y5Elc/jcMN2mGwb8N0FIV7nh1HLyAmR6O7JPrQ3QWR4Vr5tMH/K +3b9N51c0enX9UZedGYVc+qlLJ/P6B5w= +-----END CERTIFICATE----- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/e6a2f13a/sentry-tests/sentry-tests-kafka/src/test/resources/user2.crt ---------------------------------------------------------------------- diff --git 
a/sentry-tests/sentry-tests-kafka/src/test/resources/user2.crt b/sentry-tests/sentry-tests-kafka/src/test/resources/user2.crt new file mode 100644 index 0000000..d0b0820 --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/test/resources/user2.crt @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICvzCCAaegAwIBAgIEC6qUijANBgkqhkiG9w0BAQsFADAQMQ4wDAYDVQQDEwV1c2VyMjAeFw0x +NTEyMTUyMzQ0MjVaFw0xNjAzMTQyMzQ0MjVaMBAxDjAMBgNVBAMTBXVzZXIyMIIBIjANBgkqhkiG +9w0BAQEFAAOCAQ8AMIIBCgKCAQEAhm2vitVj2xApz7ZtaWNcqegodc9nFY+HCcIx2WqoUzQTXZ8q +Fm6H6blKrL+xJXY7ZlEB8nMdfWFfOdS2zX6hutkstkwId5MSceWUb5GUzdClUQAS8DGMtQdU3LlY +EcIgz9fim6/Ad0ZIKwyAc47HJLd/nQOozAaDDnWdLbhRymv/PNEt5IndkeTfbFd1uWgpV9vhfLWN +3FmXOksVoIKR+l9YBOmAUIjstK2Tq8b/q4Dbcp82X1nPW12fG2FlowgolWEOlaCbSGwN60LjoP69 +1azAFU5IPaxmQ46oZpb7jMCRrHgdx+zhjRxjY9PpTCYWdtBHqnLyuckl/mpOxS64vwIDAQABoyEw +HzAdBgNVHQ4EFgQUHaTI3Xl/CjJLhVCZto5ZJBCTaLUwDQYJKoZIhvcNAQELBQADggEBAEg/SxvT ++NLmh7tWF0QZR2S6wl+UgJIqiS6NlEk3Te6TdPda2t2K8cmFndBcAmZqvLkz7dIkeDwa507SbrTg +NJXcOycpH1s15VjiVRF8dXqflLCEcBUNw8h4AENsdVcNKliR+YXLk1i/x5jVfncQps6Zxj68NFoN +h6tf7KyBHT4DvekYocjdXDQ/tPdvPqokYIM/q0K7NRZvDg6yUYukkFjta9D9623PwydtA/t75AEb +zOJra5A6qp/qo/U1UyLzEkwSlWaLaOa7MrNaFy/OQbkVncP+6jFCIXlWpQ+TqyUmTfwmL+A2oJWW +l3Ziy62zAfuaJ1EwY4zwFlZHJR4lF7E= +-----END CERTIFICATE-----