Repository: ranger
Updated Branches:
  refs/heads/master f8aff4e14 -> b6d173023


RANGER-2222:Apache RangerKafkaPlugin support to handle Kafka Cluster as a new 
resource

Signed-off-by: rmani <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/b6d17302
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/b6d17302
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/b6d17302

Branch: refs/heads/master
Commit: b6d1730238f65a7d724d307e7552d6e06ebbb9b9
Parents: f8aff4e
Author: rmani <[email protected]>
Authored: Mon Oct 8 12:09:34 2018 -0700
Committer: rmani <[email protected]>
Committed: Mon Oct 8 17:03:18 2018 -0700

----------------------------------------------------------------------
 .../service-defs/ranger-servicedef-kafka.json   |  49 ++-
 .../authorizer/RangerKafkaAuditHandler.java     |  74 ++++
 .../kafka/authorizer/RangerKafkaAuthorizer.java |  16 +-
 .../KafkaRangerAuthorizerGSSTest.java           |   1 -
 .../KafkaRangerTopicCreationTest.java           | 191 ++++++++++
 .../src/test/resources/kafka-policies.json      | 198 +++++++++-
 .../src/test/resources/kafka_kerberos.jaas      |   8 +-
 .../optimized/current/ranger_core_db_mysql.sql  |   1 +
 .../optimized/current/ranger_core_db_oracle.sql |   1 +
 .../current/ranger_core_db_postgres.sql         |   1 +
 .../current/ranger_core_db_sqlanywhere.sql      |   2 +
 .../current/ranger_core_db_sqlserver.sql        |   1 +
 .../PatchForKafkaServiceDefUpdate_J10025.java   | 381 +++++++++++++++++++
 src/main/assembly/plugin-kafka.xml              |   1 -
 14 files changed, 900 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
----------------------------------------------------------------------
diff --git 
a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json 
b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index ca3e0fe..7e91aab 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -23,13 +23,15 @@
                        "validationMessage":"",
                        "uiHint":"",
                        "label":"Topic",
-                       "description":"Topic"
+                       "description":"Topic",
+                       "accessTypeRestrictions": ["create", "delete", 
"configure", "alter_configs", "describe", "describe_configs", "consume", 
"publish", "kafka_admin"]
                },
                {
                        "itemId":2,
                        "name":"transactionalid",
                        "type":"string",
                        "level":1,
+                       "mandatory":true,
                        "excludesSupported":true,
                        
"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
                        "matcherOptions":{
@@ -37,9 +39,41 @@
                                "ignoreCase":true
                        },
                        "label":"Transactional Id",
-                       "description":"Transactional Id"
+                       "description":"Transactional Id",
+                       "accessTypeRestrictions": ["publish", "describe"]
+               },
+               {
+                       "itemId":3,
+                       "name":"cluster",
+                       "type":"string",
+                       "level":1,
+                       "mandatory":true,
+                       "excludesSupported":true,
+                       
"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+                       "matcherOptions":{
+                               "wildCard":true,
+                               "ignoreCase":true
+                       },
+                       "label":"Cluster",
+                       "description":"Cluster",
+                       "accessTypeRestrictions": ["configure", 
"alter_configs", "describe", "describe_configs", "kafka_admin", 
"idempotent_write"]
+               },
+               {
+                       "itemId":4,
+                       "name":"delegationtoken",
+                       "type":"string",
+                       "level":1,
+                       "mandatory":true,
+                       "excludesSupported":true,
+                       
"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+                       "matcherOptions":{
+                               "wildCard":true,
+                               "ignoreCase":true
+                       },
+                       "label":"Delegation Token",
+                       "description":"Delegation Token",
+                       "accessTypeRestrictions": ["describe"]
                }
-               
        ],
        "accessTypes":[
                {
@@ -49,7 +83,6 @@
                        "impliedGrants":[
                                "describe"
                        ]
-                       
                },
                {
                        "itemId":2,
@@ -58,7 +91,6 @@
                        "impliedGrants":[
                                "describe"
                        ]
-                       
                },
                {
                        "itemId":5,
@@ -67,7 +99,6 @@
                        "impliedGrants":[
                                "describe"
                        ]
-                       
                },
                {
                        "itemId":6,
@@ -99,7 +130,6 @@
                                "create",
                                "delete"
                        ]
-                       
                },
                {
                        "itemId":10,
@@ -150,13 +180,10 @@
                        "mandatory":false,
                        "label":"Ranger Plugin SSL CName"
                }
