http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java index c6600a0..15f7359 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java @@ -42,16 +42,19 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.sentry.SentryUserException; import org.apache.sentry.core.common.ActiveRoleSet; import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.Model; import org.apache.sentry.core.common.Subject; import org.apache.sentry.core.model.kafka.KafkaActionFactory; import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction; import org.apache.sentry.core.model.kafka.KafkaAuthorizable; +import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel; import org.apache.sentry.kafka.ConvertUtil; import org.apache.sentry.kafka.conf.KafkaAuthConf.AuthzConfVars; import org.apache.sentry.policy.common.PolicyEngine; import org.apache.sentry.provider.common.AuthorizationComponent; import org.apache.sentry.provider.common.AuthorizationProvider; import org.apache.sentry.provider.common.ProviderBackend; +import org.apache.sentry.provider.common.ProviderBackendContext; import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClientFactory; @@ -72,491 +75,497 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY public class KafkaAuthBinding { - private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class); - private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA; - private static final String COMPONENT_NAME = COMPONENT_TYPE; + private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class); + private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA; + private static final String COMPONENT_NAME = COMPONENT_TYPE; - private static Boolean kerberosInit; + private static Boolean kerberosInit; - private final Configuration authConf; - private final AuthorizationProvider authProvider; - private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance(); + private final Configuration authConf; + private final AuthorizationProvider authProvider; + private final KafkaActionFactory actionFactory = KafkaActionFactory.getInstance(); - private ProviderBackend providerBackend; - private String instanceName; - private String requestorName; - private java.util.Map<String, ?> kafkaConfigs; + private ProviderBackend providerBackend; + private String instanceName; + private String requestorName; + private java.util.Map<String, ?> kafkaConfigs; - public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception { - this.instanceName = instanceName; - this.requestorName = requestorName; - this.authConf = authConf; - this.kafkaConfigs = kafkaConfigs; - this.authProvider = createAuthProvider(); - } + public KafkaAuthBinding(String instanceName, String requestorName, Configuration authConf, java.util.Map<String, ?> kafkaConfigs) throws Exception { + this.instanceName = instanceName; + this.requestorName = requestorName; + this.authConf = authConf; + this.kafkaConfigs = kafkaConfigs; + this.authProvider = createAuthProvider(); + } + /** + * Instantiate the configured authz provider + * + * @return {@link AuthorizationProvider} + */ + private AuthorizationProvider createAuthProvider() throws Exception { /** - * Instantiate the configured authz provider - * - * @return {@link AuthorizationProvider} + * get the authProvider class, policyEngine class, providerBackend class and resources from the + * kafkaAuthConf config */ - private AuthorizationProvider createAuthProvider() throws Exception { - /** - * get the authProvider class, policyEngine class, providerBackend class and resources from the - * kafkaAuthConf config - */ - String authProviderName = - authConf.get(AuthzConfVars.AUTHZ_PROVIDER.getVar(), - AuthzConfVars.AUTHZ_PROVIDER.getDefault()); - String resourceName = - authConf.get(AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(), - AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getDefault()); - String providerBackendName = - authConf.get(AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(), - AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getDefault()); - String policyEngineName = - authConf.get(AuthzConfVars.AUTHZ_POLICY_ENGINE.getVar(), - AuthzConfVars.AUTHZ_POLICY_ENGINE.getDefault()); - if (resourceName != null && resourceName.startsWith("classpath:")) { - String resourceFileName = resourceName.substring("classpath:".length()); - resourceName = AuthorizationProvider.class.getClassLoader().getResource(resourceFileName).getPath(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Using authorization provider " + authProviderName + " with resource " - + resourceName + ", policy engine " + policyEngineName + ", provider backend " - + providerBackendName); - } - - // Initiate kerberos via UserGroupInformation if required - if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE)) - && kafkaConfigs != null) { - String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString(); - String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString(); - if (keytabProp != null && principalProp != null) { - String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString(); - if (actualHost != null) { - principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost); - } - initKerberos(keytabProp, principalProp); - } else { - LOG.debug("Could not initialize Kerberos.\n" + - AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" + - AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString()); - } - } else { - LOG.debug("Could not initialize Kerberos as no kafka config provided. " + - AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + - " are required configs to be able to initialize Kerberos"); - } - - // Instantiate the configured providerBackend - Constructor<?> providerBackendConstructor = - Class.forName(providerBackendName) - .getDeclaredConstructor(Configuration.class, String.class); - providerBackendConstructor.setAccessible(true); - providerBackend = - (ProviderBackend) providerBackendConstructor.newInstance(new Object[]{authConf, - resourceName}); - if (providerBackend instanceof SentryGenericProviderBackend) { - ((SentryGenericProviderBackend) providerBackend).setComponentType(COMPONENT_TYPE); - ((SentryGenericProviderBackend) providerBackend).setServiceName(instanceName); - } - - // Instantiate the configured policyEngine - Constructor<?> policyConstructor = - Class.forName(policyEngineName).getDeclaredConstructor(ProviderBackend.class); - policyConstructor.setAccessible(true); - PolicyEngine policyEngine = - (PolicyEngine) policyConstructor.newInstance(new Object[]{providerBackend}); - - // Instantiate the configured authProvider - Constructor<?> constructor = - Class.forName(authProviderName).getDeclaredConstructor(Configuration.class, String.class, - PolicyEngine.class); - constructor.setAccessible(true); - return (AuthorizationProvider) constructor.newInstance(new Object[]{authConf, resourceName, - policyEngine}); + String authProviderName = + authConf.get(AuthzConfVars.AUTHZ_PROVIDER.getVar(), + AuthzConfVars.AUTHZ_PROVIDER.getDefault()); + String resourceName = + authConf.get(AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(), + AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getDefault()); + String providerBackendName = + authConf.get(AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(), + AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getDefault()); + String policyEngineName = + authConf.get(AuthzConfVars.AUTHZ_POLICY_ENGINE.getVar(), + AuthzConfVars.AUTHZ_POLICY_ENGINE.getDefault()); + if (resourceName != null && resourceName.startsWith("classpath:")) { + String resourceFileName = resourceName.substring("classpath:".length()); + resourceName = AuthorizationProvider.class.getClassLoader().getResource(resourceFileName).getPath(); } - - /** - * Authorize access to a Kafka privilege - */ - public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) { - List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(session.clientAddress().getHostAddress(), resource); - Set<KafkaAction> actions = Sets.newHashSet(actionFactory.getActionByName(operation.name())); - return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL); + if (LOG.isDebugEnabled()) { + LOG.debug("Using authorization provider " + authProviderName + " with resource " + + resourceName + ", policy engine " + policyEngineName + ", provider backend " + + providerBackendName); } - public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) { - verifyAcls(acls); - LOG.info("Adding Acl: acl->" + acls + " resource->" + resource); - - final Iterator<Acl> iterator = acls.iterator(); - while (iterator.hasNext()) { - final Acl acl = iterator.next(); - final String role = getRole(acl); - if (!roleExists(role)) { - throw new KafkaException("Can not add Acl for non-existent Role: " + role); - } - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - client.grantPrivilege( - requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource)); - return null; - } - }); - } + // Initiate kerberos via UserGroupInformation if required + if (ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS.equals(authConf.get(ServiceConstants.ServerConfig.SECURITY_MODE)) + && kafkaConfigs != null) { + String keytabProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString(); + String principalProp = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString(); + if (keytabProp != null && principalProp != null) { + String actualHost = kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_HOSTNAME.getVar()).toString(); + if (actualHost != null) { + principalProp = SecurityUtil.getServerPrincipal(principalProp, actualHost); + } + initKerberos(keytabProp, principalProp); + } else { + LOG.debug("Could not initialize Kerberos.\n" + + AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar()).toString() + "\n" + + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + " set to " + kafkaConfigs.get(AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar()).toString()); + } + } else { + LOG.debug("Could not initialize Kerberos as no kafka config provided. " + + AuthzConfVars.AUTHZ_KEYTAB_FILE_NAME.getVar() + " and " + AuthzConfVars.AUTHZ_PRINCIPAL_NAME.getVar() + + " are required configs to be able to initialize Kerberos"); } - public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) { - verifyAcls(acls); - LOG.info("Removing Acl: acl->" + acls + " resource->" + resource); - final Iterator<Acl> iterator = acls.iterator(); - while (iterator.hasNext()) { - final Acl acl = iterator.next(); - final String role = getRole(acl); - try { - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - client.dropPrivilege( - requestorName, role, toTSentryPrivilege(acl, resource)); - return null; - } - }); - } catch (KafkaException kex) { - LOG.error("Failed to remove acls.", kex); - return false; - } - } - - return true; + // Instantiate the configured providerBackend + Constructor<?> providerBackendConstructor = + Class.forName(providerBackendName) + .getDeclaredConstructor(Configuration.class, String.class); + providerBackendConstructor.setAccessible(true); + providerBackend = + (ProviderBackend) providerBackendConstructor.newInstance(new Object[]{authConf, + resourceName}); + if (providerBackend instanceof SentryGenericProviderBackend) { + ((SentryGenericProviderBackend) providerBackend).setComponentType(COMPONENT_TYPE); + ((SentryGenericProviderBackend) providerBackend).setServiceName(instanceName); } - public void addRole(final String role) { - if (roleExists(role)) { - throw new KafkaException("Can not create an existing role, " + role + ", again."); + // Create backend context + ProviderBackendContext context = new ProviderBackendContext(); + context.setAllowPerDatabase(false); + context.setValidators(KafkaPrivilegeModel.getInstance().getPrivilegeValidators()); + providerBackend.initialize(context); + + // Instantiate the configured policyEngine + Constructor<?> policyConstructor = + Class.forName(policyEngineName).getDeclaredConstructor(ProviderBackend.class); + policyConstructor.setAccessible(true); + PolicyEngine policyEngine = + (PolicyEngine) policyConstructor.newInstance(new Object[]{providerBackend}); + + // Instantiate the configured authProvider + Constructor<?> constructor = + Class.forName(authProviderName).getDeclaredConstructor(Configuration.class, String.class, + PolicyEngine.class, Model.class); + constructor.setAccessible(true); + return (AuthorizationProvider) constructor.newInstance(new Object[]{authConf, resourceName, + policyEngine, KafkaPrivilegeModel.getInstance()}); + } + + /** + * Authorize access to a Kafka privilege + */ + public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) { + List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(session.clientAddress().getHostAddress(), resource); + Set<KafkaAction> actions = Sets.newHashSet(actionFactory.getActionByName(operation.name())); + return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL); + } + + public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) { + verifyAcls(acls); + LOG.info("Adding Acl: acl->" + acls + " resource->" + resource); + + final Iterator<Acl> iterator = acls.iterator(); + while (iterator.hasNext()) { + final Acl acl = iterator.next(); + final String role = getRole(acl); + if (!roleExists(role)) { + throw new KafkaException("Can not add Acl for non-existent Role: " + role); + } + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + client.grantPrivilege( + requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource)); + return null; } - - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - client.createRole( - requestorName, role, COMPONENT_NAME); - return null; - } - }); - } - - public void addRoleToGroups(final String role, final java.util.Set<String> groups) { - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - client.addRoleToGroups( - requestorName, role, COMPONENT_NAME, groups); - return null; - } - }); + }); } - - public void dropAllRoles() { - final List<String> roles = getAllRoles(); + } + + public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) { + verifyAcls(acls); + LOG.info("Removing Acl: acl->" + acls + " resource->" + resource); + final Iterator<Acl> iterator = acls.iterator(); + while (iterator.hasNext()) { + final Acl acl = iterator.next(); + final String role = getRole(acl); + try { execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - for (String role : roles) { - client.dropRole(requestorName, role, COMPONENT_NAME); - } - return null; - } + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + client.dropPrivilege( + requestorName, role, toTSentryPrivilege(acl, resource)); + return null; + } }); + } catch (KafkaException kex) { + LOG.error("Failed to remove acls.", kex); + return false; + } } - private List<String> getRolesforGroup(final String groupName) { - final List<String> roles = new ArrayList<>(); - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - for (TSentryRole tSentryRole : client.listRolesByGroupName(requestorName, groupName, COMPONENT_NAME)) { - roles.add(tSentryRole.getRoleName()); - } - return null; - } - }); - - return roles; - } + return true; + } - private SentryGenericServiceClient getClient() throws Exception { - return SentryGenericServiceClientFactory.create(this.authConf); + public void addRole(final String role) { + if (roleExists(role)) { + throw new KafkaException("Can not create an existing role, " + role + ", again."); } - public boolean removeAcls(final Resource resource) { - LOG.info("Removing Acls for Resource: resource->" + resource); - List<String> roles = getAllRoles(); - final List<TSentryPrivilege> tSentryPrivileges = getAllPrivileges(roles); - try { - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - for (TSentryPrivilege tSentryPrivilege : tSentryPrivileges) { - if (isPrivilegeForResource(tSentryPrivilege, resource)) { - client.dropPrivilege( - requestorName, COMPONENT_NAME, tSentryPrivilege); - } - } - return null; - } - }); - } catch (KafkaException kex) { - LOG.error("Failed to remove acls.", kex); - return false; + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + client.createRole( + requestorName, role, COMPONENT_NAME); + return null; + } + }); + } + + public void addRoleToGroups(final String role, final java.util.Set<String> groups) { + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + client.addRoleToGroups( + requestorName, role, COMPONENT_NAME, groups); + return null; + } + }); + } + + public void dropAllRoles() { + final List<String> roles = getAllRoles(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (String role : roles) { + client.dropRole(requestorName, role, COMPONENT_NAME); } - - return true; - } - - public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) { - final Option<scala.collection.immutable.Set<Acl>> acls = getAcls().get(resource); - if (acls.nonEmpty()) - return acls.get(); - return new scala.collection.immutable.HashSet<Acl>(); - } - - public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) { - if (principal.getPrincipalType().toLowerCase().equals("group")) { - List<String> roles = getRolesforGroup(principal.getName()); - return getAclsForRoles(roles); - } else { - LOG.info("Did not recognize Principal type: " + principal.getPrincipalType() + ". Returning Acls for all principals."); - return getAcls(); + return null; + } + }); + } + + private List<String> getRolesforGroup(final String groupName) { + final List<String> roles = new ArrayList<>(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (TSentryRole tSentryRole : client.listRolesByGroupName(requestorName, groupName, COMPONENT_NAME)) { + roles.add(tSentryRole.getRoleName()); } - } - - public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() { - final List<String> roles = getAllRoles(); - return getAclsForRoles(roles); - } - - /** - * A Command is a closure used to pass a block of code from individual - * functions to execute, which centralizes connection error - * handling. Command is parameterized on the return type of the function. - */ - private interface Command<T> { - T run(SentryGenericServiceClient client) throws Exception; - } - - private <T> T execute(Command<T> cmd) throws KafkaException { - SentryGenericServiceClient client = null; - try { - client = getClient(); - return cmd.run(client); - } catch (SentryUserException ex) { - String msg = "Unable to excute command on sentry server: " + ex.getMessage(); - LOG.error(msg, ex); - throw new KafkaException(msg, ex); - } catch (Exception ex) { - String msg = "Unable to obtain client:" + ex.getMessage(); - LOG.error(msg, ex); - throw new KafkaException(msg, ex); - } finally { - if (client != null) { - client.close(); + return null; + } + }); + + return roles; + } + + private SentryGenericServiceClient getClient() throws Exception { + return SentryGenericServiceClientFactory.create(this.authConf); + } + + public boolean removeAcls(final Resource resource) { + LOG.info("Removing Acls for Resource: resource->" + resource); + List<String> roles = getAllRoles(); + final List<TSentryPrivilege> tSentryPrivileges = getAllPrivileges(roles); + try { + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (TSentryPrivilege tSentryPrivilege : tSentryPrivileges) { + if (isPrivilegeForResource(tSentryPrivilege, resource)) { + client.dropPrivilege( + requestorName, COMPONENT_NAME, tSentryPrivilege); } + } + return null; } + }); + } catch (KafkaException kex) { + LOG.error("Failed to remove acls.", kex); + return false; } - private TSentryPrivilege toTSentryPrivilege(Acl acl, Resource resource) { - final List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(acl.host(), resource); - final List<TAuthorizable> tAuthorizables = new ArrayList<>(); - for (Authorizable authorizable : authorizables) { - tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName())); - } - TSentryPrivilege tSentryPrivilege = new TSentryPrivilege(COMPONENT_NAME, instanceName, tAuthorizables, acl.operation().name()); - return tSentryPrivilege; - } - - private String getRole(Acl acl) { - return acl.principal().getName(); - } - - private boolean isPrivilegeForResource(TSentryPrivilege tSentryPrivilege, Resource resource) { - final java.util.Iterator<TAuthorizable> authorizablesIterator = tSentryPrivilege.getAuthorizablesIterator(); - while (authorizablesIterator.hasNext()) { - TAuthorizable tAuthorizable = authorizablesIterator.next(); - if (tAuthorizable.getType().equals(resource.resourceType().name())) { - return true; - } - } - return false; + return true; + } + + public scala.collection.immutable.Set<Acl> getAcls(final Resource resource) { + final Option<scala.collection.immutable.Set<Acl>> acls = getAcls().get(resource); + if (acls.nonEmpty()) + return acls.get(); + return new scala.collection.immutable.HashSet<Acl>(); + } + + public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) { + if (principal.getPrincipalType().toLowerCase().equals("group")) { + List<String> roles = getRolesforGroup(principal.getName()); + return getAclsForRoles(roles); + } else { + LOG.info("Did not recognize Principal type: " + principal.getPrincipalType() + ". Returning Acls for all principals."); + return getAcls(); } - - private List<TSentryPrivilege> getAllPrivileges(final List<String> roles) { - final List<TSentryPrivilege> tSentryPrivileges = new ArrayList<>(); - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - for (String role : roles) { - tSentryPrivileges.addAll(client.listPrivilegesByRoleName( - requestorName, role, COMPONENT_NAME, instanceName)); - } - return null; - } - }); - - return tSentryPrivileges; + } + + public Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() { + final List<String> roles = getAllRoles(); + return getAclsForRoles(roles); + } + + /** + * A Command is a closure used to pass a block of code from individual + * functions to execute, which centralizes connection error + * handling. Command is parameterized on the return type of the function. + */ + private interface Command<T> { + T run(SentryGenericServiceClient client) throws Exception; + } + + private <T> T execute(Command<T> cmd) throws KafkaException { + SentryGenericServiceClient client = null; + try { + client = getClient(); + return cmd.run(client); + } catch (SentryUserException ex) { + String msg = "Unable to excute command on sentry server: " + ex.getMessage(); + LOG.error(msg, ex); + throw new KafkaException(msg, ex); + } catch (Exception ex) { + String msg = "Unable to obtain client:" + ex.getMessage(); + LOG.error(msg, ex); + throw new KafkaException(msg, ex); + } finally { + if (client != null) { + client.close(); + } } + } - private List<String> getAllRoles() { - final List<String> roles = new ArrayList<>(); - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - for (TSentryRole tSentryRole : client.listAllRoles(requestorName, COMPONENT_NAME)) { - roles.add(tSentryRole.getRoleName()); - } - return null; - } - }); - - return roles; + private TSentryPrivilege toTSentryPrivilege(Acl acl, Resource resource) { + final List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(acl.host(), resource); + final List<TAuthorizable> tAuthorizables = new ArrayList<>(); + for (Authorizable authorizable : authorizables) { + tAuthorizables.add(new TAuthorizable(authorizable.getTypeName(), authorizable.getName())); } - - private Map<Resource, scala.collection.immutable.Set<Acl>> getAclsForRoles(final List<String> roles) { - return scala.collection.JavaConverters.mapAsScalaMapConverter( - rolePrivilegesToResourceAcls(getRoleToPrivileges(roles))) - .asScala().toMap(Predef.<Tuple2<Resource, scala.collection.immutable.Set<Acl>>>conforms()); + TSentryPrivilege tSentryPrivilege = new TSentryPrivilege(COMPONENT_NAME, instanceName, tAuthorizables, acl.operation().name()); + return tSentryPrivilege; + } + + private String getRole(Acl acl) { + return acl.principal().getName(); + } + + private boolean isPrivilegeForResource(TSentryPrivilege tSentryPrivilege, Resource resource) { + final java.util.Iterator<TAuthorizable> authorizablesIterator = tSentryPrivilege.getAuthorizablesIterator(); + while (authorizablesIterator.hasNext()) { + TAuthorizable tAuthorizable = authorizablesIterator.next(); + if (tAuthorizable.getType().equals(resource.resourceType().name())) { + return true; + } } - - private java.util.Map<Resource, scala.collection.immutable.Set<Acl>> rolePrivilegesToResourceAcls(java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap) { - final java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap = new HashMap<>(); - for (String role : rolePrivilegesMap.keySet()) { - scala.collection.immutable.Set<TSentryPrivilege> privileges = rolePrivilegesMap.get(role); - final Iterator<TSentryPrivilege> iterator = privileges.iterator(); - while (iterator.hasNext()) { - TSentryPrivilege privilege = iterator.next(); - final List<TAuthorizable> authorizables = privilege.getAuthorizables(); - String host = null; - String operation = privilege.getAction(); - for (TAuthorizable tAuthorizable : authorizables) { - if (tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name())) { - host = tAuthorizable.getName(); - } else { - Resource resource = new Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), tAuthorizable.getName()); - if (operation.equals("*")) { - operation = "All"; - } - Acl acl = new Acl(new KafkaPrincipal("role", role), Allow$.MODULE$, host, Operation$.MODULE$.fromString(operation)); - Set<Acl> newAclsJava = new HashSet<Acl>(); - newAclsJava.add(acl); - addExistingAclsForResource(resourceAclsMap, resource, newAclsJava); - final scala.collection.mutable.Set<Acl> aclScala = JavaConversions.asScalaSet(newAclsJava); - resourceAclsMap.put(resource, aclScala.<Acl>toSet()); - } - } - } + return false; + } + + private List<TSentryPrivilege> getAllPrivileges(final List<String> roles) { + final List<TSentryPrivilege> tSentryPrivileges = new ArrayList<>(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (String role : roles) { + tSentryPrivileges.addAll(client.listPrivilegesByRoleName( + requestorName, role, COMPONENT_NAME, instanceName)); } - - return resourceAclsMap; - } - - private java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> getRoleToPrivileges(final List<String> roles) { - final java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap = new HashMap<>(); - execute(new Command<Void>() { - @Override - public Void run(SentryGenericServiceClient client) throws Exception { - for (String role : roles) { - final Set<TSentryPrivilege> rolePrivileges = client.listPrivilegesByRoleName( - requestorName, role, COMPONENT_NAME, instanceName); - final scala.collection.immutable.Set<TSentryPrivilege> rolePrivilegesScala = - scala.collection.JavaConverters.asScalaSetConverter(rolePrivileges).asScala().toSet(); - rolePrivilegesMap.put(role, rolePrivilegesScala); - } - return null; + return null; + } + }); + + return tSentryPrivileges; + } + + private List<String> getAllRoles() { + final List<String> roles = new ArrayList<>(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (TSentryRole tSentryRole : client.listAllRoles(requestorName, COMPONENT_NAME)) { + roles.add(tSentryRole.getRoleName()); + } + return null; + } + }); + + return roles; + } + + private Map<Resource, scala.collection.immutable.Set<Acl>> getAclsForRoles(final List<String> roles) { + return scala.collection.JavaConverters.mapAsScalaMapConverter( + rolePrivilegesToResourceAcls(getRoleToPrivileges(roles))) + .asScala().toMap(Predef.<Tuple2<Resource, scala.collection.immutable.Set<Acl>>>conforms()); + } + + private java.util.Map<Resource, scala.collection.immutable.Set<Acl>> rolePrivilegesToResourceAcls(java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap) { + final java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap = new HashMap<>(); + for (String role : rolePrivilegesMap.keySet()) { + scala.collection.immutable.Set<TSentryPrivilege> privileges = rolePrivilegesMap.get(role); + final Iterator<TSentryPrivilege> iterator = privileges.iterator(); + while (iterator.hasNext()) { + TSentryPrivilege privilege = iterator.next(); + final List<TAuthorizable> authorizables = privilege.getAuthorizables(); + String host = null; + String operation = privilege.getAction(); + for (TAuthorizable tAuthorizable : authorizables) { + if (tAuthorizable.getType().equals(KafkaAuthorizable.AuthorizableType.HOST.name())) { + host = tAuthorizable.getName(); + } else { + Resource resource = new Resource(ResourceType$.MODULE$.fromString(tAuthorizable.getType()), tAuthorizable.getName()); + if (operation.equals("*")) { + operation = "All"; } - }); - - return rolePrivilegesMap; + Acl acl = new Acl(new KafkaPrincipal("role", role), Allow$.MODULE$, host, Operation$.MODULE$.fromString(operation)); + Set<Acl> newAclsJava = new HashSet<Acl>(); + newAclsJava.add(acl); + addExistingAclsForResource(resourceAclsMap, resource, newAclsJava); + final scala.collection.mutable.Set<Acl> aclScala = JavaConversions.asScalaSet(newAclsJava); + resourceAclsMap.put(resource, aclScala.<Acl>toSet()); + } + } + } } - private void addExistingAclsForResource(java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap, Resource resource, java.util.Set<Acl> newAclsJava) { - final scala.collection.immutable.Set<Acl> existingAcls = resourceAclsMap.get(resource); - if (existingAcls != null) { - final Iterator<Acl> aclsIter = existingAcls.iterator(); - while (aclsIter.hasNext()) { - Acl curAcl = aclsIter.next(); - newAclsJava.add(curAcl); - } + return resourceAclsMap; + } + + private java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> getRoleToPrivileges(final List<String> roles) { + final java.util.Map<String, scala.collection.immutable.Set<TSentryPrivilege>> rolePrivilegesMap = new HashMap<>(); + execute(new Command<Void>() { + @Override + public Void run(SentryGenericServiceClient client) throws Exception { + for (String role : roles) { + final Set<TSentryPrivilege> rolePrivileges = client.listPrivilegesByRoleName( + requestorName, role, COMPONENT_NAME, instanceName); + final scala.collection.immutable.Set<TSentryPrivilege> rolePrivilegesScala = + scala.collection.JavaConverters.asScalaSetConverter(rolePrivileges).asScala().toSet(); + rolePrivilegesMap.put(role, rolePrivilegesScala); } + return null; + } + }); + + return rolePrivilegesMap; + } + + private void addExistingAclsForResource(java.util.Map<Resource, scala.collection.immutable.Set<Acl>> resourceAclsMap, Resource resource, java.util.Set<Acl> newAclsJava) { + final scala.collection.immutable.Set<Acl> existingAcls = resourceAclsMap.get(resource); + if (existingAcls != null) { + final Iterator<Acl> aclsIter = existingAcls.iterator(); + while (aclsIter.hasNext()) { + Acl curAcl = aclsIter.next(); + newAclsJava.add(curAcl); + } } - - private boolean roleExists(String role) { - return getAllRoles().contains(role); + } + + private boolean roleExists(String role) { + return getAllRoles().contains(role); + } + + private void verifyAcls(scala.collection.immutable.Set<Acl> acls) { + final Iterator<Acl> iterator = acls.iterator(); + while (iterator.hasNext()) { + final Acl acl = iterator.next(); + assert acl.principal().getPrincipalType().toLowerCase().equals("role") : "Only Acls with KafkaPrincipal of type \"role;\" is supported."; + assert acl.permissionType().name().equals(Allow.name()) : "Only Acls with Permission of type \"Allow\" is supported."; } - - private void verifyAcls(scala.collection.immutable.Set<Acl> acls) { - final Iterator<Acl> iterator = acls.iterator(); - while (iterator.hasNext()) { - final Acl acl = iterator.next(); - assert acl.principal().getPrincipalType().toLowerCase().equals("role") : "Only Acls with KafkaPrincipal of type \"role;\" is supported."; - assert acl.permissionType().name().equals(Allow.name()) : "Only Acls with Permission of type \"Allow\" is supported."; - } + } + + /* + * For SSL session's Kafka creates user names with "CN=" prepended to the user name. + * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=". + * */ + private String getName(RequestChannel.Session session) { + final String principalName = session.principal().getName(); + int start = principalName.indexOf("CN="); + if (start >= 0) { + String tmpName, name = ""; + tmpName = principalName.substring(start + 3); + int end = tmpName.indexOf(","); + if (end > 0) { + name = tmpName.substring(0, end); + } else { + name = tmpName; + } + return name; + } else { + return principalName; } - - /* - * For SSL session's Kafka creates user names with "CN=" prepended to the user name. - * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=". - * */ - private String getName(RequestChannel.Session session) { - final String principalName = session.principal().getName(); - int start = principalName.indexOf("CN="); - if (start >= 0) { - String tmpName, name = ""; - tmpName = principalName.substring(start + 3); - int end = tmpName.indexOf(","); - if (end > 0) { - name = tmpName.substring(0, end); - } else { - name = tmpName; - } - return name; - } else { - return principalName; - } + } + + /** + * Initialize kerberos via UserGroupInformation. Will only attempt to login + * during the first request, subsequent calls will have no effect. + */ + private void initKerberos(String keytabFile, String principal) { + if (keytabFile == null || keytabFile.length() == 0) { + throw new IllegalArgumentException("keytabFile required because kerberos is enabled"); } - - /** - * Initialize kerberos via UserGroupInformation. Will only attempt to login - * during the first request, subsequent calls will have no effect. - */ - private void initKerberos(String keytabFile, String principal) { - if (keytabFile == null || keytabFile.length() == 0) { - throw new IllegalArgumentException("keytabFile required because kerberos is enabled"); - } - if (principal == null || principal.length() == 0) { - throw new IllegalArgumentException("principal required because kerberos is enabled"); - } - synchronized (KafkaAuthBinding.class) { - if (kerberosInit == null) { - kerberosInit = new Boolean(true); - // let's avoid modifying the supplied configuration, just to be conservative - final Configuration ugiConf = new Configuration(); - ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS); - UserGroupInformation.setConfiguration(ugiConf); - LOG.info( - "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ", - keytabFile, principal); - try { - UserGroupInformation.loginUserFromKeytab(principal, keytabFile); - } catch (IOException ioe) { - throw new RuntimeException("Failed to login user with Principal: " + principal + - " and Keytab file: " + keytabFile, ioe); - } - LOG.info("Got Kerberos ticket"); - } + if (principal == null || principal.length() == 0) { + throw new IllegalArgumentException("principal required because kerberos is enabled"); + } + synchronized (KafkaAuthBinding.class) { + if (kerberosInit == null) { + kerberosInit = new Boolean(true); + // let's avoid modifying the supplied configuration, just to be conservative + final Configuration ugiConf = new Configuration(); + ugiConf.set(HADOOP_SECURITY_AUTHENTICATION, ServiceConstants.ServerConfig.SECURITY_MODE_KERBEROS); + UserGroupInformation.setConfiguration(ugiConf); + LOG.info( + "Attempting to acquire kerberos ticket with keytab: {}, principal: {} ", + keytabFile, principal); + try { + UserGroupInformation.loginUserFromKeytab(principal, keytabFile); + } catch (IOException ioe) { + throw new RuntimeException("Failed to login user with Principal: " + principal + + " and Keytab file: " + keytabFile, ioe); } + LOG.info("Got Kerberos ticket"); + } } + } }
http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java index 0a57e2e..2f4f8df 100644 --- a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java @@ -17,7 +17,7 @@ package org.apache.sentry.kafka.conf; import java.net.URL; import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine; +import org.apache.sentry.policy.engine.common.CommonPolicyEngine; import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider; import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; @@ -41,7 +41,7 @@ public class KafkaAuthConf extends Configuration { AUTHZ_PROVIDER("sentry.kafka.provider", HadoopGroupResourceAuthorizationProvider.class.getName()), AUTHZ_PROVIDER_RESOURCE("sentry.kafka.provider.resource", ""), AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()), - AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()), + AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", CommonPolicyEngine.class.getName()), AUTHZ_INSTANCE_NAME(KAFKA_SERVICE_INSTANCE_NAME, "kafka"), AUTHZ_SERVICE_USER_NAME(KAFKA_SERVICE_USER_NAME, "kafka"), AUTHZ_PRINCIPAL_HOSTNAME(KAFKA_PRINCIPAL_HOSTNAME, null), http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java new file mode 100644 index 0000000..086b707 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/AbstractTestKafkaPolicyEngine.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.sentry.policy.kafka; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.policy.common.PolicyEngine; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Sets; +import com.google.common.io.Files; + +public abstract class AbstractTestKafkaPolicyEngine { + + private static final String ADMIN = "host=*->action=all"; + private static final String ADMIN_HOST1 = "host=host1->action=all"; + private static final String CONSUMER_T1_ALL = "host=*->topic=t1->action=read"; + private static final String CONSUMER_T1_HOST1 = "host=host1->topic=t1->action=read"; + private static final String CONSUMER_T2_HOST2 = "host=host2->topic=t2->action=read"; + private static final String PRODUCER_T1_ALL = "host=*->topic=t1->action=write"; + private static final String PRODUCER_T1_HOST1 = "host=host1->topic=t1->action=write"; + private static final String PRODUCER_T2_HOST2 = "host=host2->topic=t2->action=write"; + private static final String CONSUMER_PRODUCER_T1 = "host=host1->topic=t1->action=all"; + + private PolicyEngine policy; + private static File baseDir; + + @BeforeClass + public static void setupClazz() throws IOException { + baseDir = Files.createTempDir(); + } + + @AfterClass + public static void teardownClazz() throws IOException { + if (baseDir != null) { + FileUtils.deleteQuietly(baseDir); + } + } + + protected void setPolicy(PolicyEngine policy) { + this.policy = policy; + } + + protected static File getBaseDir() { + return baseDir; + } + + @Before + public void setup() throws IOException { + afterSetup(); + } + + @After + public void teardown() throws IOException { + beforeTeardown(); + } + + protected void afterSetup() throws IOException {} + + protected void beforeTeardown() throws IOException {} + + + @Test + public void testConsumer0() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_ALL)); + Assert.assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("consumer_group0"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testConsumer1() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T1_HOST1)); + Assert.assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("consumer_group1"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testConsumer2() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_T2_HOST2)); + Assert.assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("consumer_group2"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testProducer0() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_ALL)); + Assert.assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("producer_group0"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testProducer1() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T1_HOST1)); + Assert.assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("producer_group1"), ActiveRoleSet.ALL)) + .toString()); + } + + + @Test + public void testProducer2() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(PRODUCER_T2_HOST2)); + Assert.assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("producer_group2"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testConsumerProducer0() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(CONSUMER_PRODUCER_T1)); + Assert.assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("consumer_producer_group0"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testSubAdmin() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN_HOST1)); + Assert.assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("subadmin_group"), ActiveRoleSet.ALL)) + .toString()); + } + + @Test + public void testAdmin() throws Exception { + Set<String> expected = Sets.newTreeSet(Sets.newHashSet(ADMIN)); + Assert + .assertEquals(expected.toString(), + new TreeSet<String>(policy.getPrivileges(set("admin_group"), ActiveRoleSet.ALL)) + .toString()); + } + + private static Set<String> set(String... values) { + return Sets.newHashSet(values); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java new file mode 100644 index 0000000..1ededbd --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/KafkaPolicyTestUtil.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.policy.kafka; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel; +import org.apache.sentry.policy.common.PolicyEngine; +import org.apache.sentry.policy.engine.common.CommonPolicyEngine; +import org.apache.sentry.provider.common.ProviderBackend; +import org.apache.sentry.provider.common.ProviderBackendContext; +import org.apache.sentry.provider.file.SimpleFileProviderBackend; + +import java.io.IOException; + +public class KafkaPolicyTestUtil { + + public static PolicyEngine createPolicyEngineForTest(String resource) throws IOException { + + ProviderBackend providerBackend = new SimpleFileProviderBackend(new Configuration(), resource); + + // create backendContext + ProviderBackendContext context = new ProviderBackendContext(); + context.setAllowPerDatabase(false); + context.setValidators(KafkaPrivilegeModel.getInstance().getPrivilegeValidators()); + // initialize the backend with the context + providerBackend.initialize(context); + + + return new CommonPolicyEngine(providerBackend); + } + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java new file mode 100644 index 0000000..572c74d --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/MockGroupMappingServiceProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import java.util.Set; + +import org.apache.sentry.provider.common.GroupMappingService; + +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +public class MockGroupMappingServiceProvider implements GroupMappingService { + private final Multimap<String, String> userToGroupMap; + + public MockGroupMappingServiceProvider(Multimap<String, String> userToGroupMap) { + this.userToGroupMap = userToGroupMap; + } + @Override + public Set<String> getGroups(String user) { + return Sets.newHashSet(userToGroupMap.get(user)); + } + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java new file mode 100644 index 0000000..07f4d7d --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderGeneralCases.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.core.common.Action; +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.Subject; +import org.apache.sentry.core.model.kafka.Cluster; +import org.apache.sentry.core.model.kafka.ConsumerGroup; +import org.apache.sentry.core.model.kafka.KafkaActionConstant; +import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel; +import org.apache.sentry.core.model.kafka.Topic; +import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider; +import org.apache.sentry.provider.common.ResourceAuthorizationProvider; +import org.apache.sentry.provider.file.PolicyFiles; +import org.junit.After; +import org.junit.Test; + +import com.google.common.base.Objects; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.common.io.Files; + +public class TestKafkaAuthorizationProviderGeneralCases { + private static final Multimap<String, String> USER_TO_GROUP_MAP = HashMultimap.create(); + + private static final Host HOST_1 = new Host("host1"); + private static final Host HOST_2 = new Host("host2"); + private static final Cluster cluster1 = new Cluster(); + private static final Topic topic1 = new Topic("t1"); + private static final Topic topic2 = new Topic("t2"); + private static final ConsumerGroup cgroup1 = new ConsumerGroup("cg1"); + private static final ConsumerGroup cgroup2 = new ConsumerGroup("cg2"); + + private static final KafkaAction ALL = new KafkaAction(KafkaActionConstant.ALL); + private static final KafkaAction READ = new KafkaAction(KafkaActionConstant.READ); + private static final KafkaAction WRITE = new KafkaAction(KafkaActionConstant.WRITE); + private static final KafkaAction CREATE = new KafkaAction(KafkaActionConstant.CREATE); + private static final KafkaAction DELETE = new KafkaAction(KafkaActionConstant.DELETE); + private static final KafkaAction ALTER = new KafkaAction(KafkaActionConstant.ALTER); + private static final KafkaAction DESCRIBE = new KafkaAction(KafkaActionConstant.DESCRIBE); + private static final KafkaAction CLUSTER_ACTION = new KafkaAction( + KafkaActionConstant.CLUSTER_ACTION); + + private static final Set<KafkaAction> allActions = Sets.newHashSet(ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION); + + private static final Subject ADMIN = new Subject("admin1"); + private static final Subject SUB_ADMIN = new Subject("subadmin1"); + private static final Subject CONSUMER0 = new Subject("consumer0"); + private static final Subject CONSUMER1 = new Subject("consumer1"); + private static final Subject CONSUMER2 = new Subject("consumer2"); + private static final Subject PRODUCER0 = new Subject("producer0"); + private static final Subject PRODUCER1 = new Subject("producer1"); + private static final Subject PRODUCER2 = new Subject("producer2"); + private static final Subject CONSUMER_PRODUCER0 = new Subject("consumer_producer0"); + + private static final String ADMIN_GROUP = "admin_group"; + private static final String SUBADMIN_GROUP = "subadmin_group"; + private static final String CONSUMER_GROUP0 = "consumer_group0"; + private static final String CONSUMER_GROUP1 = "consumer_group1"; + private static final String CONSUMER_GROUP2 = "consumer_group2"; + private static final String PRODUCER_GROUP0 = "producer_group0"; + private static final String PRODUCER_GROUP1 = "producer_group1"; + private static final String PRODUCER_GROUP2 = "producer_group2"; + private static final String CONSUMER_PRODUCER_GROUP0 = "consumer_producer_group0"; + + static { + USER_TO_GROUP_MAP.putAll(ADMIN.getName(), Arrays.asList(ADMIN_GROUP)); + USER_TO_GROUP_MAP.putAll(SUB_ADMIN.getName(), Arrays.asList(SUBADMIN_GROUP )); + USER_TO_GROUP_MAP.putAll(CONSUMER0.getName(), Arrays.asList(CONSUMER_GROUP0)); + USER_TO_GROUP_MAP.putAll(CONSUMER1.getName(), Arrays.asList(CONSUMER_GROUP1)); + USER_TO_GROUP_MAP.putAll(CONSUMER2.getName(), Arrays.asList(CONSUMER_GROUP2)); + USER_TO_GROUP_MAP.putAll(PRODUCER0.getName(), Arrays.asList(PRODUCER_GROUP0)); + USER_TO_GROUP_MAP.putAll(PRODUCER1.getName(), Arrays.asList(PRODUCER_GROUP1)); + USER_TO_GROUP_MAP.putAll(PRODUCER2.getName(), Arrays.asList(PRODUCER_GROUP2)); + USER_TO_GROUP_MAP.putAll(CONSUMER_PRODUCER0.getName(), Arrays.asList(CONSUMER_PRODUCER_GROUP0)); + } + + private final ResourceAuthorizationProvider authzProvider; + private File baseDir; + + public TestKafkaAuthorizationProviderGeneralCases() throws IOException { + baseDir = Files.createTempDir(); + PolicyFiles.copyToDir(baseDir, "kafka-policy-test-authz-provider.ini"); + authzProvider = new HadoopGroupResourceAuthorizationProvider( + KafkaPolicyTestUtil.createPolicyEngineForTest(new File(baseDir, + "kafka-policy-test-authz-provider.ini").getPath()), + new MockGroupMappingServiceProvider(USER_TO_GROUP_MAP), KafkaPrivilegeModel.getInstance()); + } + + @After + public void teardown() { + if(baseDir != null) { + FileUtils.deleteQuietly(baseDir); + } + } + + private void doTestResourceAuthorizationProvider(Subject subject, List<? extends Authorizable> authorizableHierarchy, + Set<? extends Action> actions, boolean expected) throws Exception { + Objects.ToStringHelper helper = Objects.toStringHelper("TestParameters"); + helper.add("Subject", subject).add("authzHierarchy", authorizableHierarchy).add("action", actions); + Assert.assertEquals(helper.toString(), expected, + authzProvider.hasAccess(subject, authorizableHierarchy, actions, ActiveRoleSet.ALL)); + } + + @Test + public void testAdmin() throws Exception { + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic1), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,topic2), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_1), allActions, true); + + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cluster1), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic1), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,topic2), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, false); + doTestResourceAuthorizationProvider(SUB_ADMIN, Arrays.asList(HOST_2), allActions, false); + + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cluster1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,topic2), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1,cgroup2), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_1), allActions, true); + + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cluster1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,topic2), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup1), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2,cgroup2), allActions, true); + doTestResourceAuthorizationProvider(ADMIN, Arrays.asList(HOST_2), allActions, true); + } + + @Test + public void testConsumer() throws Exception { + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(CONSUMER0, Arrays.asList(host, topic1), + Sets.newHashSet(action), READ.equals(action)); + } + } + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(CONSUMER1, Arrays.asList(host, topic1), + Sets.newHashSet(action), HOST_1.equals(host) && READ.equals(action)); + } + } + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(CONSUMER2, Arrays.asList(host, topic2), + Sets.newHashSet(action), HOST_2.equals(host) && READ.equals(action)); + } + } + } + + @Test + public void testProducer() throws Exception { + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(PRODUCER0, Arrays.asList(host, topic1), + Sets.newHashSet(action), WRITE.equals(action)); + } + } + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(PRODUCER1, Arrays.asList(host, topic1), + Sets.newHashSet(action), HOST_1.equals(host) && WRITE.equals(action)); + } + } + for (KafkaAction action : allActions) { + for (Host host : Sets.newHashSet(HOST_1, HOST_2)) { + doTestResourceAuthorizationProvider(PRODUCER2, Arrays.asList(host, topic2), + Sets.newHashSet(action), HOST_2.equals(host) && WRITE.equals(action)); + } + } + } + + @Test + public void testConsumerProducer() throws Exception { + for (KafkaAction action : allActions) { + doTestResourceAuthorizationProvider(CONSUMER_PRODUCER0, Arrays.asList(HOST_1, topic1), + Sets.newHashSet(action), true); + } + } + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java new file mode 100644 index 0000000..63d2f30 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaAuthorizationProviderSpecialCases.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.core.common.Action; +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.Subject; +import org.apache.sentry.core.model.kafka.KafkaActionConstant; +import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.core.model.kafka.KafkaPrivilegeModel; +import org.apache.sentry.core.model.kafka.Topic; +import org.apache.sentry.policy.common.PolicyEngine; +import org.apache.sentry.provider.common.AuthorizationProvider; +import org.apache.sentry.provider.file.LocalGroupResourceAuthorizationProvider; +import org.apache.sentry.provider.file.PolicyFile; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import com.google.common.io.Files; + +public class TestKafkaAuthorizationProviderSpecialCases { + private AuthorizationProvider authzProvider; + private PolicyFile policyFile; + private File baseDir; + private File iniFile; + private String initResource; + @Before + public void setup() throws IOException { + baseDir = Files.createTempDir(); + iniFile = new File(baseDir, "policy.ini"); + initResource = "file://" + iniFile.getPath(); + policyFile = new PolicyFile(); + } + + @After + public void teardown() throws IOException { + if(baseDir != null) { + FileUtils.deleteQuietly(baseDir); + } + } + + @Test + public void testDuplicateEntries() throws Exception { + Subject user1 = new Subject("user1"); + Host host1 = new Host("host1"); + Topic topic1 = new Topic("t1"); + Set<? extends Action> actions = Sets.newHashSet(new KafkaAction(KafkaActionConstant.READ)); + policyFile.addGroupsToUser(user1.getName(), true, "group1", "group1") + .addRolesToGroup("group1", true, "role1", "role1") + .addPermissionsToRole("role1", true, "host=host1->topic=t1->action=read", + "host=host1->topic=t1->action=read"); + policyFile.write(iniFile); + PolicyEngine policy = KafkaPolicyTestUtil.createPolicyEngineForTest(initResource); + authzProvider = new LocalGroupResourceAuthorizationProvider(initResource, + policy, KafkaPrivilegeModel.getInstance()); + List<? extends Authorizable> authorizableHierarchy = ImmutableList.of(host1, topic1); + Assert.assertTrue(authorizableHierarchy.toString(), + authzProvider.hasAccess(user1, authorizableHierarchy, actions, ActiveRoleSet.ALL)); + } + +} http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java new file mode 100644 index 0000000..62fbea7 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaModelAuthorizables.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.fail; + +import org.apache.sentry.core.model.kafka.Cluster; +import org.apache.sentry.core.model.kafka.ConsumerGroup; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.core.model.kafka.KafkaModelAuthorizables; +import org.apache.sentry.core.model.kafka.Topic; +import org.apache.shiro.config.ConfigurationException; +import org.junit.Test; + +public class TestKafkaModelAuthorizables { + + @Test + public void testHost() throws Exception { + Host host1 = (Host) KafkaModelAuthorizables.from("HOST=host1"); + assertEquals("host1", host1.getName()); + } + + @Test(expected=IllegalArgumentException.class) + public void testNoKV() throws Exception { + System.out.println(KafkaModelAuthorizables.from("nonsense")); + } + + @Test(expected=IllegalArgumentException.class) + public void testEmptyKey() throws Exception { + System.out.println(KafkaModelAuthorizables.from("=host1")); + } + + @Test(expected=IllegalArgumentException.class) + public void testEmptyValue() throws Exception { + System.out.println(KafkaModelAuthorizables.from("HOST=")); + } + + @Test + public void testNotAuthorizable() throws Exception { + assertNull(KafkaModelAuthorizables.from("k=v")); + } + + @Test + public void testResourceNameIsCaseSensitive() throws Exception { + Host host1 = (Host)KafkaModelAuthorizables.from("HOST=Host1"); + assertEquals("Host1", host1.getName()); + + Cluster cluster1 = (Cluster)KafkaModelAuthorizables.from("Cluster=kafka-cluster"); + assertEquals("kafka-cluster", cluster1.getName()); + + Topic topic1 = (Topic)KafkaModelAuthorizables.from("topic=topiC1"); + assertEquals("topiC1", topic1.getName()); + + ConsumerGroup consumergroup1 = (ConsumerGroup)KafkaModelAuthorizables.from("ConsumerGroup=CG1"); + assertEquals("CG1", consumergroup1.getName()); + } + + @Test + public void testClusterResourceNameIsRestricted() throws Exception { + try { + KafkaModelAuthorizables.from("Cluster=cluster1"); + fail("Cluster with name other than " + Cluster.NAME + " must not have been created."); + } catch (ConfigurationException cex) { + assertEquals("Exception message is not as expected.", "Kafka's cluster resource can only have name " + Cluster.NAME, cex.getMessage()); + } catch (Exception ex) { + fail("Configuration exception was expected for invalid Cluster name."); + } + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java new file mode 100644 index 0000000..4299b1f --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineDFS.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import java.io.File; +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.sentry.provider.file.PolicyFiles; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class TestKafkaPolicyEngineDFS extends AbstractTestKafkaPolicyEngine { + private static MiniDFSCluster dfsCluster; + private static FileSystem fileSystem; + private static Path root; + private static Path etc; + + @BeforeClass + public static void setupLocalClazz() throws IOException { + File baseDir = getBaseDir(); + Assert.assertNotNull(baseDir); + File dfsDir = new File(baseDir, "dfs"); + Assert.assertTrue(dfsDir.isDirectory() || dfsDir.mkdirs()); + Configuration conf = new Configuration(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dfsDir.getPath()); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + fileSystem = dfsCluster.getFileSystem(); + root = new Path(fileSystem.getUri().toString()); + etc = new Path(root, "/etc"); + fileSystem.mkdirs(etc); + } + + @AfterClass + public static void teardownLocalClazz() { + if(dfsCluster != null) { + dfsCluster.shutdown(); + } + } + + @Override + protected void afterSetup() throws IOException { + fileSystem.delete(etc, true); + fileSystem.mkdirs(etc); + PolicyFiles.copyToDir(fileSystem, etc, "kafka-policy-test-authz-provider.ini"); + setPolicy(KafkaPolicyTestUtil.createPolicyEngineForTest(new Path(etc, + "kafka-policy-test-authz-provider.ini").toString())); + } + + @Override + protected void beforeTeardown() throws IOException { + fileSystem.delete(etc, true); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/d94e900a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java new file mode 100644 index 0000000..9a69f1c --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/policy/kafka/TestKafkaPolicyEngineLocalFS.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.policy.kafka; + +import java.io.File; +import java.io.IOException; + +import junit.framework.Assert; + +import org.apache.commons.io.FileUtils; +import org.apache.sentry.provider.file.PolicyFiles; + +public class TestKafkaPolicyEngineLocalFS extends AbstractTestKafkaPolicyEngine { + + @Override + protected void afterSetup() throws IOException { + File baseDir = getBaseDir(); + Assert.assertNotNull(baseDir); + Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); + PolicyFiles.copyToDir(baseDir, "kafka-policy-test-authz-provider.ini"); + setPolicy(KafkaPolicyTestUtil.createPolicyEngineForTest(new File(baseDir, + "kafka-policy-test-authz-provider.ini").getPath())); + } + + @Override + protected void beforeTeardown() throws IOException { + File baseDir = getBaseDir(); + Assert.assertNotNull(baseDir); + FileUtils.deleteQuietly(baseDir); + } +}
