Repository: incubator-ranger
Updated Branches:
  refs/heads/master 103104129 -> f0c9216da


RANGER-246 - Kafka authorization plugin (Bosco Durai via Selvamohan Neethiraj)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/f0c9216d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/f0c9216d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/f0c9216d

Branch: refs/heads/master
Commit: f0c9216dafbe81cbbe7ae9696c9f6de456ebfac6
Parents: 1031041
Author: sneethiraj <[email protected]>
Authored: Sun May 17 18:40:00 2015 -0400
Committer: sneethiraj <[email protected]>
Committed: Sun May 17 18:40:00 2015 -0400

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../service-defs/ranger-servicedef-kafka.json   |  12 +-
 plugin-kafka/pom.xml                            | 103 +++++-----
 .../kafka/authorizer/RangerKafkaAuthorizer.java | 201 ++++++++++++++++++-
 .../kafka/client/ServiceKafkaClient.java        |   2 -
 pom.xml                                         |  13 +-
 6 files changed, 258 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index dd4e2c2..7f41f0c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,4 @@
 .project
 /target/
 winpkg/target
+.DS_Store

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json 
b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index 9928c5d..d19b10c 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -59,15 +59,9 @@
                },
                {
                        "itemId": 7,
-                       "name":"replicate",
-                       "label":"Replicate"
-               },
-               {
-                       "itemId": 8,
-                       "name":"connect",
-                       "label":"Connect"
+                       "name":"kafka_admin",
+                       "label":"Kafka Admin"
                }