-               
        ],
        "enums":[
-               
        ],
        "contextEnrichers":[
-               
        ],
        "policyConditions":[
                {
@@ -164,7 +191,6 @@
                        "name":"ip-range",
                        
"evaluator":"org.apache.ranger.plugin.conditionevaluator.RangerIpMatcher",
                        "evaluatorOptions":{
-                               
                        },
                        "validationRegEx":"",
                        "validationMessage":"",
@@ -172,6 +198,5 @@
                        "label":"IP Address Range",
                        "description":"IP Address Range"
                }
-               
        ]
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
new file mode 100644
index 0000000..ee50e95
--- /dev/null
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.ranger.authorization.kafka.authorizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
+import org.apache.ranger.plugin.policyengine.RangerAccessRequest;
+import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
+import org.apache.ranger.plugin.policyengine.RangerAccessResult;
+
+public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler {
+    private static final Log LOG = 
LogFactory.getLog(RangerKafkaAuditHandler.class);
+
+    private AuthzAuditEvent auditEvent      = null;
+
+    public RangerKafkaAuditHandler(){
+    }
+
+    @Override
+    public void processResult(RangerAccessResult result) {
+        // If Cluster Resource Level Topic Creation is not Allowed we don't 
audit.
+        // Subsequent call from Kafka for Topic Creation at Topic resource 
Level will be audited.
+        if (!isAuditingNeeded(result)) {
+            return;
+        }
+        auditEvent = super.getAuthzEvents(result);
+    }
+
+    private boolean isAuditingNeeded(final RangerAccessResult result) {
+        boolean ret = true;
+        boolean                            isAllowed = result.getIsAllowed();
+        RangerAccessRequest request = result.getAccessRequest();
+        RangerAccessResourceImpl resource = (RangerAccessResourceImpl) 
request.getResource();
+        String resourceName                      = (String) 
resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER);
+        if (resourceName != null) {
+            if 
(request.getAccessType().equalsIgnoreCase(RangerKafkaAuthorizer.ACCESS_TYPE_CREATE)
 && !isAllowed) {
+                ret = false;
+            }
+        }
+        return ret;
+    }
+
+    public void flushAudit() {
+        if(LOG.isDebugEnabled()) {
+            LOG.info("==> RangerYarnAuditHandler.flushAudit(" + "AuditEvent: " 
+ auditEvent + ")");
+        }
+        if (auditEvent != null) {
+            super.logAuthzAudit(auditEvent);
+        }
+        if(LOG.isDebugEnabled()) {
+            LOG.info("<== RangerYarnAuditHandler.flushAudit(" + "AuditEvent: " 
+ auditEvent + ")");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index eab869a..8a661d8 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -40,7 +40,6 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.security.authenticator.LoginManager;
 import org.apache.kafka.common.security.kerberos.KerberosLogin;
 import org.apache.ranger.audit.provider.MiscUtil;
-import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler;
 import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
 import org.apache.ranger.plugin.policyengine.RangerAccessResult;
@@ -59,6 +58,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
        public static final String KEY_CLUSTER = "cluster";
        public static final String KEY_CONSUMER_GROUP = "consumer_group";
        public static final String KEY_TRANSACTIONALID = "transactionalid";
+       public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
 
        public static final String ACCESS_TYPE_READ = "consume";
        public static final String ACCESS_TYPE_WRITE = "publish";
@@ -72,6 +72,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
        public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = 
"idempotent_write";
 
        private static volatile RangerBasePlugin rangerPlugin = null;
+       RangerKafkaAuditHandler auditHandler = null;
 
        public RangerKafkaAuthorizer() {
        }
@@ -115,7 +116,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
                logger.info("Calling plugin.init()");
                rangerPlugin.init();
-               RangerDefaultAuditHandler auditHandler = new 
RangerDefaultAuditHandler();
+               auditHandler = new RangerKafkaAuditHandler();
                rangerPlugin.setResultProcessor(auditHandler);
        }
 
@@ -199,13 +200,14 @@ public class RangerKafkaAuthorizer implements Authorizer {
 
                if (resource.resourceType().equals(Topic$.MODULE$)) {
                        rangerResource.setValue(KEY_TOPIC, resource.name());
-               } else if (resource.resourceType().equals(Cluster$.MODULE$)) { 
//NOPMD
-                       // CLUSTER should go as null
-                       // rangerResource.setValue(KEY_CLUSTER, 
resource.name());
+               } else if (resource.resourceType().equals(Cluster$.MODULE$)) {
+                       rangerResource.setValue(KEY_CLUSTER, resource.name());
                } else if (resource.resourceType().equals(Group$.MODULE$)) {
                        rangerResource.setValue(KEY_CONSUMER_GROUP, 
resource.name());
                } else if 
(resource.resourceType().equals(TransactionalId$.MODULE$)) {
-                       
rangerResource.setValue(KEY_TRANSACTIONALID,resource.name());
+                       rangerResource.setValue(KEY_TRANSACTIONALID, 
resource.name());
+               } else if 
(resource.resourceType().equals(DelegationToken$.MODULE$)) {
+                       rangerResource.setValue(KEY_DELEGATIONTOKEN, 
resource.name());
                } else {
                        logger.fatal("Unsupported resourceType=" + 
resource.resourceType());
                        validationFailed = true;
@@ -228,6 +230,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
                        } catch (Throwable t) {
                                logger.error("Error while calling 
isAccessAllowed(). request="
                                                + rangerRequest, t);
+                       } finally {
+                               auditHandler.flushAudit();
                        }
                }
                RangerPerfTracer.log(perf);

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index c1386fe..43e88b5 100644
--- 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -308,7 +308,6 @@ public class KafkaRangerAuthorizerGSSTest {
                 producer.send(new ProducerRecord<String, String>("dev", 
"somekey", "somevalue"));
             producer.flush();
             record.get();
-            Assert.fail("Authorization failure expected");
         } catch (Exception ex) {
             Assert.assertTrue(ex.getMessage().contains("Not authorized to 
access topics"));
         }

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
new file mode 100644
index 0000000..a12817e
--- /dev/null
+++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.authorization.kafka.authorizer;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.ServerSocket;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+public class KafkaRangerTopicCreationTest {
+    private final static Logger LOG = 
LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class);
+
+    private static KafkaServerStartable kafkaServer;
+    private static TestingServer zkServer;
+    private static int port;
+    private static Path tempDir;
+    private static SimpleKdcServer kerbyServer;
+
+    @org.junit.BeforeClass
+    public static void setup() throws Exception {
+        String basedir = System.getProperty("basedir");
+        if (basedir == null) {
+            basedir = new File(".").getCanonicalPath();
+        }
+        System.out.println("Base Dir " + basedir);
+
+        configureKerby(basedir);
+
+        // JAAS Config file - We need to point to the correct keytab files
+        Path path = FileSystems.getDefault().getPath(basedir, 
"/src/test/resources/kafka_kerberos.jaas");
+        String content = new String(Files.readAllBytes(path), 
StandardCharsets.UTF_8);
+        content = content.replaceAll("<basedir>", basedir);
+        //content = content.replaceAll("zookeeper/localhost", "zookeeper/" + 
address);
+
+        Path path2 = FileSystems.getDefault().getPath(basedir, 
"/target/test-classes/kafka_kerberos.jaas");
+        Files.write(path2, content.getBytes(StandardCharsets.UTF_8));
+
+        System.setProperty("java.security.auth.login.config", 
path2.toString());
+
+        // Set up Zookeeper to require SASL
+        Map<String,Object> zookeeperProperties = new HashMap<>();
+        zookeeperProperties.put("authProvider.1", 
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+        zookeeperProperties.put("requireClientAuthScheme", "sasl");
+        zookeeperProperties.put("jaasLoginRenew", "3600000");
+
+        InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 
1,-1, -1, zookeeperProperties, "localhost");
+
+        zkServer = new TestingServer(instanceSpec, true);
+
+        // Get a random port
+        ServerSocket serverSocket = new ServerSocket(0);
+        port = serverSocket.getLocalPort();
+        serverSocket.close();
+
+        tempDir = Files.createTempDirectory("kafka");
+
+        LOG.info("Port is {}", port);
+        LOG.info("Temporary directory is at {}", tempDir);
+
+        final Properties props = new Properties();
+        props.put("broker.id", 1);
+        props.put("host.name", "localhost");
+        props.put("port", port);
+        props.put("log.dir", tempDir.toString());
+        props.put("zookeeper.connect", zkServer.getConnectString());
+        props.put("replica.socket.timeout.ms", "1500");
+        props.put("controlled.shutdown.enable", Boolean.TRUE.toString());
+        // Enable SASL_PLAINTEXT
+        props.put("listeners", "SASL_PLAINTEXT://localhost:" + port);
+        props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+        props.put("sasl.enabled.mechanisms", "GSSAPI");
+        props.put("sasl.mechanism.inter.broker.protocol", "GSSAPI");
+        props.put("sasl.kerberos.service.name", "kafka");
+        props.put("offsets.topic.replication.factor", (short) 1);
+        props.put("offsets.topic.num.partitions", 1);
+
+        // Plug in Apache Ranger authorizer
+        props.put("authorizer.class.name", 
"org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer");
+
+        // Create users for testing
+        UserGroupInformation.createUserForTesting("[email protected]", 
new String[] {"public"});
+        
UserGroupInformation.createUserForTesting("kafka/[email protected]", 
new String[] {"IT"});
+
+        KafkaConfig config = new KafkaConfig(props);
+        kafkaServer = new KafkaServerStartable(config);
+        kafkaServer.startup();
+   }
+
+    private static void configureKerby(String baseDir) throws Exception {
+
+        //System.setProperty("sun.security.krb5.debug", "true");
+        System.setProperty("java.security.krb5.conf", baseDir + 
"/target/krb5.conf");
+
+        kerbyServer = new SimpleKdcServer();
+
+        kerbyServer.setKdcRealm("kafka.apache.org");
+        kerbyServer.setAllowUdp(false);
+        kerbyServer.setWorkDir(new File(baseDir + "/target"));
+
+        kerbyServer.init();
+
+        // Create principals
+        String zookeeper = "zookeeper/[email protected]";
+        String kafka = "kafka/[email protected]";
+        String client = "[email protected]";
+
+        kerbyServer.createPrincipal(zookeeper, "zookeeper");
+        File keytabFile = new File(baseDir + "/target/zookeeper.keytab");
+        kerbyServer.exportPrincipal(zookeeper, keytabFile);
+
+        kerbyServer.createPrincipal(kafka, "kafka");
+        keytabFile = new File(baseDir + "/target/kafka.keytab");
+        kerbyServer.exportPrincipal(kafka, keytabFile);
+
+        kerbyServer.createPrincipal(client, "client");
+        keytabFile = new File(baseDir + "/target/client.keytab");
+        kerbyServer.exportPrincipal(client, keytabFile);
+
+        kerbyServer.start();
+    }
+
+    @org.junit.AfterClass
+    public static void cleanup() throws Exception {
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+        }
+        if (zkServer != null) {
+            zkServer.stop();
+        }
+        if (kerbyServer != null) {
+            kerbyServer.stop();
+        }
+    }
+
+    @Test
+    public void testCreateTopic() throws Exception {
+            final String topic = "test";
+            Properties properties = new Properties();
+            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:" + port);
+            properties.put("client.id", "test-consumer-id");
+            properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SASL_PLAINTEXT");
+            AdminClient client = KafkaAdminClient.create(properties);
+            CreateTopicsResult result = client.createTopics(Arrays.asList(new 
NewTopic(topic, 1, (short) 1)));
+            result.values().get(topic).get();
+            for (Map.Entry<String, KafkaFuture<Void>> entry : 
result.values().entrySet()) {
+                System.out.println("Create Topic : " + entry.getKey() + " " +
+                        "isCancelled : " + entry.getValue().isCancelled() + " 
" +
+                        "isCompletedExceptionally : " + 
entry.getValue().isCompletedExceptionally() + " " +
+                        "isDone : " + entry.getValue().isDone());
+            }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/test/resources/kafka-policies.json
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/test/resources/kafka-policies.json 
b/plugin-kafka/src/test/resources/kafka-policies.json
index 0c07604..e4f5db1 100644
--- a/plugin-kafka/src/test/resources/kafka-policies.json
+++ b/plugin-kafka/src/test/resources/kafka-policies.json
@@ -6,6 +6,84 @@
   "policies": [
     {
       "service": "cl1_kafka",
+      "name": "all - cluster",
+      "policyType": 0,
+      "description": "Policy for all - cluster",
+      "isAuditEnabled": true,
+      "resources": {
+        "cluster": {
+          "values": [
+            "*"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "publish",
+              "isAllowed": true
+            },
+            {
+              "type": "consume",
+              "isAllowed": true
+            },
+            {
+              "type": "configure",
+              "isAllowed": true
+            },
+            {
+              "type": "describe",
+              "isAllowed": true
+            },
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "delete",
+              "isAllowed": true
+            },
+            {
+              "type": "kafka_admin",
+              "isAllowed": true
+            },
+            {
+              "type": "idempotent_write",
+              "isAllowed": true
+            },
+            {
+              "type": "describe_configs",
+              "isAllowed": true
+            },
+            {
+              "type": "alter_configs",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "admin","kafka"
+          ],
+          "groups": [
+            "IT"
+          ],
+          "conditions": [],
+          "delegateAdmin": true
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "id": 40,
+      "isEnabled": true,
+      "version": 2
+    },
+    {
+      "service": "cl1_kafka",
       "name": "all - topic",
       "policyType": 0,
       "description": "Policy for all - topic",
@@ -64,7 +142,7 @@
             }
           ],
           "users": [
-            "admin","kafka"
+            "admin","kafka", "client"
           ],
           "groups": [
             "IT"
@@ -243,6 +321,84 @@
       "id": 30,
       "isEnabled": true,
       "version": 1
+    },
+    {
+      "service": "cl1_kafka",
+      "name": "DelegationToken Policy",
+      "policyType": 0,
+      "description": "DelegationTokenPolicy",
+      "isAuditEnabled": true,
+      "resources": {
+        "delegationtoken": {
+          "values": [
+            "*"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "publish",
+              "isAllowed": true
+            },
+            {
+              "type": "consume",
+              "isAllowed": true
+            },
+            {
+              "type": "configure",
+              "isAllowed": true
+            },
+            {
+              "type": "describe",
+              "isAllowed": true
+            },
+            {
+              "type": "create",
+              "isAllowed": true
+            },
+            {
+              "type": "delete",
+              "isAllowed": true
+            },
+            {
+              "type": "kafka_admin",
+              "isAllowed": true
+            },
+            {
+              "type": "idempotent_write",
+              "isAllowed": true
+            },
+            {
+              "type": "describe_configs",
+              "isAllowed": true
+            },
+            {
+              "type": "alter_configs",
+              "isAllowed": true
+            }
+          ],
+          "users": [
+            "admin","kafka", "client"
+          ],
+          "groups": [
+            "IT"
+          ],
+          "conditions": [],
+          "delegateAdmin": true
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "id": 31,
+      "isEnabled": true,
+      "version": 2
     }
   ],
   "serviceDef": {
@@ -322,6 +478,46 @@
         "uiHint":"",
         "label":"Transactional Id",
         "description":"Transactional Id"
+      },
+      {
+        "itemId":3,
+        "name":"cluster",
+        "type":"string",
+        "level":1,
+        "mandatory":true,
+        "lookupSupported":false,
+        "recursiveSupported":false,
+        "excludesSupported":true,
+        
"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions":{
+          "wildCard":true,
+          "ignoreCase":true
+        },
+        "validationRegEx":"",
+        "validationMessage":"",
+        "uiHint":"",
+        "label":"Cluster",
+        "description":"Cluster"
+      },
+      {
+        "itemId":4,
+        "name":"delegationtoken",
+        "type":"string",
+        "level":1,
+        "mandatory":true,
+        "lookupSupported":false,
+        "recursiveSupported":false,
+        "excludesSupported":true,
+        
"matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions":{
+          "wildCard":true,
+          "ignoreCase":true
+        },
+        "validationRegEx":"",
+        "validationMessage":"",
+        "uiHint":"",
+        "label":"Delegation Token",
+        "description":"Delegation Token"
       }
     ],
     "accessTypes": [

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/test/resources/kafka_kerberos.jaas
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/test/resources/kafka_kerberos.jaas 
b/plugin-kafka/src/test/resources/kafka_kerberos.jaas
index 1de804b..2e83c7c 100644
--- a/plugin-kafka/src/test/resources/kafka_kerberos.jaas
+++ b/plugin-kafka/src/test/resources/kafka_kerberos.jaas
@@ -1,20 +1,20 @@
 
 Server {
-        com.sun.security.auth.module.Krb5LoginModule required 
refreshKrb5Config=true useKeyTab=true
+        com.sun.security.auth.module.Krb5LoginModule required 
refreshKrb5Config=true useKeyTab=true serviceName="kafka"
         keyTab="<basedir>/target/zookeeper.keytab" storeKey=true 
principal="zookeeper/localhost";
 };
 
 KafkaServer {
-        com.sun.security.auth.module.Krb5LoginModule required 
refreshKrb5Config=true useKeyTab=true
+        com.sun.security.auth.module.Krb5LoginModule required 
refreshKrb5Config=true useKeyTab=true serviceName="kafka"
         keyTab="<basedir>/target/kafka.keytab" storeKey=true 
principal="kafka/localhost";
 };
 
 Client {
-        com.sun.security.auth.module.Krb5LoginModule required 
refreshKrb5Config=true useKeyTab=true
+        com.sun.security.auth.module.Krb5LoginModule required 
refreshKrb5Config=true useKeyTab=true serviceName="kafka"
         keyTab="<basedir>/target/kafka.keytab" storeKey=true 
principal="kafka/localhost";
 };
 
 KafkaClient {
-        com.sun.security.auth.module.Krb5LoginModule required 
refreshKrb5Config=true useKeyTab=true
+        com.sun.security.auth.module.Krb5LoginModule required 
refreshKrb5Config=true useKeyTab=true serviceName="kafka"
         keyTab="<basedir>/target/client.keytab" storeKey=true 
principal="client";
 };

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
----------------------------------------------------------------------
diff --git a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql 
b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
index 3f23b00..32cf6db 100644
--- a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
+++ b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql
@@ -1383,4 +1383,5 @@ INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10014',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10015',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10016',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
+INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10018',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('JAVA_PATCHES',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
----------------------------------------------------------------------
diff --git 
a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql 
b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
index bafdb96..2e577f3 100644
--- a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
+++ b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql
@@ -1358,5 +1358,6 @@ INSERT INTO x_db_version_h 
(id,version,inst_at,inst_by,updated_at,updated_by,act
 INSERT INTO x_db_version_h 
(id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
(X_DB_VERSION_H_SEQ.nextval,'J10014',sys_extract_utc(systimestamp),'Ranger 
1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h 
(id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
(X_DB_VERSION_H_SEQ.nextval,'J10015',sys_extract_utc(systimestamp),'Ranger 
1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h 
(id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
(X_DB_VERSION_H_SEQ.nextval,'J10016',sys_extract_utc(systimestamp),'Ranger 
1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
+INSERT INTO x_db_version_h 
(id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
(X_DB_VERSION_H_SEQ.nextval,'J10018',sys_extract_utc(systimestamp),'Ranger 
1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 INSERT INTO x_db_version_h 
(id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
(X_DB_VERSION_H_SEQ.nextval,'JAVA_PATCHES',sys_extract_utc(systimestamp),'Ranger
 1.0.0',sys_extract_utc(systimestamp),'localhost','Y');
 commit;

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
----------------------------------------------------------------------
diff --git 
a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql 
b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
index 2bc58ac..bad32ef 100644
--- a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
+++ b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql
@@ -1471,6 +1471,7 @@ INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10014',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10015',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10016',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
+INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10018',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('JAVA_PATCHES',current_timestamp,'Ranger 
1.0.0',current_timestamp,'localhost','Y');
 
 DROP VIEW IF EXISTS vx_trx_log;

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
----------------------------------------------------------------------
diff --git 
a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
 
b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
index 1b64eea..9482992 100644
--- 
a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
+++ 
b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql
@@ -1663,6 +1663,8 @@ INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active
 GO
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10016,CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 GO
+INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10018,CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
+GO
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('JAVA_PATCHES',CURRENT_TIMESTAMP,'Ranger 
1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 GO
 exit

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
----------------------------------------------------------------------
diff --git 
a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql 
b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
index 4a216fe..85f3285 100644
--- a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
+++ b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql
@@ -3145,6 +3145,7 @@ INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10014',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10015',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10016',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
+INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10018',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 INSERT INTO x_db_version_h 
(version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('JAVA_PATCHES',CURRENT_TIMESTAMP,'Ranger 
1.0.0',CURRENT_TIMESTAMP,'localhost','Y');
 GO
 CREATE VIEW [dbo].[vx_trx_log] AS

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java
 
b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java
new file mode 100644
index 0000000..0ef1544
--- /dev/null
+++ 
b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.patch;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.ranger.biz.RangerBizUtil;
+import org.apache.ranger.biz.ServiceDBStore;
+import org.apache.ranger.common.GUIDUtil;
+import org.apache.ranger.common.JSONUtil;
+import org.apache.ranger.common.RangerValidatorFactory;
+import org.apache.ranger.common.StringUtil;
+import org.apache.ranger.db.RangerDaoManager;
+import org.apache.ranger.entity.XXAccessTypeDef;
+import org.apache.ranger.entity.XXPolicy;
+import org.apache.ranger.entity.XXPolicyItem;
+import org.apache.ranger.entity.XXPolicyItemAccess;
+import org.apache.ranger.entity.XXPolicyItemUserPerm;
+import org.apache.ranger.entity.XXPolicyResource;
+import org.apache.ranger.entity.XXPolicyResourceMap;
+import org.apache.ranger.entity.XXPortalUser;
+import org.apache.ranger.entity.XXResourceDef;
+import org.apache.ranger.entity.XXService;
+import org.apache.ranger.entity.XXServiceDef;
+import org.apache.ranger.entity.XXUser;
+import org.apache.ranger.plugin.model.RangerPolicy;
+import org.apache.ranger.plugin.model.RangerPolicyResourceSignature;
+import org.apache.ranger.plugin.model.RangerServiceDef;
+import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator;
+import org.apache.ranger.plugin.model.validation.RangerValidator.Action;
+import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
+import org.apache.ranger.service.RangerPolicyService;
+import org.apache.ranger.service.XPermMapService;
+import org.apache.ranger.service.XPolicyService;
+import org.apache.ranger.util.CLIUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class PatchForKafkaServiceDefUpdate_J10025 extends BaseLoader {
+       private static final Logger logger = 
Logger.getLogger(PatchForKafkaServiceDefUpdate_J10025.class);
+       private static final List<String> POLICY_NAMES = new 
ArrayList<>(Arrays.asList("all - cluster", "all - delegationtoken"));
+       private static final String LOGIN_ID_ADMIN = "admin";
+       private static final String KAFKA_RESOURCE_CLUSTER = "cluster";
+       private static final String KAFKA_RESOURCE_DELEGATIONTOKEN = 
"delegationtoken";
+
+       private static final List<String> DEFAULT_POLICY_USERS = new 
ArrayList<>(Arrays.asList("kafka","rangerlookup"));
+
+
+       public static final String SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME  
= "kafka";
+       public static final String CLUSTER_RESOURCE_NAME ="cluster";
+
+
+       @Autowired
+       RangerDaoManager daoMgr;
+
+       @Autowired
+       ServiceDBStore svcDBStore;
+
+       @Autowired
+       JSONUtil jsonUtil;
+
+       @Autowired
+       RangerPolicyService policyService;
+
+       @Autowired
+       StringUtil stringUtil;
+
+       @Autowired
+       GUIDUtil guidUtil;
+
+       @Autowired
+       XPolicyService xPolService;
+
+       @Autowired
+       XPermMapService xPermMapService;
+
+       @Autowired
+       RangerBizUtil bizUtil;
+
+       @Autowired
+       RangerValidatorFactory validatorFactory;
+
+       @Autowired
+       ServiceDBStore svcStore;
+
+       public static void main(String[] args) {
+               logger.info("main()");
+               try {
+                       PatchForKafkaServiceDefUpdate_J10025 loader = 
(PatchForKafkaServiceDefUpdate_J10025) 
CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10025.class);
+                       loader.init();
+                       while (loader.isMoreToProcess()) {
+                               loader.load();
+                       }
+                       logger.info("Load complete. Exiting!!!");
+                       System.exit(0);
+               } catch (Exception e) {
+                       logger.error("Error loading", e);
+                       System.exit(1);
+               }
+       }
+
+       @Override
+       public void init() throws Exception {
+               // Do Nothing
+       }
+
+       @Override
+       public void execLoad() {
+               logger.info("==> 
PatchForKafkaServiceDefUpdate_J10025.execLoad()");
+               try {
+                       updateKafkaServiceDef();
+               } catch (Exception e) {
+                       logger.error("Error while applying 
PatchForKafkaServiceDefUpdate_J10025...", e);
+               }
+               logger.info("<== 
PatchForKafkaServiceDefUpdate_J10025.execLoad()");
+       }
+
+       @Override
+       public void printStats() {
+               logger.info("PatchForKafkaServiceDefUpdate_J10025 ");
+       }
+
+       private void updateKafkaServiceDef(){
+               RangerServiceDef ret                = null;
+               RangerServiceDef embeddedKafkaServiceDef = null;
+               RangerServiceDef dbKafkaServiceDef         = null;
+               List<RangerServiceDef.RangerResourceDef>   
embeddedKafkaResourceDefs  = null;
+               List<RangerServiceDef.RangerAccessTypeDef>     
embeddedKafkaAccessTypes   = null;
+               XXServiceDef xXServiceDefObj         = null;
+               try{
+                       
embeddedKafkaServiceDef=EmbeddedServiceDefsUtil.instance().getEmbeddedServiceDef(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+                       if(embeddedKafkaServiceDef!=null){
+
+                               xXServiceDefObj = 
daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+                               Map<String, String> 
serviceDefOptionsPreUpdate=null;
+                               String jsonStrPreUpdate=null;
+                               if(xXServiceDefObj!=null) {
+                                       
jsonStrPreUpdate=xXServiceDefObj.getDefOptions();
+                                       
serviceDefOptionsPreUpdate=jsonStringToMap(jsonStrPreUpdate);
+                                       xXServiceDefObj=null;
+                               }
+                               
dbKafkaServiceDef=svcDBStore.getServiceDefByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+
+                               if(dbKafkaServiceDef!=null){
+                                       embeddedKafkaResourceDefs = 
embeddedKafkaServiceDef.getResources();
+                                       embeddedKafkaAccessTypes  = 
embeddedKafkaServiceDef.getAccessTypes();
+
+                                       if 
(checkNewKafkaresourcePresent(embeddedKafkaResourceDefs)) {
+                                               // This is to check if CLUSTER 
resource is added to the resource definition, if so update the resource def and 
accessType def
+                                               if (embeddedKafkaResourceDefs 
!= null) {
+                                                       
dbKafkaServiceDef.setResources(embeddedKafkaResourceDefs);
+                                               }
+                                               if (embeddedKafkaAccessTypes != 
null) {
+                                                       
if(!embeddedKafkaAccessTypes.toString().equalsIgnoreCase(dbKafkaServiceDef.getAccessTypes().toString()))
 {
+                                                               
dbKafkaServiceDef.setAccessTypes(embeddedKafkaAccessTypes);
+                                                       }
+                                               }
+                                       }
+
+                                       RangerServiceDefValidator validator = 
validatorFactory.getServiceDefValidator(svcStore);
+                                       validator.validate(dbKafkaServiceDef, 
Action.UPDATE);
+
+                                       ret = 
svcStore.updateServiceDef(dbKafkaServiceDef);
+                                       if(ret==null){
+                                               logger.error("Error while 
updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def");
+                                               throw new 
RuntimeException("Error while updating 
"+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def");
+                                       }
+                                       xXServiceDefObj = 
daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+                                       if(xXServiceDefObj!=null) {
+                                               String 
jsonStrPostUpdate=xXServiceDefObj.getDefOptions();
+                                               Map<String, String> 
serviceDefOptionsPostUpdate=jsonStringToMap(jsonStrPostUpdate);
+                                               if (serviceDefOptionsPostUpdate 
!= null && 
serviceDefOptionsPostUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES))
 {
+                                                       
if(serviceDefOptionsPreUpdate == null || 
!serviceDefOptionsPreUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES))
 {
+                                                               String 
preUpdateValue = serviceDefOptionsPreUpdate == null ? null : 
serviceDefOptionsPreUpdate.get(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+                                                               if 
(preUpdateValue == null) {
+                                                                       
serviceDefOptionsPostUpdate.remove(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+                                                               } else {
+                                                                       
serviceDefOptionsPostUpdate.put(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES,
 preUpdateValue);
+                                                               }
+                                                               
xXServiceDefObj.setDefOptions(mapToJsonString(serviceDefOptionsPostUpdate));
+                                                               
daoMgr.getXXServiceDef().update(xXServiceDefObj);
+                                                       }
+                                               }
+                                               
createDefaultPolicyForNewResources();
+                                       }
+                               }
+                       }
+               }catch(Exception e)
+               {
+                       logger.error("Error while updating 
"+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def", e);
+               }
+       }
+
+       private boolean 
checkNewKafkaresourcePresent(List<RangerServiceDef.RangerResourceDef> 
resourceDefs) {
+               boolean ret = false;
+               for(RangerServiceDef.RangerResourceDef resourceDef : 
resourceDefs) {
+                       if (CLUSTER_RESOURCE_NAME.equals(resourceDef.getName()) 
) {
+                               ret = true ;
+                               break;
+                       }
+               }
+               return ret;
+       }
+
+       private String mapToJsonString(Map<String, String> map) {
+               String ret = null;
+               if(map != null) {
+                       try {
+                               ret = jsonUtil.readMapToString(map);
+                       } catch(Exception excp) {
+                               logger.warn("mapToJsonString() failed to 
convert map: " + map, excp);
+                       }
+               }
+               return ret;
+       }
+
+       protected Map<String, String> jsonStringToMap(String jsonStr) {
+               Map<String, String> ret = null;
+               if(!StringUtils.isEmpty(jsonStr)) {
+                       try {
+                               ret = jsonUtil.jsonToMap(jsonStr);
+                       } catch(Exception excp) {
+                               // fallback to earlier format: 
"name1=value1;name2=value2"
+                               for(String optionString : jsonStr.split(";")) {
+                                       if(StringUtils.isEmpty(optionString)) {
+                                               continue;
+                                       }
+                                       String[] nvArr = 
optionString.split("=");
+                                       String name  = (nvArr != null && 
nvArr.length > 0) ? nvArr[0].trim() : null;
+                                       String value = (nvArr != null && 
nvArr.length > 1) ? nvArr[1].trim() : null;
+                                       if(StringUtils.isEmpty(name)) {
+                                               continue;
+                                       }
+                                       if(ret == null) {
+                                               ret = new HashMap<String, 
String>();
+                                       }
+                                       ret.put(name, value);
+                               }
+                       }
+               }
+               return ret;
+       }
+
+       private void createDefaultPolicyForNewResources() {
+               logger.info("==> createDefaultPolicyForNewResources ");
+               XXPortalUser xxPortalUser = 
daoMgr.getXXPortalUser().findByLoginId(LOGIN_ID_ADMIN);
+               Long currentUserId = xxPortalUser.getId();
+
+               XXServiceDef xXServiceDefObj = daoMgr.getXXServiceDef()
+                               
.findByName(EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME);
+               if (xXServiceDefObj == null) {
+                       logger.debug("ServiceDef not fount with name :" + 
EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME);
+                       return;
+               }
+
+               Long xServiceDefId = xXServiceDefObj.getId();
+               List<XXService> xxServices = 
daoMgr.getXXService().findByServiceDefId(xServiceDefId);
+
+               for (XXService xxService : xxServices) {
+                       int resourceMapOrder = 0;
+                       for (String newResource : POLICY_NAMES) {
+                               XXPolicy xxPolicy = new XXPolicy();
+                               xxPolicy.setName(newResource);
+                               xxPolicy.setDescription(newResource);
+                               xxPolicy.setService(xxService.getId());
+                               
xxPolicy.setPolicyPriority(RangerPolicy.POLICY_PRIORITY_NORMAL);
+                               xxPolicy.setIsAuditEnabled(Boolean.TRUE);
+                               xxPolicy.setIsEnabled(Boolean.TRUE);
+                               
xxPolicy.setPolicyType(RangerPolicy.POLICY_TYPE_ACCESS);
+                               xxPolicy.setGuid(guidUtil.genGUID());
+                               xxPolicy.setAddedByUserId(currentUserId);
+                               xxPolicy.setUpdatedByUserId(currentUserId);
+                               RangerPolicy rangerPolicy = new RangerPolicy();
+                               RangerPolicyResourceSignature resourceSignature 
= new RangerPolicyResourceSignature(rangerPolicy);
+                               
xxPolicy.setResourceSignature(resourceSignature.getSignature());
+                               XXPolicy createdPolicy = 
daoMgr.getXXPolicy().create(xxPolicy);
+
+                               XXPolicyItem xxPolicyItem = new XXPolicyItem();
+                               xxPolicyItem.setIsEnabled(Boolean.TRUE);
+                               xxPolicyItem.setDelegateAdmin(Boolean.TRUE);
+                               xxPolicyItem.setItemType(0);
+                               xxPolicyItem.setOrder(0);
+                               xxPolicyItem.setAddedByUserId(currentUserId);
+                               xxPolicyItem.setUpdatedByUserId(currentUserId);
+                               xxPolicyItem.setPolicyId(createdPolicy.getId());
+                               XXPolicyItem createdXXPolicyItem = 
daoMgr.getXXPolicyItem().create(xxPolicyItem);
+
+                               List<String> accessTypes = 
Arrays.asList("create", "delete", "configure", "alter_configs", "describe", 
"describe_configs", "consume", "publish", "kafka_admin","idempotent_write");
+                               for (int i = 0; i < accessTypes.size(); i++) {
+                                       XXAccessTypeDef xAccTypeDef = 
daoMgr.getXXAccessTypeDef().findByNameAndServiceId(accessTypes.get(i),
+                                                       xxPolicy.getService());
+                                       if (xAccTypeDef == null) {
+                                               throw new 
RuntimeException(accessTypes.get(i) + ": is not a valid access-type. policy='"
+                                                               + 
xxPolicy.getName() + "' service='" + xxPolicy.getService() + "'");
+                                       }
+                                       XXPolicyItemAccess xPolItemAcc = new 
XXPolicyItemAccess();
+                                       xPolItemAcc.setIsAllowed(Boolean.TRUE);
+                                       
xPolItemAcc.setType(xAccTypeDef.getId());
+                                       xPolItemAcc.setOrder(i);
+                                       
xPolItemAcc.setAddedByUserId(currentUserId);
+                                       
xPolItemAcc.setUpdatedByUserId(currentUserId);
+                                       
xPolItemAcc.setPolicyitemid(createdXXPolicyItem.getId());
+                                       
daoMgr.getXXPolicyItemAccess().create(xPolItemAcc);
+                               }
+
+                               for (int i = 0; i < 
DEFAULT_POLICY_USERS.size(); i++) {
+                                       String user = 
DEFAULT_POLICY_USERS.get(i);
+                                       if (StringUtils.isBlank(user)) {
+                                               continue;
+                                       }
+                                       XXUser xxUser = 
daoMgr.getXXUser().findByUserName(user);
+                                       if (xxUser == null) {
+                                               throw new RuntimeException(user 
+ ": user does not exist. policy='" + xxPolicy.getName()
+                                                               + "' service='" 
+ xxPolicy.getService() + "' user='" + user + "'");
+                                       }
+                                       XXPolicyItemUserPerm xUserPerm = new 
XXPolicyItemUserPerm();
+                                       xUserPerm.setUserId(xxUser.getId());
+                                       
xUserPerm.setPolicyItemId(createdXXPolicyItem.getId());
+                                       xUserPerm.setOrder(i);
+                                       
xUserPerm.setAddedByUserId(currentUserId);
+                                       
xUserPerm.setUpdatedByUserId(currentUserId);
+                                       
daoMgr.getXXPolicyItemUserPerm().create(xUserPerm);
+                               }
+
+                               String policyResourceName = 
KAFKA_RESOURCE_CLUSTER;
+                               if ("all - 
delegationtoken".equals(newResource)) {
+                                       policyResourceName = 
KAFKA_RESOURCE_DELEGATIONTOKEN;
+                               }
+                               XXResourceDef xResDef = 
daoMgr.getXXResourceDef().findByNameAndPolicyId(policyResourceName,
+                                               createdPolicy.getId());
+                               if (xResDef == null) {
+                                       throw new 
RuntimeException(policyResourceName + ": is not a valid resource-type. policy='"
+                                                       + 
createdPolicy.getName() + "' service='" + createdPolicy.getService() + "'");
+                               }
+
+                               XXPolicyResource xPolRes = new 
XXPolicyResource();
+
+                               xPolRes.setAddedByUserId(currentUserId);
+                               xPolRes.setUpdatedByUserId(currentUserId);
+                               xPolRes.setIsExcludes(Boolean.FALSE);
+                               xPolRes.setIsRecursive(Boolean.FALSE);
+                               xPolRes.setPolicyId(createdPolicy.getId());
+                               xPolRes.setResDefId(xResDef.getId());
+                               xPolRes = 
daoMgr.getXXPolicyResource().create(xPolRes);
+
+                               XXPolicyResourceMap xPolResMap = new 
XXPolicyResourceMap();
+                               xPolResMap.setResourceId(xPolRes.getId());
+                               xPolResMap.setValue("*");
+                               xPolResMap.setOrder(resourceMapOrder);
+                               xPolResMap.setAddedByUserId(currentUserId);
+                               xPolResMap.setUpdatedByUserId(currentUserId);
+                               
daoMgr.getXXPolicyResourceMap().create(xPolResMap);
+                               resourceMapOrder++;
+                               logger.info("Creating policy for service id : " 
+ xxService.getId());
+                       }
+               }
+               logger.info("<== createDefaultPolicyForNewResources ");
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/src/main/assembly/plugin-kafka.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/plugin-kafka.xml 
b/src/main/assembly/plugin-kafka.xml
index 97ff8ad..7c55128 100644
--- a/src/main/assembly/plugin-kafka.xml
+++ b/src/main/assembly/plugin-kafka.xml
@@ -62,7 +62,6 @@
                                                        </include>
                                                        
<include>commons-lang:commons-lang</include>
                                                        
<include>commons-io:commons-io</include>
-                                                       
<include>com.google.guava:guava:jar:${google.guava.version}</include>
                                                        
<include>org.apache.httpcomponents:httpclient:jar:${httpcomponents.httpclient.version}
                                                        </include>
                                                        
<include>org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version}

Reply via email to