RANGER-246 - Kafka authorization plugin (Bosco Durai via Selvamohan Neethiraj)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/f0c9216d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/f0c9216d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/f0c9216d Branch: refs/heads/tag-policy Commit: f0c9216dafbe81cbbe7ae9696c9f6de456ebfac6 Parents: 1031041 Author: sneethiraj <[email protected]> Authored: Sun May 17 18:40:00 2015 -0400 Committer: sneethiraj <[email protected]> Committed: Sun May 17 18:40:00 2015 -0400 ---------------------------------------------------------------------- .gitignore | 1 + .../service-defs/ranger-servicedef-kafka.json | 12 +- plugin-kafka/pom.xml | 103 +++++----- .../kafka/authorizer/RangerKafkaAuthorizer.java | 201 ++++++++++++++++++- .../kafka/client/ServiceKafkaClient.java | 2 - pom.xml | 13 +- 6 files changed, 258 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index dd4e2c2..7f41f0c 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ .project /target/ winpkg/target +.DS_Store http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json ---------------------------------------------------------------------- diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json index 9928c5d..d19b10c 100644 --- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json +++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json @@ -59,15 +59,9 @@ }, { "itemId": 7, - "name":"replicate", - "label":"Replicate" - }, - { - "itemId": 8, - "name":"connect", - "label":"Connect" + "name":"kafka_admin", + "label":"Kafka Admin" } - ], "configs":[ { @@ -97,7 +91,7 @@ "name":"commonNameForCertificate", "type":"string", "mandatory":false, - "label":"Common Name for Certificate" + "label":"Ranger Plugin SSL CName" } ], http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml index e9ea265..afee47d 100644 --- a/plugin-kafka/pom.xml +++ b/plugin-kafka/pom.xml @@ -1,56 +1,51 @@ <?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <groupId>security_plugins.ranger-kafka-plugin</groupId> - <artifactId>ranger-kafka-plugin</artifactId> - <name>KAFKA Security Plugin</name> - <description>KAFKA Security Plugin</description> - <packaging>jar</packaging> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - <parent> - <groupId>org.apache.ranger</groupId> - <artifactId>ranger</artifactId> - <version>0.5.0</version> - <relativePath>..</relativePath> - </parent> - <dependencies> - <dependency> - <groupId>security_plugins.ranger-plugins-common</groupId> - <artifactId>ranger-plugins-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>security_plugins.ranger-plugins-audit</groupId> - <artifactId>ranger-plugins-audit</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.ranger</groupId> - <artifactId>credentialbuilder</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - <version>${kafka.version}</version> - </dependency> - </dependencies> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>security_plugins.ranger-kafka-plugin</groupId> + <artifactId>ranger-kafka-plugin</artifactId> + <name>KAFKA Security Plugin</name> + <description>KAFKA Security Plugin</description> + <packaging>jar</packaging> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + <parent> + <groupId>org.apache.ranger</groupId> + <artifactId>ranger</artifactId> + <version>0.5.0</version> + <relativePath>..</relativePath> + </parent> + <dependencies> + <dependency> + <groupId>security_plugins.ranger-plugins-common</groupId> + <artifactId>ranger-plugins-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>security_plugins.ranger-plugins-audit</groupId> + <artifactId>ranger-plugins-audit</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.ranger</groupId> + <artifactId>credentialbuilder</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${kafka.version}</version> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index 40c2204..4689957 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,14 +19,208 @@ package org.apache.ranger.authorization.kafka.authorizer; +import java.util.Date; + +import kafka.security.auth.Acl; +import kafka.security.auth.Authorizer; +import kafka.security.auth.KafkaPrincipal; +import kafka.security.auth.Operation; +import kafka.security.auth.Resource; +import kafka.security.auth.ResourceType; +import kafka.server.KafkaConfig; +import kafka.network.RequestChannel.Session; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ranger.authorization.utils.StringUtil; +import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler; +import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl; +import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl; +import org.apache.ranger.plugin.policyengine.RangerAccessResult; +import org.apache.ranger.plugin.service.RangerBasePlugin; + +import scala.collection.immutable.HashSet; +import scala.collection.immutable.Set; + +public class RangerKafkaAuthorizer implements Authorizer { + private static final Log logger = LogFactory + .getLog(RangerKafkaAuthorizer.class); + + public static final String KEY_TOPIC = "topic"; + public static final String KEY_CLUSTER = "cluster"; + public static final String KEY_CONSUMER_GROUP = "consumer_group"; + + public static final String ACCESS_TYPE_READ = "read"; + public static final String ACCESS_TYPE_WRITE = "write"; + public static final String ACCESS_TYPE_CREATE = "create"; + public static final String ACCESS_TYPE_DELETE = "delete"; + public static final String ACCESS_TYPE_ALTER = "alter"; + public static final String ACCESS_TYPE_DESCRIBE = "describe"; + public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin"; + + private static volatile RangerBasePlugin rangerPlugin = null; + + public RangerKafkaAuthorizer() { + if (rangerPlugin == null) { + rangerPlugin = new RangerBasePlugin("kafka", "kafka"); + } + } + + /* + * (non-Javadoc) + * + * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig) + */ + @Override + public void initialize(KafkaConfig kafkaConfig) { + rangerPlugin.init(); + RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler(); + + rangerPlugin.setResultProcessor(auditHandler); + } + + // TODO: Fix this after Session is fixed + // @Override + public boolean authorize(Session session, Operation operation, + Resource resource) { + + String userName = null; + java.util.Set<String> userGroups = getGroupsForUser(userName); + String ip = null; + Date eventTime = StringUtil.getUTCDate(); + String accessType = mapToRangerAccessType(operation); + if (accessType == null) { + logger.fatal("Unsupported access type. session=" + session + + ", operation=" + operation + ", resource=" + resource); + return false; + } + String action = accessType; + + RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl(); + rangerRequest.setUser(userName); + rangerRequest.setUserGroups(userGroups); + rangerRequest.setClientIPAddress(ip); + rangerRequest.setAccessTime(eventTime); + + RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl(); + + if (resource.resourceType().equals(ResourceType.TOPIC)) { + rangerResource.setValue(KEY_TOPIC, resource.name()); + } else if (resource.resourceType().equals(ResourceType.CLUSTER)) { + rangerResource.setValue(KEY_CLUSTER, resource.name()); + } else if (resource.resourceType().equals(ResourceType.GROUP)) { + rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name()); + } else { + logger.fatal("Unsupported resourceType=" + resource.resourceType()); + return false; + } + + rangerRequest.setResource(rangerResource); + rangerRequest.setAccessType(accessType); + rangerRequest.setAction(action); + rangerRequest.setRequestData(resource.name()); + + RangerAccessResult result = rangerPlugin.isAccessAllowed(rangerRequest); + return result.getIsAllowed(); + } + + /* + * (non-Javadoc) + * + * @see + * kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set, + * kafka.security.auth.Resource) + */ + @Override + public void addAcls(Set<Acl> acls, Resource resource) { + logger.error("addAcls() is not supported by Ranger for Kafka"); + } + + /* + * (non-Javadoc) + * + * @see + * kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set, + * kafka.security.auth.Resource) + */ + @Override + public boolean removeAcls(Set<Acl> acls, Resource resource) { + logger.error("removeAcls() is not supported by Ranger for Kafka"); + return false; + } + + /* + * (non-Javadoc) + * + * @see + * kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource) + */ + @Override + public boolean removeAcls(Resource resource) { + logger.error("removeAcls() is not supported by Ranger for Kafka"); + return false; + } + + /* + * (non-Javadoc) + * + * @see kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource) + */ + @Override + public Set<Acl> getAcls(Resource resource) { + Set<Acl> aclList = new HashSet<Acl>(); + logger.error("getAcls() is not supported by Ranger for Kafka"); -public class RangerKafkaAuthorizer /*KafkaAuthorizationPlugin*/ { + return aclList; + } - private static final Log LOG = LogFactory.getLog(RangerKafkaAuthorizer.class); + /* + * (non-Javadoc) + * + * @see + * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal + * ) + */ + @Override + public Set<Acl> getAcls(KafkaPrincipal principal) { + Set<Acl> aclList = new HashSet<Acl>(); + logger.error("getAcls() is not supported by Ranger for Kafka"); + return aclList; + } - //private static volatile RangerKafkaPlugin kafkaPlugin = null; + /** + * @param userName + * @return + */ + private java.util.Set<String> getGroupsForUser(String userName) { + if (userName == null) { + return null; + } + // TODO: Need to implement this method + return null; + } + /** + * @param operation + * @return + */ + private String mapToRangerAccessType(Operation operation) { + if (operation.equals(Operation.READ)) { + return ACCESS_TYPE_READ; + } else if (operation.equals(Operation.WRITE)) { + return ACCESS_TYPE_WRITE; + } else if (operation.equals(Operation.CREATE)) { + return ACCESS_TYPE_CREATE; + } else if (operation.equals(Operation.DELETE)) { + return ACCESS_TYPE_DELETE; + } else if (operation.equals(Operation.ALTER)) { + return ACCESS_TYPE_ALTER; + } else if (operation.equals(Operation.DESCRIBE)) { + return ACCESS_TYPE_DESCRIBE; + } else if (operation.equals(Operation.CLUSTER_ACTION)) { + return ACCESS_TYPE_KAFKA_ADMIN; + } + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java index a62bd95..5cca619 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java @@ -22,10 +22,8 @@ package org.apache.ranger.services.kafka.client; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0b5608a..cbb3a08 100644 --- a/pom.xml +++ b/pom.xml @@ -87,8 +87,6 @@ <module>hive-agent</module> <module>knox-agent</module> <module>storm-agent</module> - <module>plugin-kafka</module> - <!-- <module>plugin-solr</module> --> <module>plugin-yarn</module> <module>ranger_solrj</module> <module>security-admin</module> @@ -224,6 +222,12 @@ <module>plugin-solr</module> </modules> </profile> + <profile> + <id>kafka-security</id> + <modules> + <module>plugin-kafka</module> + </modules> + </profile> </profiles> <distributionManagement> <repository> @@ -375,15 +379,14 @@ <descriptor>src/main/assembly/hbase-agent.xml</descriptor> <descriptor>src/main/assembly/knox-agent.xml</descriptor> <descriptor>src/main/assembly/storm-agent.xml</descriptor> - <descriptor>src/main/assembly/plugin-kafka.xml</descriptor> + <descriptor>src/main/assembly/plugin-kafka.xml</descriptor> <descriptor>src/main/assembly/plugin-yarn.xml</descriptor> - <descriptor>src/main/assembly/plugin-solr.xml</descriptor> + <descriptor>src/main/assembly/plugin-solr.xml</descriptor> <descriptor>src/main/assembly/admin-web.xml</descriptor> <descriptor>src/main/assembly/usersync.xml</descriptor> <descriptor>src/main/assembly/migration-util.xml</descriptor> <descriptor>src/main/assembly/kms.xml</descriptor> <descriptor>src/main/assembly/ranger-src.xml</descriptor> - <!--<descriptor>src/main/assembly/plugin-kms.xml</descriptor>--> </descriptors> </configuration> </plugin>
