Revert "RANGER-246 - Kafka authorization plugin"

This reverts commit a5f8531a17558cfc75e2ad216816f272705898cf.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/33ec87ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/33ec87ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/33ec87ec Branch: refs/heads/tag-policy Commit: 33ec87ec660e447b16bce4d6181b4ed877c572c2 Parents: aff4074 Author: sneethiraj <[email protected]> Authored: Sun May 17 18:12:00 2015 -0400 Committer: sneethiraj <[email protected]> Committed: Sun May 17 18:12:00 2015 -0400 ---------------------------------------------------------------------- .gitignore | 1 - .../service-defs/ranger-servicedef-kafka.json | 12 +- plugin-kafka/pom.xml | 103 +++++----- .../kafka/authorizer/RangerKafkaAuthorizer.java | 201 +------------------ .../kafka/client/ServiceKafkaClient.java | 2 + pom.xml | 11 +- 6 files changed, 71 insertions(+), 259 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/33ec87ec/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 7f41f0c..dd4e2c2 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,3 @@ .project /target/ winpkg/target -.DS_Store http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/33ec87ec/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json ---------------------------------------------------------------------- diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json index d19b10c..9928c5d 100644 --- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json +++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json @@ -59,9 +59,15 @@ }, { "itemId": 7, - "name":"kafka_admin", - "label":"Kafka Admin" + "name":"replicate", + 
"label":"Replicate" + }, + { + "itemId": 8, + "name":"connect", + "label":"Connect" } + ], "configs":[ { @@ -91,7 +97,7 @@ "name":"commonNameForCertificate", "type":"string", "mandatory":false, - "label":"Ranger Plugin SSL CName" + "label":"Common Name for Certificate" } ], http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/33ec87ec/plugin-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml index afee47d..e9ea265 100644 --- a/plugin-kafka/pom.xml +++ b/plugin-kafka/pom.xml @@ -1,51 +1,56 @@ <?xml version="1.0" encoding="UTF-8"?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - You under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <groupId>security_plugins.ranger-kafka-plugin</groupId> - <artifactId>ranger-kafka-plugin</artifactId> - <name>KAFKA Security Plugin</name> - <description>KAFKA Security Plugin</description> - <packaging>jar</packaging> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - <parent> - <groupId>org.apache.ranger</groupId> - <artifactId>ranger</artifactId> - <version>0.5.0</version> - <relativePath>..</relativePath> - </parent> - <dependencies> - <dependency> - <groupId>security_plugins.ranger-plugins-common</groupId> - <artifactId>ranger-plugins-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>security_plugins.ranger-plugins-audit</groupId> - <artifactId>ranger-plugins-audit</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.ranger</groupId> - <artifactId>credentialbuilder</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>${kafka.version}</version> - </dependency> - </dependencies> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>security_plugins.ranger-kafka-plugin</groupId> + <artifactId>ranger-kafka-plugin</artifactId> + <name>KAFKA Security Plugin</name> + <description>KAFKA Security Plugin</description> + <packaging>jar</packaging> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + <parent> + <groupId>org.apache.ranger</groupId> + <artifactId>ranger</artifactId> + <version>0.5.0</version> + <relativePath>..</relativePath> + </parent> + <dependencies> + <dependency> + <groupId>security_plugins.ranger-plugins-common</groupId> + <artifactId>ranger-plugins-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>security_plugins.ranger-plugins-audit</groupId> + <artifactId>ranger-plugins-audit</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.ranger</groupId> + <artifactId>credentialbuilder</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${kafka.version}</version> + </dependency> + </dependencies> </project> 
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/33ec87ec/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index 4689957..40c2204 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -1,3 +1,4 @@ + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,208 +20,14 @@ package org.apache.ranger.authorization.kafka.authorizer; -import java.util.Date; - -import kafka.security.auth.Acl; -import kafka.security.auth.Authorizer; -import kafka.security.auth.KafkaPrincipal; -import kafka.security.auth.Operation; -import kafka.security.auth.Resource; -import kafka.security.auth.ResourceType; -import kafka.server.KafkaConfig; -import kafka.network.RequestChannel.Session; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ranger.authorization.utils.StringUtil; -import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler; -import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl; -import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl; -import org.apache.ranger.plugin.policyengine.RangerAccessResult; -import org.apache.ranger.plugin.service.RangerBasePlugin; - -import scala.collection.immutable.HashSet; -import scala.collection.immutable.Set; - -public class RangerKafkaAuthorizer implements Authorizer { - private static final Log logger = LogFactory - .getLog(RangerKafkaAuthorizer.class); - - 
public static final String KEY_TOPIC = "topic"; - public static final String KEY_CLUSTER = "cluster"; - public static final String KEY_CONSUMER_GROUP = "consumer_group"; - - public static final String ACCESS_TYPE_READ = "read"; - public static final String ACCESS_TYPE_WRITE = "write"; - public static final String ACCESS_TYPE_CREATE = "create"; - public static final String ACCESS_TYPE_DELETE = "delete"; - public static final String ACCESS_TYPE_ALTER = "alter"; - public static final String ACCESS_TYPE_DESCRIBE = "describe"; - public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin"; - - private static volatile RangerBasePlugin rangerPlugin = null; - - public RangerKafkaAuthorizer() { - if (rangerPlugin == null) { - rangerPlugin = new RangerBasePlugin("kafka", "kafka"); - } - } - - /* - * (non-Javadoc) - * - * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig) - */ - @Override - public void initialize(KafkaConfig kafkaConfig) { - rangerPlugin.init(); - RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler(); - - rangerPlugin.setResultProcessor(auditHandler); - } - - // TODO: Fix this after Session is fixed - // @Override - public boolean authorize(Session session, Operation operation, - Resource resource) { - - String userName = null; - java.util.Set<String> userGroups = getGroupsForUser(userName); - String ip = null; - Date eventTime = StringUtil.getUTCDate(); - String accessType = mapToRangerAccessType(operation); - if (accessType == null) { - logger.fatal("Unsupported access type. 
session=" + session - + ", operation=" + operation + ", resource=" + resource); - return false; - } - String action = accessType; - - RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl(); - rangerRequest.setUser(userName); - rangerRequest.setUserGroups(userGroups); - rangerRequest.setClientIPAddress(ip); - rangerRequest.setAccessTime(eventTime); - - RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl(); - - if (resource.resourceType().equals(ResourceType.TOPIC)) { - rangerResource.setValue(KEY_TOPIC, resource.name()); - } else if (resource.resourceType().equals(ResourceType.CLUSTER)) { - rangerResource.setValue(KEY_CLUSTER, resource.name()); - } else if (resource.resourceType().equals(ResourceType.GROUP)) { - rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name()); - } else { - logger.fatal("Unsupported resourceType=" + resource.resourceType()); - return false; - } - - rangerRequest.setResource(rangerResource); - rangerRequest.setAccessType(accessType); - rangerRequest.setAction(action); - rangerRequest.setRequestData(resource.name()); - - RangerAccessResult result = rangerPlugin.isAccessAllowed(rangerRequest); - return result.getIsAllowed(); - } - - /* - * (non-Javadoc) - * - * @see - * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set, - * kafka.security.auth.Resource) - */ - @Override - public void addAcls(Set<Acl> acls, Resource resource) { - logger.error("addAcls() is not supported by Ranger for Kafka"); - } - - /* - * (non-Javadoc) - * - * @see - * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set, - * kafka.security.auth.Resource) - */ - @Override - public boolean removeAcls(Set<Acl> acls, Resource resource) { - logger.error("removeAcls() is not supported by Ranger for Kafka"); - return false; - } - - /* - * (non-Javadoc) - * - * @see - * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource) - */ - @Override - public boolean removeAcls(Resource resource) { - 
logger.error("removeAcls() is not supported by Ranger for Kafka"); - return false; - } - - /* - * (non-Javadoc) - * - * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource) - */ - @Override - public Set<Acl> getAcls(Resource resource) { - Set<Acl> aclList = new HashSet<Acl>(); - logger.error("getAcls() is not supported by Ranger for Kafka"); - return aclList; - } +public class RangerKafkaAuthorizer /*KafkaAuthorizationPlugin*/ { - /* - * (non-Javadoc) - * - * @see - * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal - * ) - */ - @Override - public Set<Acl> getAcls(KafkaPrincipal principal) { - Set<Acl> aclList = new HashSet<Acl>(); - logger.error("getAcls() is not supported by Ranger for Kafka"); - return aclList; - } + private static final Log LOG = LogFactory.getLog(RangerKafkaAuthorizer.class); - /** - * @param userName - * @return - */ - private java.util.Set<String> getGroupsForUser(String userName) { - if (userName == null) { - return null; - } + //private static volatile RangerKafkaPlugin kafkaPlugin = null; - // TODO: Need to implement this method - return null; - } - /** - * @param operation - * @return - */ - private String mapToRangerAccessType(Operation operation) { - if (operation.equals(Operation.READ)) { - return ACCESS_TYPE_READ; - } else if (operation.equals(Operation.WRITE)) { - return ACCESS_TYPE_WRITE; - } else if (operation.equals(Operation.CREATE)) { - return ACCESS_TYPE_CREATE; - } else if (operation.equals(Operation.DELETE)) { - return ACCESS_TYPE_DELETE; - } else if (operation.equals(Operation.ALTER)) { - return ACCESS_TYPE_ALTER; - } else if (operation.equals(Operation.DESCRIBE)) { - return ACCESS_TYPE_DESCRIBE; - } else if (operation.equals(Operation.CLUSTER_ACTION)) { - return ACCESS_TYPE_KAFKA_ADMIN; - } - return null; - } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/33ec87ec/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java 
---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java index 5cca619..a62bd95 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java @@ -22,8 +22,10 @@ package org.apache.ranger.services.kafka.client; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/33ec87ec/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d26fe5d..0b5608a 100644 --- a/pom.xml +++ b/pom.xml @@ -87,8 +87,8 @@ <module>hive-agent</module> <module>knox-agent</module> <module>storm-agent</module> + <module>plugin-kafka</module> <!-- <module>plugin-solr</module> --> - <!-- <module>plugin-kafka</module> --> <module>plugin-yarn</module> <module>ranger_solrj</module> <module>security-admin</module> @@ -148,8 +148,7 @@ <jersey-bundle.version>1.17.1</jersey-bundle.version> <jersey-client.version>2.6</jersey-client.version> <junit.version>4.11</junit.version> - <!-- <kafka.version>0.8.2.0</kafka.version> --> - <kafka.version>0.8.2.2.3.0.0-1860</kafka.version> + <kafka.version>0.8.2.0</kafka.version> <mockito.version>1.8.4</mockito.version> <hamcrest-version>1.3</hamcrest-version> <knox.gateway.version>0.5.0</knox.gateway.version> @@ -225,12 +224,6 @@ <module>plugin-solr</module> </modules> </profile> - <profile> - <id>kafka-security</id> - <modules> - <module>plugin-kafka</module> - </modules> - </profile> </profiles> 
<distributionManagement> <repository>
