SENTRY-1011: Add Kafka binding (Ashish K Singh, reviewed by HaoHao and Dapeng Sun, via Anne Yu)
Change-Id: I4e54d5d519448bac24896b2c76fd875978ec655a Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/7ce03735 Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/7ce03735 Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/7ce03735 Branch: refs/heads/master Commit: 7ce037351b7060d9c46b5578669839caf62cadcd Parents: 2575add Author: Anne Yu <[email protected]> Authored: Tue Feb 9 16:59:58 2016 -0800 Committer: hahao <[email protected]> Committed: Mon Mar 21 23:13:09 2016 -0700 ---------------------------------------------------------------------- pom.xml | 12 ++ sentry-binding/pom.xml | 1 + sentry-binding/sentry-binding-kafka/pom.xml | 76 ++++++++++ .../org/apache/sentry/kafka/ConvertUtil.java | 55 +++++++ .../kafka/authorizer/SentryKafkaAuthorizer.java | 137 +++++++++++++++++ .../sentry/kafka/binding/KafkaAuthBinding.java | 152 +++++++++++++++++++ .../binding/KafkaAuthBindingSingleton.java | 87 +++++++++++ .../apache/sentry/kafka/conf/KafkaAuthConf.java | 78 ++++++++++ .../kafka/MockGroupMappingServiceProvider.java | 46 ++++++ .../kafka/authorizer/ConvertUtilTest.java | 85 +++++++++++ .../authorizer/SentryKafkaAuthorizerTest.java | 126 +++++++++++++++ .../src/test/resources/core-site.xml | 26 ++++ .../src/test/resources/log4j.properties | 30 ++++ .../src/test/resources/sentry-site.xml | 42 +++++ .../src/test/resources/test-authz-provider.ini | 38 +++++ .../provider/common/AuthorizationComponent.java | 1 + 16 files changed, 992 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ac2d596..eb6d004 100644 --- a/pom.xml +++ b/pom.xml @@ -96,6 +96,8 @@ limitations under the License. <sqoop.version>1.99.6</sqoop.version> <test.sentry.hadoop.classpath>${maven.test.classpath}</test.sentry.hadoop.classpath> <zookeeper.version>3.4.5</zookeeper.version> + <kafka.version>0.9.0.0</kafka.version> + <commons-io.version>1.3.2</commons-io.version> </properties> <dependencyManagement> @@ -415,6 +417,11 @@ limitations under the License. </dependency> <dependency> <groupId>org.apache.sentry</groupId> + <artifactId>sentry-binding-kafka</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> <artifactId>sentry-provider-common</artifactId> <version>${project.version}</version> </dependency> @@ -605,6 +612,11 @@ limitations under the License. <artifactId>hamcrest-all</artifactId> <version>${hamcrest.version}</version> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${kafka.version}</version> + </dependency> </dependencies> </dependencyManagement> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-binding/pom.xml b/sentry-binding/pom.xml index 0f2a987..9e4999b 100644 --- a/sentry-binding/pom.xml +++ b/sentry-binding/pom.xml @@ -31,6 +31,7 @@ limitations under the License. <modules> <module>sentry-binding-hive</module> + <module>sentry-binding-kafka</module> <module>sentry-binding-solr</module> <module>sentry-binding-sqoop</module> </modules> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/pom.xml b/sentry-binding/sentry-binding-kafka/pom.xml new file mode 100644 index 0000000..bd24c20 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/pom.xml @@ -0,0 +1,76 @@ +<?xml version="1.0"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-binding</artifactId> + <version>1.7.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>sentry-binding-kafka</artifactId> + <name>Sentry Binding for Kafka</name> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-core-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-core-model-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-policy-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-provider-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-provider-file</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-provider-db</artifactId> + </dependency> + <dependency> + <groupId>org.apache.sentry</groupId> + <artifactId>sentry-policy-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java new file mode 100644 index 0000000..c878308 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/ConvertUtil.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.kafka; + +import java.util.List; + +import kafka.security.auth.Resource; + +import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.model.kafka.Host; + +import com.google.common.collect.Lists; +import org.apache.sentry.core.model.kafka.KafkaAuthorizable; + +public class ConvertUtil { + + public static List<Authorizable> convertResourceToAuthorizable(String hostname, + final Resource resource) { + List<Authorizable> authorizables = Lists.newArrayList(); + authorizables.add(new Host(hostname)); + authorizables.add(new Authorizable() { + @Override + public String getTypeName() { + final String resourceTypeName = resource.resourceType().name(); + // Kafka's GROUP resource is referred as CONSUMERGROUP within Sentry. + if (resourceTypeName.equalsIgnoreCase("group")) { + return KafkaAuthorizable.AuthorizableType.CONSUMERGROUP.name(); + } else { + return resourceTypeName; + } + } + + @Override + public String getName() { + return resource.name(); + } + }); + return authorizables; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java new file mode 100644 index 0000000..9ffb971 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizer.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.kafka.authorizer; + +import kafka.network.RequestChannel; +import kafka.security.auth.Acl; +import kafka.security.auth.Authorizer; +import kafka.security.auth.Operation; +import kafka.security.auth.Resource; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.sentry.kafka.binding.KafkaAuthBinding; +import org.apache.sentry.kafka.binding.KafkaAuthBindingSingleton; +import org.apache.sentry.kafka.conf.KafkaAuthConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.immutable.Map; +import scala.collection.immutable.Set; + +import java.util.ArrayList; +import java.util.List; + + +public class SentryKafkaAuthorizer implements Authorizer { + + private static Logger LOG = + LoggerFactory.getLogger(SentryKafkaAuthorizer.class); + + KafkaAuthBinding binding; + KafkaAuthConf kafkaAuthConf; + + String sentry_site = null; + List<KafkaPrincipal> super_users = null; + + public SentryKafkaAuthorizer() { + } + + @Override + public boolean authorize(RequestChannel.Session session, Operation operation, + Resource resource) { + LOG.debug("Authorizing Session: " + session + " for Operation: " + operation + " on Resource: " + resource); + final KafkaPrincipal user = session.principal(); + if (isSuperUser(user)) { + LOG.debug("Allowing SuperUser: " + user + " in " + session + " for Operation: " + operation + " on Resource: " + resource); + return true; + } + LOG.debug("User: " + user + " is not a SuperUser"); + return binding.authorize(session, operation, resource); + } + + @Override + public void addAcls(Set<Acl> acls, final Resource resource) { + throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + } + + @Override + public boolean removeAcls(Set<Acl> acls, final Resource resource) { + throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + } + + @Override + public boolean removeAcls(final Resource resource) { + throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + } + + @Override + public Set<Acl> getAcls(Resource resource) { + throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + } + + @Override + public Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) { + throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + } + + @Override + public Map<Resource, Set<Acl>> getAcls() { + throw new UnsupportedOperationException("Please use Sentry CLI to perform this action."); + } + + @Override + public void close() { + } + + @Override + public void configure(java.util.Map<String, ?> configs) { + final Object sentryKafkaSiteUrlConfig = configs.get(KafkaAuthConf.SENTRY_KAFKA_SITE_URL); + if (sentryKafkaSiteUrlConfig != null) { + this.sentry_site = sentryKafkaSiteUrlConfig.toString(); + } + final Object kafkaSuperUsersConfig = configs.get(KafkaAuthConf.KAFKA_SUPER_USERS); + if (kafkaSuperUsersConfig != null) { + getSuperUsers(kafkaSuperUsersConfig.toString()); + } + LOG.info("Configuring Sentry KafkaAuthorizer: " + sentry_site); + final KafkaAuthBindingSingleton instance = KafkaAuthBindingSingleton.getInstance(); + instance.configure(sentry_site); + this.binding = instance.getAuthBinding(); + this.kafkaAuthConf = instance.getKafkaAuthConf(); + } + + private void getSuperUsers(String kafkaSuperUsers) { + super_users = new ArrayList<>(); + String[] superUsers = kafkaSuperUsers.split(";"); + for (String superUser : superUsers) { + if (!superUser.isEmpty()) { + final String trimmedUser = superUser.trim(); + super_users.add(KafkaPrincipal.fromString(trimmedUser)); + LOG.debug("Adding " + trimmedUser + " to list of Kafka SuperUsers."); + } + } + } + + private boolean isSuperUser(KafkaPrincipal user) { + if (super_users != null) { + for (KafkaPrincipal superUser : super_users) { + if (superUser.equals(user)) { + return true; + } + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java new file mode 100644 index 0000000..ccbe60e --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBinding.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.kafka.binding; + +import java.lang.reflect.Constructor; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Sets; +import kafka.network.RequestChannel; +import kafka.security.auth.Operation; +import kafka.security.auth.Resource; +import org.apache.sentry.core.common.ActiveRoleSet; +import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.common.Subject; +import org.apache.sentry.core.model.kafka.KafkaActionFactory; +import org.apache.sentry.core.model.kafka.KafkaActionFactory.KafkaAction; +import org.apache.sentry.kafka.ConvertUtil; +import org.apache.sentry.kafka.conf.KafkaAuthConf.AuthzConfVars; +import org.apache.sentry.policy.common.PolicyEngine; +import org.apache.sentry.provider.common.AuthorizationComponent; +import org.apache.sentry.provider.common.AuthorizationProvider; +import org.apache.sentry.provider.common.ProviderBackend; +import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaAuthBinding { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaAuthBinding.class); + private static final String COMPONENT_TYPE = AuthorizationComponent.KAFKA; + + private final Configuration authConf; + private final AuthorizationProvider authProvider; + private ProviderBackend providerBackend; + + private final KafkaActionFactory actionFactory = new KafkaActionFactory(); + + public KafkaAuthBinding(Configuration authConf) throws Exception { + this.authConf = authConf; + this.authProvider = createAuthProvider(); + } + + /** + * Instantiate the configured authz provider + * + * @return {@link AuthorizationProvider} + */ + private AuthorizationProvider createAuthProvider() throws Exception { + /** + * get the authProvider class, policyEngine class, providerBackend class and resources from the + * kafkaAuthConf config + */ + String authProviderName = + authConf.get(AuthzConfVars.AUTHZ_PROVIDER.getVar(), + AuthzConfVars.AUTHZ_PROVIDER.getDefault()); + String resourceName = + authConf.get(AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getVar(), + AuthzConfVars.AUTHZ_PROVIDER_RESOURCE.getDefault()); + String providerBackendName = + authConf.get(AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getVar(), + AuthzConfVars.AUTHZ_PROVIDER_BACKEND.getDefault()); + String policyEngineName = + authConf.get(AuthzConfVars.AUTHZ_POLICY_ENGINE.getVar(), + AuthzConfVars.AUTHZ_POLICY_ENGINE.getDefault()); + String instanceName = authConf.get(AuthzConfVars.AUTHZ_INSTANCE_NAME.getVar()); + if (resourceName != null && resourceName.startsWith("classpath:")) { + String resourceFileName = resourceName.substring("classpath:".length()); + resourceName = AuthorizationProvider.class.getClassLoader().getResource(resourceFileName).getPath(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Using authorization provider " + authProviderName + " with resource " + + resourceName + ", policy engine " + policyEngineName + ", provider backend " + + providerBackendName); + } + + // Instantiate the configured providerBackend + Constructor<?> providerBackendConstructor = + Class.forName(providerBackendName) + .getDeclaredConstructor(Configuration.class, String.class); + providerBackendConstructor.setAccessible(true); + providerBackend = + (ProviderBackend) providerBackendConstructor.newInstance(new Object[]{authConf, + resourceName}); + if (providerBackend instanceof SentryGenericProviderBackend) { + ((SentryGenericProviderBackend) providerBackend).setComponentType(COMPONENT_TYPE); + ((SentryGenericProviderBackend) providerBackend).setServiceName("kafka" + instanceName); + } + + // Instantiate the configured policyEngine + Constructor<?> policyConstructor = + Class.forName(policyEngineName).getDeclaredConstructor(ProviderBackend.class); + policyConstructor.setAccessible(true); + PolicyEngine policyEngine = + (PolicyEngine) policyConstructor.newInstance(new Object[]{providerBackend}); + + // Instantiate the configured authProvider + Constructor<?> constructor = + Class.forName(authProviderName).getDeclaredConstructor(Configuration.class, String.class, + PolicyEngine.class); + constructor.setAccessible(true); + return (AuthorizationProvider) constructor.newInstance(new Object[]{authConf, resourceName, + policyEngine}); + } + + /** + * Authorize access to a Kafka privilege + */ + public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) { + List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(session.clientAddress().getHostAddress(), resource); + Set<KafkaAction> actions = Sets.newHashSet(actionFactory.getActionByName(operation.name())); + return authProvider.hasAccess(new Subject(getName(session)), authorizables, actions, ActiveRoleSet.ALL); + } + + /* + * For SSL session's Kafka creates user names with "CN=" prepended to the user name. + * "=" is used as splitter by Sentry to parse key value pairs and so it is required to strip off "CN=". + * */ + private String getName(RequestChannel.Session session) { + final String principalName = session.principal().getName(); + int start = principalName.indexOf("CN="); + if (start >= 0) { + String tmpName, name = ""; + tmpName = principalName.substring(start + 3); + int end = tmpName.indexOf(","); + if (end > 0) { + name = tmpName.substring(0, end); + } else { + name = tmpName; + } + return name; + } else { + return principalName; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java new file mode 100644 index 0000000..92e50e6 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/binding/KafkaAuthBindingSingleton.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.kafka.binding; + +import java.net.MalformedURLException; +import java.net.URL; + +import org.apache.sentry.kafka.conf.KafkaAuthConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; + +public class KafkaAuthBindingSingleton { + private static Logger log = LoggerFactory.getLogger(KafkaAuthBindingSingleton.class); + + // Lazy init holder class idiom to avoid DCL + private static class KafkaAuthBindingSingletonHolder { + static final KafkaAuthBindingSingleton instance = new KafkaAuthBindingSingleton(); + } + + private static KafkaAuthConf kafkaAuthConf = null; + + private KafkaAuthBinding binding; + + private KafkaAuthBindingSingleton() { + } + + private KafkaAuthConf loadAuthzConf(String sentry_site) { + if (Strings.isNullOrEmpty(sentry_site)) { + throw new IllegalArgumentException("Configuration key " + KafkaAuthConf.SENTRY_KAFKA_SITE_URL + + " value '" + sentry_site + "' is invalid."); + } + + KafkaAuthConf kafkaAuthConf = null; + try { + kafkaAuthConf = new KafkaAuthConf(new URL(sentry_site)); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Configuration key " + KafkaAuthConf.SENTRY_KAFKA_SITE_URL + + " specifies a malformed URL '" + sentry_site + "'", e); + } + return kafkaAuthConf; + } + + public void configure(String sentry_site) { + try { + kafkaAuthConf = loadAuthzConf(sentry_site); + binding = new KafkaAuthBinding(kafkaAuthConf); + log.info("KafkaAuthBinding created successfully"); + } catch (Exception ex) { + log.error("Unable to create KafkaAuthBinding", ex); + throw new RuntimeException("Unable to create KafkaAuthBinding: " + ex.getMessage(), ex); + } + } + + public static KafkaAuthBindingSingleton getInstance() { + return KafkaAuthBindingSingletonHolder.instance; + } + + public KafkaAuthBinding getAuthBinding() { + if (binding == null) { + throw new RuntimeException("KafkaAuthBindingSingleton not configured yet."); + } + return binding; + } + + public KafkaAuthConf getKafkaAuthConf() { + if (binding == null) { + throw new RuntimeException("KafkaAuthBindingSingleton not configured yet."); + } + return kafkaAuthConf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java new file mode 100644 index 0000000..e75ec7e --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/main/java/org/apache/sentry/kafka/conf/KafkaAuthConf.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.sentry.kafka.conf; + +import java.net.URL; + +import org.apache.hadoop.conf.Configuration; +import org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine; +import org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider; +import org.apache.sentry.provider.db.generic.SentryGenericProviderBackend; + +public class KafkaAuthConf extends Configuration { + /** + * Configuration key used in kafka.properties to point at sentry-site.xml + */ + public static final String SENTRY_KAFKA_SITE_URL = "sentry.kafka.site.url"; + public static final String AUTHZ_SITE_FILE = "sentry-site.xml"; + public static final String KAFKA_SUPER_USERS = "kafka.superusers"; + + /** + * Config setting definitions + */ + public static enum AuthzConfVars { + AUTHZ_PROVIDER("sentry.kafka.provider", + HadoopGroupResourceAuthorizationProvider.class.getName()), + AUTHZ_PROVIDER_RESOURCE("sentry.kafka.provider.resource", ""), + AUTHZ_PROVIDER_BACKEND("sentry.kafka.provider.backend", SentryGenericProviderBackend.class.getName()), + AUTHZ_POLICY_ENGINE("sentry.kafka.policy.engine", SimpleKafkaPolicyEngine.class.getName()), + AUTHZ_INSTANCE_NAME("sentry.kafka.name", ""); + + private final String varName; + private final String defaultVal; + + AuthzConfVars(String varName, String defaultVal) { + this.varName = varName; + this.defaultVal = defaultVal; + } + + public String getVar() { + return varName; + } + + public String getDefault() { + return defaultVal; + } + + public static String getDefault(String varName) { + for (AuthzConfVars oneVar : AuthzConfVars.values()) { + if (oneVar.getVar().equalsIgnoreCase(varName)) { + return oneVar.getDefault(); + } + } + return null; + } + } + + public KafkaAuthConf(URL kafkaAuthzSiteURL) { + super(true); + addResource(kafkaAuthzSiteURL); + } + + @Override + public String get(String varName) { + return get(varName, AuthzConfVars.getDefault(varName)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/MockGroupMappingServiceProvider.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/MockGroupMappingServiceProvider.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/MockGroupMappingServiceProvider.java new file mode 100644 index 0000000..48f0d3d --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/MockGroupMappingServiceProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sentry.kafka; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.security.GroupMappingServiceProvider; + +import com.google.common.collect.Lists; + +public class MockGroupMappingServiceProvider implements GroupMappingServiceProvider { + + public MockGroupMappingServiceProvider() { + } + + @Override + public List<String> getGroups(String user) throws IOException { + return Lists.newArrayList(user); + } + + @Override + public void cacheGroupsRefresh() throws IOException { + } + + @Override + public void cacheGroupsAdd(List<String> groups) throws IOException { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java new file mode 100644 index 0000000..e08d442 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/ConvertUtilTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.kafka.authorizer; + +import junit.framework.Assert; +import kafka.security.auth.Resource; +import kafka.security.auth.Resource$; +import kafka.security.auth.ResourceType$; +import org.apache.sentry.core.common.Authorizable; +import org.apache.sentry.core.model.kafka.KafkaAuthorizable; +import org.apache.sentry.kafka.ConvertUtil; +import org.junit.Test; + +import java.util.List; + +public class ConvertUtilTest { + + @Test + public void testCluster() { + String hostname = "localhost"; + String clusterName = Resource$.MODULE$.ClusterResourceName(); + Resource clusterResource = new Resource(ResourceType$.MODULE$.fromString("cluster"), clusterName); + List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, clusterResource); + for (Authorizable auth : authorizables) { + if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.CLUSTER.name())) { + Assert.assertEquals(auth.getName(), clusterName); + } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) { + Assert.assertEquals(auth.getName(), hostname); + } else { + Assert.fail("Unexpected type found: " + auth.getTypeName()); + } + } + Assert.assertEquals(authorizables.size(), 2); + } + + @Test + public void testTopic() { + String hostname = "localhost"; + String topicName = "t1"; + Resource topicResource = new Resource(ResourceType$.MODULE$.fromString("topic"), topicName); + List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, topicResource); + for (Authorizable auth : authorizables) { + if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.TOPIC.name())) { + Assert.assertEquals(auth.getName(), topicName); + } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) { + Assert.assertEquals(auth.getName(), hostname); + } else { + Assert.fail("Unexpected type found: " + auth.getTypeName()); + } + } + Assert.assertEquals(authorizables.size(), 2); + } + + @Test + public void testConsumerGroup() { + String hostname = "localhost"; + String consumerGroup = "g1"; + Resource consumerGroupResource = new Resource(ResourceType$.MODULE$.fromString("group"), consumerGroup); + List<Authorizable> authorizables = ConvertUtil.convertResourceToAuthorizable(hostname, consumerGroupResource); + for (Authorizable auth : authorizables) { + if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.CONSUMERGROUP.name())) { + Assert.assertEquals(auth.getName(),consumerGroup); + } else if (auth.getTypeName().equalsIgnoreCase(KafkaAuthorizable.AuthorizableType.HOST.name())) { + Assert.assertEquals(auth.getName(),hostname); + } else { + Assert.fail("Unexpected type found: " + auth.getTypeName()); + } + } + Assert.assertEquals(authorizables.size(), 2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java new file mode 100644 index 0000000..eafe0f0 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/java/org/apache/sentry/kafka/authorizer/SentryKafkaAuthorizerTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.kafka.authorizer; + +import kafka.network.RequestChannel; +import kafka.security.auth.Operation; +import kafka.security.auth.Operation$; +import kafka.security.auth.Resource; +import kafka.security.auth.Resource$; +import kafka.security.auth.ResourceType$; +import kafka.server.KafkaConfig; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.sentry.kafka.conf.KafkaAuthConf; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Properties; + +public class SentryKafkaAuthorizerTest { + + private SentryKafkaAuthorizer authorizer; + private InetAddress testHostName1; + private InetAddress testHostName2; + private String resourceName; + private Resource clusterResource; + private Resource topic1Resource; + private KafkaConfig config; + + public SentryKafkaAuthorizerTest() throws UnknownHostException { + authorizer = new SentryKafkaAuthorizer(); + testHostName1 = InetAddress.getByAddress("host1", new byte[] {1, 2, 3, 4}); + testHostName2 = InetAddress.getByAddress("host2", new byte[] {2, 3, 4, 5}); + resourceName = Resource$.MODULE$.ClusterResourceName(); + clusterResource = new Resource(ResourceType$.MODULE$.fromString("cluster"), resourceName); + topic1Resource = new Resource(ResourceType$.MODULE$.fromString("topic"), "t1"); + } + + @Before + public void setUp() { + Properties props = new Properties(); + String sentry_site_path = SentryKafkaAuthorizerTest.class.getClassLoader().getResource(KafkaAuthConf.AUTHZ_SITE_FILE).getPath(); + // Kafka check this prop when creating a config instance + props.put("zookeeper.connect", "test"); + props.put("sentry.kafka.site.url", "file://" + sentry_site_path); + + config = KafkaConfig.fromProps(props); + authorizer.configure(config.originals()); + } + + @Test + public void testAdmin() { + + KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin"); + RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1); + RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2); + + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Create"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Read"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Create"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource)); + + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Read"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Delete"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource)); + } + + @Test + public void testSubAdmin() { + KafkaPrincipal admin = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "subadmin"); + RequestChannel.Session host1Session = new RequestChannel.Session(admin, testHostName1); + RequestChannel.Session host2Session = new RequestChannel.Session(admin, testHostName2); + + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Create"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"), clusterResource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Read"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Write"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Create"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Delete"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Alter"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("Describe"), topic1Resource)); + Assert.assertTrue("Test failed.", authorizer.authorize(host1Session, Operation$.MODULE$.fromString("ClusterAction"),topic1Resource)); + + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), clusterResource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), clusterResource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), clusterResource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Read"), topic1Resource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Write"), topic1Resource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Create"), topic1Resource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Delete"), topic1Resource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Alter"), topic1Resource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("Describe"), topic1Resource)); + Assert.assertFalse("Test failed.", authorizer.authorize(host2Session, Operation$.MODULE$.fromString("ClusterAction"), topic1Resource)); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/resources/core-site.xml ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/core-site.xml b/sentry-binding/sentry-binding-kafka/src/test/resources/core-site.xml new file mode 100644 index 0000000..61a0463 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/resources/core-site.xml @@ -0,0 +1,26 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration> + <property> + <name>hadoop.security.group.mapping</name> + <value>org.apache.sentry.kafka.MockGroupMappingServiceProvider</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/log4j.properties b/sentry-binding/sentry-binding-kafka/src/test/resources/log4j.properties new file mode 100644 index 0000000..d42c02c --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/resources/log4j.properties @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +sentry.root.logger=DEBUG,console +log4j.rootLogger=${sentry.root.logger} + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n + +log4g.logger.kafka.utils.Logging=WARN +log4j.logger.org.apache.kafka=WARN +log4j.logger.org.apache.sentry=DEBUG +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.org.I0Itec.zkclient=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.category.DataNucleus=OFF http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml new file mode 100644 index 0000000..69ce5a7 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/resources/sentry-site.xml @@ -0,0 +1,42 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration> + <property> + <name>sentry.kafka.provider</name> + <value>org.apache.sentry.provider.common.HadoopGroupResourceAuthorizationProvider</value> + </property> + <property> + <name>hadoop.security.group.mapping</name> + <value>test</value> + </property> + <property> + <name>sentry.kafka.provider.resource</name> + <value>classpath:test-authz-provider.ini</value> + </property> + <property> + <name>sentry.kafka.policy.engine</name> + <value>org.apache.sentry.policy.kafka.SimpleKafkaPolicyEngine</value> + </property> + <property> + <name>sentry.kafka.provider.backend</name> + <value>org.apache.sentry.provider.file.SimpleFileProviderBackend</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini ---------------------------------------------------------------------- diff --git a/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini new file mode 100644 index 0000000..5f85382 --- /dev/null +++ b/sentry-binding/sentry-binding-kafka/src/test/resources/test-authz-provider.ini @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[groups] +admin = admin_all +subadmin = admin_host1 +consumer0 = consumer_t1_all +consumer1 = consumer_t1_host1 +consumer2 = consumer_t2_host2 +producer0 = producer_t1_all +producer1 = producer_t1_host1 +producer2 = producer_t2_host2 +consumer_producer0 = consumer_producer_t1 + +[roles] +admin_all = host=* +admin_host1 = host=1.2.3.4 +consumer_t1_all = host=*->topic=t1->action=read +consumer_t1_host1 = host=host1->topic=t1->action=read +consumer_t2_host2 = host=host2->topic=t2->action=read +producer_t1_all = host=*->topic=t1->action=write +producer_t1_host1 = host=host1->topic=t1->action=write +producer_t2_host2 = host=host2->topic=t2->action=write +consumer_producer_t1 = host=host1->topic=t1->action=all http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/7ce03735/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java index 6409015..c74641a 100644 --- a/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java +++ b/sentry-provider/sentry-provider-common/src/main/java/org/apache/sentry/provider/common/AuthorizationComponent.java @@ -22,4 +22,5 @@ package org.apache.sentry.provider.common; public class AuthorizationComponent{ public static final String Search = "solr"; public static final String SQOOP = "sqoop"; + public static final String KAFKA = "kafka"; }