-               
        ],
        "configs":[
                {
@@ -97,7 +91,7 @@
                        "name":"commonNameForCertificate",
                        "type":"string",
                        "mandatory":false,
-                       "label":"Common Name for Certificate"
+                       "label":"Ranger Plugin SSL CName"
                }
                
        ],

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml
index e9ea265..afee47d 100644
--- a/plugin-kafka/pom.xml
+++ b/plugin-kafka/pom.xml
@@ -1,56 +1,51 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <groupId>security_plugins.ranger-kafka-plugin</groupId>
-  <artifactId>ranger-kafka-plugin</artifactId>
-  <name>KAFKA Security Plugin</name>
-  <description>KAFKA Security Plugin</description>
-  <packaging>jar</packaging>
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-  <parent>
-     <groupId>org.apache.ranger</groupId>
-     <artifactId>ranger</artifactId>
-     <version>0.5.0</version>
-     <relativePath>..</relativePath>
-  </parent>
-  <dependencies>
-    <dependency>
-      <groupId>security_plugins.ranger-plugins-common</groupId>
-      <artifactId>ranger-plugins-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>security_plugins.ranger-plugins-audit</groupId>
-      <artifactId>ranger-plugins-audit</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.ranger</groupId>
-      <artifactId>credentialbuilder</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
-      <version>${kafka.version}</version>
-    </dependency>
-  </dependencies>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor 
+       license agreements. See the NOTICE file distributed with this work for 
additional 
+       information regarding copyright ownership. The ASF licenses this file 
to 
+       You under the Apache License, Version 2.0 (the "License"); you may not 
use 
+       this file except in compliance with the License. You may obtain a copy 
of 
+       the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required 
+       by applicable law or agreed to in writing, software distributed under 
the 
+       License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
+       OF ANY KIND, either express or implied. See the License for the 
specific 
+       language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+       <groupId>security_plugins.ranger-kafka-plugin</groupId>
+       <artifactId>ranger-kafka-plugin</artifactId>
+       <name>KAFKA Security Plugin</name>
+       <description>KAFKA Security Plugin</description>
+       <packaging>jar</packaging>
+       <properties>
+               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+       </properties>
+       <parent>
+               <groupId>org.apache.ranger</groupId>
+               <artifactId>ranger</artifactId>
+               <version>0.5.0</version>
+               <relativePath>..</relativePath>
+       </parent>
+       <dependencies>
+               <dependency>
+                       
<groupId>security_plugins.ranger-plugins-common</groupId>
+                       <artifactId>ranger-plugins-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>security_plugins.ranger-plugins-audit</groupId>
+                       <artifactId>ranger-plugins-audit</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.ranger</groupId>
+                       <artifactId>credentialbuilder</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.kafka</groupId>
+                       <artifactId>kafka_2.10</artifactId>
+                       <version>${kafka.version}</version>
+               </dependency>
+       </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 40c2204..4689957 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,14 +19,208 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.util.Date;
+
+import kafka.security.auth.Acl;
+import kafka.security.auth.Authorizer;
+import kafka.security.auth.KafkaPrincipal;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import kafka.security.auth.ResourceType;
+import kafka.server.KafkaConfig;
+import kafka.network.RequestChannel.Session;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.authorization.utils.StringUtil;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+
+import scala.collection.immutable.HashSet;
+import scala.collection.immutable.Set;
+
+public class RangerKafkaAuthorizer implements Authorizer {
+       private static final Log logger = LogFactory
+                       .getLog(RangerKafkaAuthorizer.class);
+
+       public static final String KEY_TOPIC = "topic";
+       public static final String KEY_CLUSTER = "cluster";
+       public static final String KEY_CONSUMER_GROUP = "consumer_group";
+
+       public static final String ACCESS_TYPE_READ = "read";
+       public static final String ACCESS_TYPE_WRITE = "write";
+       public static final String ACCESS_TYPE_CREATE = "create";
+       public static final String ACCESS_TYPE_DELETE = "delete";
+       public static final String ACCESS_TYPE_ALTER = "alter";
+       public static final String ACCESS_TYPE_DESCRIBE = "describe";
+       public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
+
+       private static volatile RangerBasePlugin rangerPlugin = null;
+
+       public RangerKafkaAuthorizer() {
+               if (rangerPlugin == null) {
+                       rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+               }
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see 
kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
+        */
+       @Override
+       public void initialize(KafkaConfig kafkaConfig) {
+               rangerPlugin.init();
+               RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
+
+               rangerPlugin.setResultProcessor(auditHandler);
+       }
+
+       // TODO: Fix this after Session is fixed
+       // @Override
+       public boolean authorize(Session session, Operation operation,
+                       Resource resource) {
+
+               String userName = null;
+               java.util.Set<String> userGroups = getGroupsForUser(userName);
+               String ip = null;
+               Date eventTime = StringUtil.getUTCDate();
+               String accessType = mapToRangerAccessType(operation);
+               if (accessType == null) {
+                       logger.fatal("Unsupported access type. session=" + 
session
+                                       + ", operation=" + operation + ", 
resource=" + resource);
+                       return false;
+               }
+               String action = accessType;
+
+               RangerAccessRequestImpl rangerRequest = new 
RangerAccessRequestImpl();
+               rangerRequest.setUser(userName);
+               rangerRequest.setUserGroups(userGroups);
+               rangerRequest.setClientIPAddress(ip);
+               rangerRequest.setAccessTime(eventTime);
+
+               RangerAccessResourceImpl rangerResource = new 
RangerAccessResourceImpl();
+
+               if (resource.resourceType().equals(ResourceType.TOPIC)) {
+                       rangerResource.setValue(KEY_TOPIC, resource.name());
+               } else if 
(resource.resourceType().equals(ResourceType.CLUSTER)) {
+                       rangerResource.setValue(KEY_CLUSTER, resource.name());
+               } else if (resource.resourceType().equals(ResourceType.GROUP)) {
+                       rangerResource.setValue(KEY_CONSUMER_GROUP, 
resource.name());
+               } else {
+                       logger.fatal("Unsupported resourceType=" + 
resource.resourceType());
+                       return false;
+               }
+
+               rangerRequest.setResource(rangerResource);
+               rangerRequest.setAccessType(accessType);
+               rangerRequest.setAction(action);
+               rangerRequest.setRequestData(resource.name());
+
+               RangerAccessResult result = 
rangerPlugin.isAccessAllowed(rangerRequest);
+               return result.getIsAllowed();
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
+        * kafka.security.auth.Resource)
+        */
+       @Override
+       public void addAcls(Set<Acl> acls, Resource resource) {
+               logger.error("addAcls() is not supported by Ranger for Kafka");
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
+        * kafka.security.auth.Resource)
+        */
+       @Override
+       public boolean removeAcls(Set<Acl> acls, Resource resource) {
+               logger.error("removeAcls() is not supported by Ranger for 
Kafka");
+               return false;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
+        */
+       @Override
+       public boolean removeAcls(Resource resource) {
+               logger.error("removeAcls() is not supported by Ranger for 
Kafka");
+               return false;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
+        */
+       @Override
+       public Set<Acl> getAcls(Resource resource) {
+               Set<Acl> aclList = new HashSet<Acl>();
+               logger.error("getAcls() is not supported by Ranger for Kafka");
 
-public class RangerKafkaAuthorizer /*KafkaAuthorizationPlugin*/ {
+               return aclList;
+       }
 
-       private static final Log LOG = 
LogFactory.getLog(RangerKafkaAuthorizer.class);
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
+        * )
+        */
+       @Override
+       public Set<Acl> getAcls(KafkaPrincipal principal) {
+               Set<Acl> aclList = new HashSet<Acl>();
+               logger.error("getAcls() is not supported by Ranger for Kafka");
+               return aclList;
+       }
 
-    //private static volatile RangerKafkaPlugin kafkaPlugin = null;
+       /**
+        * @param userName
+        * @return
+        */
+       private java.util.Set<String> getGroupsForUser(String userName) {
+               if (userName == null) {
+                       return null;
+               }
 
+               // TODO: Need to implement this method
+               return null;
+       }
 
+       /**
+        * @param operation
+        * @return
+        */
+       private String mapToRangerAccessType(Operation operation) {
+               if (operation.equals(Operation.READ)) {
+                       return ACCESS_TYPE_READ;
+               } else if (operation.equals(Operation.WRITE)) {
+                       return ACCESS_TYPE_WRITE;
+               } else if (operation.equals(Operation.CREATE)) {
+                       return ACCESS_TYPE_CREATE;
+               } else if (operation.equals(Operation.DELETE)) {
+                       return ACCESS_TYPE_DELETE;
+               } else if (operation.equals(Operation.ALTER)) {
+                       return ACCESS_TYPE_ALTER;
+               } else if (operation.equals(Operation.DESCRIBE)) {
+                       return ACCESS_TYPE_DESCRIBE;
+               } else if (operation.equals(Operation.CLUSTER_ACTION)) {
+                       return ACCESS_TYPE_KAFKA_ADMIN;
+               }
+               return null;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index a62bd95..5cca619 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -22,10 +22,8 @@ package org.apache.ranger.services.kafka.client;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/f0c9216d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0b5608a..cbb3a08 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,8 +87,6 @@
   <module>hive-agent</module>
   <module>knox-agent</module>
   <module>storm-agent</module>
-  <module>plugin-kafka</module>
-  <!-- <module>plugin-solr</module> -->
   <module>plugin-yarn</module>
   <module>ranger_solrj</module>
   <module>security-admin</module>
@@ -224,6 +222,12 @@
                 <module>plugin-solr</module>         
          </modules>
       </profile>
+      <profile>
+          <id>kafka-security</id>
+         <modules>
+                <module>plugin-kafka</module>         
+         </modules>
+      </profile>
   </profiles>
   <distributionManagement>
         <repository>
@@ -375,15 +379,14 @@
              <descriptor>src/main/assembly/hbase-agent.xml</descriptor>
              <descriptor>src/main/assembly/knox-agent.xml</descriptor>
              <descriptor>src/main/assembly/storm-agent.xml</descriptor>
-            <descriptor>src/main/assembly/plugin-kafka.xml</descriptor>
+                <descriptor>src/main/assembly/plugin-kafka.xml</descriptor>
              <descriptor>src/main/assembly/plugin-yarn.xml</descriptor>
-            <descriptor>src/main/assembly/plugin-solr.xml</descriptor>
+                <descriptor>src/main/assembly/plugin-solr.xml</descriptor>
              <descriptor>src/main/assembly/admin-web.xml</descriptor>
              <descriptor>src/main/assembly/usersync.xml</descriptor>
              <descriptor>src/main/assembly/migration-util.xml</descriptor>
              <descriptor>src/main/assembly/kms.xml</descriptor>
              <descriptor>src/main/assembly/ranger-src.xml</descriptor>
-            <!--<descriptor>src/main/assembly/plugin-kms.xml</descriptor>-->
            </descriptors>
          </configuration>
       </plugin>

Reply via email to