Repository: ranger Updated Branches: refs/heads/master f8aff4e14 -> b6d173023
RANGER-2222:Apache RangerKafkaPlugin support to handle Kafka Cluster as a new resource Signed-off-by: rmani <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/b6d17302 Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/b6d17302 Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/b6d17302 Branch: refs/heads/master Commit: b6d1730238f65a7d724d307e7552d6e06ebbb9b9 Parents: f8aff4e Author: rmani <[email protected]> Authored: Mon Oct 8 12:09:34 2018 -0700 Committer: rmani <[email protected]> Committed: Mon Oct 8 17:03:18 2018 -0700 ---------------------------------------------------------------------- .../service-defs/ranger-servicedef-kafka.json | 49 ++- .../authorizer/RangerKafkaAuditHandler.java | 74 ++++ .../kafka/authorizer/RangerKafkaAuthorizer.java | 16 +- .../KafkaRangerAuthorizerGSSTest.java | 1 - .../KafkaRangerTopicCreationTest.java | 191 ++++++++++ .../src/test/resources/kafka-policies.json | 198 +++++++++- .../src/test/resources/kafka_kerberos.jaas | 8 +- .../optimized/current/ranger_core_db_mysql.sql | 1 + .../optimized/current/ranger_core_db_oracle.sql | 1 + .../current/ranger_core_db_postgres.sql | 1 + .../current/ranger_core_db_sqlanywhere.sql | 2 + .../current/ranger_core_db_sqlserver.sql | 1 + .../PatchForKafkaServiceDefUpdate_J10025.java | 381 +++++++++++++++++++ src/main/assembly/plugin-kafka.xml | 1 - 14 files changed, 900 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json ---------------------------------------------------------------------- diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json index ca3e0fe..7e91aab 100644 --- 
a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json +++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json @@ -23,13 +23,15 @@ "validationMessage":"", "uiHint":"", "label":"Topic", - "description":"Topic" + "description":"Topic", + "accessTypeRestrictions": ["create", "delete", "configure", "alter_configs", "describe", "describe_configs", "consume", "publish", "kafka_admin"] }, { "itemId":2, "name":"transactionalid", "type":"string", "level":1, + "mandatory":true, "excludesSupported":true, "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher", "matcherOptions":{ @@ -37,9 +39,41 @@ "ignoreCase":true }, "label":"Transactional Id", - "description":"Transactional Id" + "description":"Transactional Id", + "accessTypeRestrictions": ["publish", "describe"] + }, + { + "itemId":3, + "name":"cluster", + "type":"string", + "level":1, + "mandatory":true, + "excludesSupported":true, + "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher", + "matcherOptions":{ + "wildCard":true, + "ignoreCase":true + }, + "label":"Cluster", + "description":"Cluster", + "accessTypeRestrictions": ["configure", "alter_configs", "describe", "describe_configs", "kafka_admin", "idempotent_write"] + }, + { + "itemId":4, + "name":"delegationtoken", + "type":"string", + "level":1, + "mandatory":true, + "excludesSupported":true, + "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher", + "matcherOptions":{ + "wildCard":true, + "ignoreCase":true + }, + "label":"Delegation Token", + "description":"Delegation Token", + "accessTypeRestrictions": ["describe"] } - ], "accessTypes":[ { @@ -49,7 +83,6 @@ "impliedGrants":[ "describe" ] - }, { "itemId":2, @@ -58,7 +91,6 @@ "impliedGrants":[ "describe" ] - }, { "itemId":5, @@ -67,7 +99,6 @@ "impliedGrants":[ "describe" ] - }, { "itemId":6, @@ -99,7 +130,6 @@ "create", "delete" ] - }, { "itemId":10, @@ -150,13 +180,10 @@ 
"mandatory":false, "label":"Ranger Plugin SSL CName" } - ], "enums":[ - ], "contextEnrichers":[ - ], "policyConditions":[ { @@ -164,7 +191,6 @@ "name":"ip-range", "evaluator":"org.apache.ranger.plugin.conditionevaluator.RangerIpMatcher", "evaluatorOptions":{ - }, "validationRegEx":"", "validationMessage":"", @@ -172,6 +198,5 @@ "label":"IP Address Range", "description":"IP Address Range" } - ] } http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java new file mode 100644 index 0000000..ee50e95 --- /dev/null +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +package org.apache.ranger.authorization.kafka.authorizer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler; +import org.apache.ranger.plugin.policyengine.RangerAccessRequest; +import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl; +import org.apache.ranger.plugin.policyengine.RangerAccessResult; + +public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler { + private static final Log LOG = LogFactory.getLog(RangerKafkaAuditHandler.class); + + private AuthzAuditEvent auditEvent = null; + + public RangerKafkaAuditHandler(){ + } + + @Override + public void processResult(RangerAccessResult result) { + // If Cluster Resource Level Topic Creation is not Allowed we don't audit. + // Subsequent call from Kafka for Topic Creation at Topic resource Level will be audited. + if (!isAuditingNeeded(result)) { + return; + } + auditEvent = super.getAuthzEvents(result); + } + + private boolean isAuditingNeeded(final RangerAccessResult result) { + boolean ret = true; + boolean isAllowed = result.getIsAllowed(); + RangerAccessRequest request = result.getAccessRequest(); + RangerAccessResourceImpl resource = (RangerAccessResourceImpl) request.getResource(); + String resourceName = (String) resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER); + if (resourceName != null) { + if (request.getAccessType().equalsIgnoreCase(RangerKafkaAuthorizer.ACCESS_TYPE_CREATE) && !isAllowed) { + ret = false; + } + } + return ret; + } + + public void flushAudit() { + if(LOG.isDebugEnabled()) { + LOG.info("==> RangerYarnAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent + ")"); + } + if (auditEvent != null) { + super.logAuthzAudit(auditEvent); + } + if(LOG.isDebugEnabled()) { + LOG.info("<== RangerYarnAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent + ")"); + } + } +} 
http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index eab869a..8a661d8 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -40,7 +40,6 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.authenticator.LoginManager; import org.apache.kafka.common.security.kerberos.KerberosLogin; import org.apache.ranger.audit.provider.MiscUtil; -import org.apache.ranger.plugin.audit.RangerDefaultAuditHandler; import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl; import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl; import org.apache.ranger.plugin.policyengine.RangerAccessResult; @@ -59,6 +58,7 @@ public class RangerKafkaAuthorizer implements Authorizer { public static final String KEY_CLUSTER = "cluster"; public static final String KEY_CONSUMER_GROUP = "consumer_group"; public static final String KEY_TRANSACTIONALID = "transactionalid"; + public static final String KEY_DELEGATIONTOKEN = "delegationtoken"; public static final String ACCESS_TYPE_READ = "consume"; public static final String ACCESS_TYPE_WRITE = "publish"; @@ -72,6 +72,7 @@ public class RangerKafkaAuthorizer implements Authorizer { public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write"; private static volatile RangerBasePlugin rangerPlugin = null; + RangerKafkaAuditHandler auditHandler = null; public RangerKafkaAuthorizer() { } @@ -115,7 +116,7 @@ public 
class RangerKafkaAuthorizer implements Authorizer { } logger.info("Calling plugin.init()"); rangerPlugin.init(); - RangerDefaultAuditHandler auditHandler = new RangerDefaultAuditHandler(); + auditHandler = new RangerKafkaAuditHandler(); rangerPlugin.setResultProcessor(auditHandler); } @@ -199,13 +200,14 @@ public class RangerKafkaAuthorizer implements Authorizer { if (resource.resourceType().equals(Topic$.MODULE$)) { rangerResource.setValue(KEY_TOPIC, resource.name()); - } else if (resource.resourceType().equals(Cluster$.MODULE$)) { //NOPMD - // CLUSTER should go as null - // rangerResource.setValue(KEY_CLUSTER, resource.name()); + } else if (resource.resourceType().equals(Cluster$.MODULE$)) { + rangerResource.setValue(KEY_CLUSTER, resource.name()); } else if (resource.resourceType().equals(Group$.MODULE$)) { rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name()); } else if (resource.resourceType().equals(TransactionalId$.MODULE$)) { - rangerResource.setValue(KEY_TRANSACTIONALID,resource.name()); + rangerResource.setValue(KEY_TRANSACTIONALID, resource.name()); + } else if (resource.resourceType().equals(DelegationToken$.MODULE$)) { + rangerResource.setValue(KEY_DELEGATIONTOKEN, resource.name()); } else { logger.fatal("Unsupported resourceType=" + resource.resourceType()); validationFailed = true; @@ -228,6 +230,8 @@ public class RangerKafkaAuthorizer implements Authorizer { } catch (Throwable t) { logger.error("Error while calling isAccessAllowed(). 
request=" + rangerRequest, t); + } finally { + auditHandler.flushAudit(); } } RangerPerfTracer.log(perf); http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java index c1386fe..43e88b5 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java @@ -308,7 +308,6 @@ public class KafkaRangerAuthorizerGSSTest { producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue")); producer.flush(); record.get(); - Assert.fail("Authorization failure expected"); } catch (Exception ex) { Assert.assertTrue(ex.getMessage().contains("Not authorized to access topics")); } http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java new file mode 100644 index 0000000..a12817e --- /dev/null +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.authorization.kafka.authorizer; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.ServerSocket; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +public class KafkaRangerTopicCreationTest { + private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class); + + private static KafkaServerStartable 
kafkaServer; + private static TestingServer zkServer; + private static int port; + private static Path tempDir; + private static SimpleKdcServer kerbyServer; + + @org.junit.BeforeClass + public static void setup() throws Exception { + String basedir = System.getProperty("basedir"); + if (basedir == null) { + basedir = new File(".").getCanonicalPath(); + } + System.out.println("Base Dir " + basedir); + + configureKerby(basedir); + + // JAAS Config file - We need to point to the correct keytab files + Path path = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas"); + String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8); + content = content.replaceAll("<basedir>", basedir); + //content = content.replaceAll("zookeeper/localhost", "zookeeper/" + address); + + Path path2 = FileSystems.getDefault().getPath(basedir, "/target/test-classes/kafka_kerberos.jaas"); + Files.write(path2, content.getBytes(StandardCharsets.UTF_8)); + + System.setProperty("java.security.auth.login.config", path2.toString()); + + // Set up Zookeeper to require SASL + Map<String,Object> zookeeperProperties = new HashMap<>(); + zookeeperProperties.put("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); + zookeeperProperties.put("requireClientAuthScheme", "sasl"); + zookeeperProperties.put("jaasLoginRenew", "3600000"); + + InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1,-1, -1, zookeeperProperties, "localhost"); + + zkServer = new TestingServer(instanceSpec, true); + + // Get a random port + ServerSocket serverSocket = new ServerSocket(0); + port = serverSocket.getLocalPort(); + serverSocket.close(); + + tempDir = Files.createTempDirectory("kafka"); + + LOG.info("Port is {}", port); + LOG.info("Temporary directory is at {}", tempDir); + + final Properties props = new Properties(); + props.put("broker.id", 1); + props.put("host.name", "localhost"); + props.put("port", port); + 
props.put("log.dir", tempDir.toString()); + props.put("zookeeper.connect", zkServer.getConnectString()); + props.put("replica.socket.timeout.ms", "1500"); + props.put("controlled.shutdown.enable", Boolean.TRUE.toString()); + // Enable SASL_PLAINTEXT + props.put("listeners", "SASL_PLAINTEXT://localhost:" + port); + props.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); + props.put("sasl.enabled.mechanisms", "GSSAPI"); + props.put("sasl.mechanism.inter.broker.protocol", "GSSAPI"); + props.put("sasl.kerberos.service.name", "kafka"); + props.put("offsets.topic.replication.factor", (short) 1); + props.put("offsets.topic.num.partitions", 1); + + // Plug in Apache Ranger authorizer + props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"); + + // Create users for testing + UserGroupInformation.createUserForTesting("[email protected]", new String[] {"public"}); + UserGroupInformation.createUserForTesting("kafka/[email protected]", new String[] {"IT"}); + + KafkaConfig config = new KafkaConfig(props); + kafkaServer = new KafkaServerStartable(config); + kafkaServer.startup(); + } + + private static void configureKerby(String baseDir) throws Exception { + + //System.setProperty("sun.security.krb5.debug", "true"); + System.setProperty("java.security.krb5.conf", baseDir + "/target/krb5.conf"); + + kerbyServer = new SimpleKdcServer(); + + kerbyServer.setKdcRealm("kafka.apache.org"); + kerbyServer.setAllowUdp(false); + kerbyServer.setWorkDir(new File(baseDir + "/target")); + + kerbyServer.init(); + + // Create principals + String zookeeper = "zookeeper/[email protected]"; + String kafka = "kafka/[email protected]"; + String client = "[email protected]"; + + kerbyServer.createPrincipal(zookeeper, "zookeeper"); + File keytabFile = new File(baseDir + "/target/zookeeper.keytab"); + kerbyServer.exportPrincipal(zookeeper, keytabFile); + + kerbyServer.createPrincipal(kafka, "kafka"); + keytabFile = new File(baseDir + 
"/target/kafka.keytab"); + kerbyServer.exportPrincipal(kafka, keytabFile); + + kerbyServer.createPrincipal(client, "client"); + keytabFile = new File(baseDir + "/target/client.keytab"); + kerbyServer.exportPrincipal(client, keytabFile); + + kerbyServer.start(); + } + + @org.junit.AfterClass + public static void cleanup() throws Exception { + if (kafkaServer != null) { + kafkaServer.shutdown(); + } + if (zkServer != null) { + zkServer.stop(); + } + if (kerbyServer != null) { + kerbyServer.stop(); + } + } + + @Test + public void testCreateTopic() throws Exception { + final String topic = "test"; + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + port); + properties.put("client.id", "test-consumer-id"); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + AdminClient client = KafkaAdminClient.create(properties); + CreateTopicsResult result = client.createTopics(Arrays.asList(new NewTopic(topic, 1, (short) 1))); + result.values().get(topic).get(); + for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) { + System.out.println("Create Topic : " + entry.getKey() + " " + + "isCancelled : " + entry.getValue().isCancelled() + " " + + "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " + + "isDone : " + entry.getValue().isDone()); + } + } +} http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/test/resources/kafka-policies.json ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/resources/kafka-policies.json b/plugin-kafka/src/test/resources/kafka-policies.json index 0c07604..e4f5db1 100644 --- a/plugin-kafka/src/test/resources/kafka-policies.json +++ b/plugin-kafka/src/test/resources/kafka-policies.json @@ -6,6 +6,84 @@ "policies": [ { "service": "cl1_kafka", + "name": "all - cluster", + "policyType": 0, + "description": "Policy for all - 
cluster", + "isAuditEnabled": true, + "resources": { + "cluster": { + "values": [ + "*" + ], + "isExcludes": false, + "isRecursive": false + } + }, + "policyItems": [ + { + "accesses": [ + { + "type": "publish", + "isAllowed": true + }, + { + "type": "consume", + "isAllowed": true + }, + { + "type": "configure", + "isAllowed": true + }, + { + "type": "describe", + "isAllowed": true + }, + { + "type": "create", + "isAllowed": true + }, + { + "type": "delete", + "isAllowed": true + }, + { + "type": "kafka_admin", + "isAllowed": true + }, + { + "type": "idempotent_write", + "isAllowed": true + }, + { + "type": "describe_configs", + "isAllowed": true + }, + { + "type": "alter_configs", + "isAllowed": true + } + ], + "users": [ + "admin","kafka" + ], + "groups": [ + "IT" + ], + "conditions": [], + "delegateAdmin": true + } + ], + "denyPolicyItems": [], + "allowExceptions": [], + "denyExceptions": [], + "dataMaskPolicyItems": [], + "rowFilterPolicyItems": [], + "id": 40, + "isEnabled": true, + "version": 2 + }, + { + "service": "cl1_kafka", "name": "all - topic", "policyType": 0, "description": "Policy for all - topic", @@ -64,7 +142,7 @@ } ], "users": [ - "admin","kafka" + "admin","kafka", "client" ], "groups": [ "IT" @@ -243,6 +321,84 @@ "id": 30, "isEnabled": true, "version": 1 + }, + { + "service": "cl1_kafka", + "name": "DelegationToken Policy", + "policyType": 0, + "description": "DelegationTokenPolicy", + "isAuditEnabled": true, + "resources": { + "delegationtoken": { + "values": [ + "*" + ], + "isExcludes": false, + "isRecursive": false + } + }, + "policyItems": [ + { + "accesses": [ + { + "type": "publish", + "isAllowed": true + }, + { + "type": "consume", + "isAllowed": true + }, + { + "type": "configure", + "isAllowed": true + }, + { + "type": "describe", + "isAllowed": true + }, + { + "type": "create", + "isAllowed": true + }, + { + "type": "delete", + "isAllowed": true + }, + { + "type": "kafka_admin", + "isAllowed": true + }, + { + "type": 
"idempotent_write", + "isAllowed": true + }, + { + "type": "describe_configs", + "isAllowed": true + }, + { + "type": "alter_configs", + "isAllowed": true + } + ], + "users": [ + "admin","kafka", "client" + ], + "groups": [ + "IT" + ], + "conditions": [], + "delegateAdmin": true + } + ], + "denyPolicyItems": [], + "allowExceptions": [], + "denyExceptions": [], + "dataMaskPolicyItems": [], + "rowFilterPolicyItems": [], + "id": 31, + "isEnabled": true, + "version": 2 } ], "serviceDef": { @@ -322,6 +478,46 @@ "uiHint":"", "label":"Transactional Id", "description":"Transactional Id" + }, + { + "itemId":3, + "name":"cluster", + "type":"string", + "level":1, + "mandatory":true, + "lookupSupported":false, + "recursiveSupported":false, + "excludesSupported":true, + "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher", + "matcherOptions":{ + "wildCard":true, + "ignoreCase":true + }, + "validationRegEx":"", + "validationMessage":"", + "uiHint":"", + "label":"Cluster", + "description":"Cluster" + }, + { + "itemId":4, + "name":"delegationtoken", + "type":"string", + "level":1, + "mandatory":true, + "lookupSupported":false, + "recursiveSupported":false, + "excludesSupported":true, + "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher", + "matcherOptions":{ + "wildCard":true, + "ignoreCase":true + }, + "validationRegEx":"", + "validationMessage":"", + "uiHint":"", + "label":"Delegation Token", + "description":"Delegation Token" } ], "accessTypes": [ http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/plugin-kafka/src/test/resources/kafka_kerberos.jaas ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/resources/kafka_kerberos.jaas b/plugin-kafka/src/test/resources/kafka_kerberos.jaas index 1de804b..2e83c7c 100644 --- a/plugin-kafka/src/test/resources/kafka_kerberos.jaas +++ b/plugin-kafka/src/test/resources/kafka_kerberos.jaas @@ -1,20 +1,20 @@ Server { - 
com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true + com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka" keyTab="<basedir>/target/zookeeper.keytab" storeKey=true principal="zookeeper/localhost"; }; KafkaServer { - com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true + com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka" keyTab="<basedir>/target/kafka.keytab" storeKey=true principal="kafka/localhost"; }; Client { - com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true + com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka" keyTab="<basedir>/target/kafka.keytab" storeKey=true principal="kafka/localhost"; }; KafkaClient { - com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true + com.sun.security.auth.module.Krb5LoginModule required refreshKrb5Config=true useKeyTab=true serviceName="kafka" keyTab="<basedir>/target/client.keytab" storeKey=true principal="client"; }; http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql ---------------------------------------------------------------------- diff --git a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql index 3f23b00..32cf6db 100644 --- a/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql +++ b/security-admin/db/mysql/optimized/current/ranger_core_db_mysql.sql @@ -1383,4 +1383,5 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10014',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y'); 
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10015',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y'); INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10016',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y'); +INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y'); INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y'); http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql ---------------------------------------------------------------------- diff --git a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql index bafdb96..2e577f3 100644 --- a/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql +++ b/security-admin/db/oracle/optimized/current/ranger_core_db_oracle.sql @@ -1358,5 +1358,6 @@ INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,act INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10014',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y'); INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10015',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y'); INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10016',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y'); +INSERT INTO x_db_version_h 
(id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'J10018',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y'); INSERT INTO x_db_version_h (id,version,inst_at,inst_by,updated_at,updated_by,active) VALUES (X_DB_VERSION_H_SEQ.nextval,'JAVA_PATCHES',sys_extract_utc(systimestamp),'Ranger 1.0.0',sys_extract_utc(systimestamp),'localhost','Y'); commit; http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql ---------------------------------------------------------------------- diff --git a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql index 2bc58ac..bad32ef 100644 --- a/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql +++ b/security-admin/db/postgres/optimized/current/ranger_core_db_postgres.sql @@ -1471,6 +1471,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10014',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y'); INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10015',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y'); INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10016',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y'); +INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y'); INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',current_timestamp,'Ranger 1.0.0',current_timestamp,'localhost','Y'); DROP VIEW IF EXISTS vx_trx_log; 
http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql ---------------------------------------------------------------------- diff --git a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql index 1b64eea..9482992 100644 --- a/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql +++ b/security-admin/db/sqlanywhere/optimized/current/ranger_core_db_sqlanywhere.sql @@ -1663,6 +1663,8 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active GO INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10016,CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y'); GO +INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y'); +GO INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y'); GO exit http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql ---------------------------------------------------------------------- diff --git a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql index 4a216fe..85f3285 100644 --- a/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql +++ b/security-admin/db/sqlserver/optimized/current/ranger_core_db_sqlserver.sql @@ -3145,6 +3145,7 @@ INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES 
('J10014',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y'); INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10015',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y'); INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10016',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y'); +INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10018',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y'); INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('JAVA_PATCHES',CURRENT_TIMESTAMP,'Ranger 1.0.0',CURRENT_TIMESTAMP,'localhost','Y'); GO CREATE VIEW [dbo].[vx_trx_log] AS http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java ---------------------------------------------------------------------- diff --git a/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java new file mode 100644 index 0000000..0ef1544 --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10025.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.patch; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.ranger.biz.RangerBizUtil; +import org.apache.ranger.biz.ServiceDBStore; +import org.apache.ranger.common.GUIDUtil; +import org.apache.ranger.common.JSONUtil; +import org.apache.ranger.common.RangerValidatorFactory; +import org.apache.ranger.common.StringUtil; +import org.apache.ranger.db.RangerDaoManager; +import org.apache.ranger.entity.XXAccessTypeDef; +import org.apache.ranger.entity.XXPolicy; +import org.apache.ranger.entity.XXPolicyItem; +import org.apache.ranger.entity.XXPolicyItemAccess; +import org.apache.ranger.entity.XXPolicyItemUserPerm; +import org.apache.ranger.entity.XXPolicyResource; +import org.apache.ranger.entity.XXPolicyResourceMap; +import org.apache.ranger.entity.XXPortalUser; +import org.apache.ranger.entity.XXResourceDef; +import org.apache.ranger.entity.XXService; +import org.apache.ranger.entity.XXServiceDef; +import org.apache.ranger.entity.XXUser; +import org.apache.ranger.plugin.model.RangerPolicy; +import org.apache.ranger.plugin.model.RangerPolicyResourceSignature; +import org.apache.ranger.plugin.model.RangerServiceDef; +import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator; +import org.apache.ranger.plugin.model.validation.RangerValidator.Action; +import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil; +import org.apache.ranger.service.RangerPolicyService; +import org.apache.ranger.service.XPermMapService; +import org.apache.ranger.service.XPolicyService; 
+import org.apache.ranger.util.CLIUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +public class PatchForKafkaServiceDefUpdate_J10025 extends BaseLoader { + private static final Logger logger = Logger.getLogger(PatchForKafkaServiceDefUpdate_J10025.class); + private static final List<String> POLICY_NAMES = new ArrayList<>(Arrays.asList("all - cluster", "all - delegationtoken")); + private static final String LOGIN_ID_ADMIN = "admin"; + private static final String KAFKA_RESOURCE_CLUSTER = "cluster"; + private static final String KAFKA_RESOURCE_DELEGATIONTOKEN = "delegationtoken"; + + private static final List<String> DEFAULT_POLICY_USERS = new ArrayList<>(Arrays.asList("kafka","rangerlookup")); + + + public static final String SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME = "kafka"; + public static final String CLUSTER_RESOURCE_NAME ="cluster"; + + + @Autowired + RangerDaoManager daoMgr; + + @Autowired + ServiceDBStore svcDBStore; + + @Autowired + JSONUtil jsonUtil; + + @Autowired + RangerPolicyService policyService; + + @Autowired + StringUtil stringUtil; + + @Autowired + GUIDUtil guidUtil; + + @Autowired + XPolicyService xPolService; + + @Autowired + XPermMapService xPermMapService; + + @Autowired + RangerBizUtil bizUtil; + + @Autowired + RangerValidatorFactory validatorFactory; + + @Autowired + ServiceDBStore svcStore; + + public static void main(String[] args) { + logger.info("main()"); + try { + PatchForKafkaServiceDefUpdate_J10025 loader = (PatchForKafkaServiceDefUpdate_J10025) CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10025.class); + loader.init(); + while (loader.isMoreToProcess()) { + loader.load(); + } + logger.info("Load complete. 
Exiting!!!"); + System.exit(0); + } catch (Exception e) { + logger.error("Error loading", e); + System.exit(1); + } + } + + @Override + public void init() throws Exception { + // Do Nothing + } + + @Override + public void execLoad() { + logger.info("==> PatchForKafkaServiceDefUpdate_J10025.execLoad()"); + try { + updateKafkaServiceDef(); + } catch (Exception e) { + logger.error("Error while applying PatchForKafkaServiceDefUpdate_J10025...", e); + } + logger.info("<== PatchForKafkaServiceDefUpdate_J10025.execLoad()"); + } + + @Override + public void printStats() { + logger.info("PatchForKafkaServiceDefUpdate_J10025 "); + } + + private void updateKafkaServiceDef(){ + RangerServiceDef ret = null; + RangerServiceDef embeddedKafkaServiceDef = null; + RangerServiceDef dbKafkaServiceDef = null; + List<RangerServiceDef.RangerResourceDef> embeddedKafkaResourceDefs = null; + List<RangerServiceDef.RangerAccessTypeDef> embeddedKafkaAccessTypes = null; + XXServiceDef xXServiceDefObj = null; + try{ + embeddedKafkaServiceDef=EmbeddedServiceDefsUtil.instance().getEmbeddedServiceDef(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME); + if(embeddedKafkaServiceDef!=null){ + + xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME); + Map<String, String> serviceDefOptionsPreUpdate=null; + String jsonStrPreUpdate=null; + if(xXServiceDefObj!=null) { + jsonStrPreUpdate=xXServiceDefObj.getDefOptions(); + serviceDefOptionsPreUpdate=jsonStringToMap(jsonStrPreUpdate); + xXServiceDefObj=null; + } + dbKafkaServiceDef=svcDBStore.getServiceDefByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME); + + if(dbKafkaServiceDef!=null){ + embeddedKafkaResourceDefs = embeddedKafkaServiceDef.getResources(); + embeddedKafkaAccessTypes = embeddedKafkaServiceDef.getAccessTypes(); + + if (checkNewKafkaresourcePresent(embeddedKafkaResourceDefs)) { + // This is to check if CLUSTER resource is added to the resource definition, if so update the resource def and accessType def + if 
(embeddedKafkaResourceDefs != null) { + dbKafkaServiceDef.setResources(embeddedKafkaResourceDefs); + } + if (embeddedKafkaAccessTypes != null) { + if(!embeddedKafkaAccessTypes.toString().equalsIgnoreCase(dbKafkaServiceDef.getAccessTypes().toString())) { + dbKafkaServiceDef.setAccessTypes(embeddedKafkaAccessTypes); + } + } + } + + RangerServiceDefValidator validator = validatorFactory.getServiceDefValidator(svcStore); + validator.validate(dbKafkaServiceDef, Action.UPDATE); + + ret = svcStore.updateServiceDef(dbKafkaServiceDef); + if(ret==null){ + logger.error("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def"); + throw new RuntimeException("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def"); + } + xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME); + if(xXServiceDefObj!=null) { + String jsonStrPostUpdate=xXServiceDefObj.getDefOptions(); + Map<String, String> serviceDefOptionsPostUpdate=jsonStringToMap(jsonStrPostUpdate); + if (serviceDefOptionsPostUpdate != null && serviceDefOptionsPostUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) { + if(serviceDefOptionsPreUpdate == null || !serviceDefOptionsPreUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) { + String preUpdateValue = serviceDefOptionsPreUpdate == null ? 
null : serviceDefOptionsPreUpdate.get(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES); + if (preUpdateValue == null) { + serviceDefOptionsPostUpdate.remove(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES); + } else { + serviceDefOptionsPostUpdate.put(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES, preUpdateValue); + } + xXServiceDefObj.setDefOptions(mapToJsonString(serviceDefOptionsPostUpdate)); + daoMgr.getXXServiceDef().update(xXServiceDefObj); + } + } + createDefaultPolicyForNewResources(); + } + } + } + }catch(Exception e) + { + logger.error("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def", e); + } + } + + private boolean checkNewKafkaresourcePresent(List<RangerServiceDef.RangerResourceDef> resourceDefs) { + boolean ret = false; + for(RangerServiceDef.RangerResourceDef resourceDef : resourceDefs) { + if (CLUSTER_RESOURCE_NAME.equals(resourceDef.getName()) ) { + ret = true ; + break; + } + } + return ret; + } + + private String mapToJsonString(Map<String, String> map) { + String ret = null; + if(map != null) { + try { + ret = jsonUtil.readMapToString(map); + } catch(Exception excp) { + logger.warn("mapToJsonString() failed to convert map: " + map, excp); + } + } + return ret; + } + + protected Map<String, String> jsonStringToMap(String jsonStr) { + Map<String, String> ret = null; + if(!StringUtils.isEmpty(jsonStr)) { + try { + ret = jsonUtil.jsonToMap(jsonStr); + } catch(Exception excp) { + // fallback to earlier format: "name1=value1;name2=value2" + for(String optionString : jsonStr.split(";")) { + if(StringUtils.isEmpty(optionString)) { + continue; + } + String[] nvArr = optionString.split("="); + String name = (nvArr != null && nvArr.length > 0) ? nvArr[0].trim() : null; + String value = (nvArr != null && nvArr.length > 1) ? 
nvArr[1].trim() : null; + if(StringUtils.isEmpty(name)) { + continue; + } + if(ret == null) { + ret = new HashMap<String, String>(); + } + ret.put(name, value); + } + } + } + return ret; + } + + private void createDefaultPolicyForNewResources() { + logger.info("==> createDefaultPolicyForNewResources "); + XXPortalUser xxPortalUser = daoMgr.getXXPortalUser().findByLoginId(LOGIN_ID_ADMIN); + Long currentUserId = xxPortalUser.getId(); + + XXServiceDef xXServiceDefObj = daoMgr.getXXServiceDef() + .findByName(EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME); + if (xXServiceDefObj == null) { + logger.debug("ServiceDef not fount with name :" + EmbeddedServiceDefsUtil.EMBEDDED_SERVICEDEF_KAFKA_NAME); + return; + } + + Long xServiceDefId = xXServiceDefObj.getId(); + List<XXService> xxServices = daoMgr.getXXService().findByServiceDefId(xServiceDefId); + + for (XXService xxService : xxServices) { + int resourceMapOrder = 0; + for (String newResource : POLICY_NAMES) { + XXPolicy xxPolicy = new XXPolicy(); + xxPolicy.setName(newResource); + xxPolicy.setDescription(newResource); + xxPolicy.setService(xxService.getId()); + xxPolicy.setPolicyPriority(RangerPolicy.POLICY_PRIORITY_NORMAL); + xxPolicy.setIsAuditEnabled(Boolean.TRUE); + xxPolicy.setIsEnabled(Boolean.TRUE); + xxPolicy.setPolicyType(RangerPolicy.POLICY_TYPE_ACCESS); + xxPolicy.setGuid(guidUtil.genGUID()); + xxPolicy.setAddedByUserId(currentUserId); + xxPolicy.setUpdatedByUserId(currentUserId); + RangerPolicy rangerPolicy = new RangerPolicy(); + RangerPolicyResourceSignature resourceSignature = new RangerPolicyResourceSignature(rangerPolicy); + xxPolicy.setResourceSignature(resourceSignature.getSignature()); + XXPolicy createdPolicy = daoMgr.getXXPolicy().create(xxPolicy); + + XXPolicyItem xxPolicyItem = new XXPolicyItem(); + xxPolicyItem.setIsEnabled(Boolean.TRUE); + xxPolicyItem.setDelegateAdmin(Boolean.TRUE); + xxPolicyItem.setItemType(0); + xxPolicyItem.setOrder(0); + 
xxPolicyItem.setAddedByUserId(currentUserId); + xxPolicyItem.setUpdatedByUserId(currentUserId); + xxPolicyItem.setPolicyId(createdPolicy.getId()); + XXPolicyItem createdXXPolicyItem = daoMgr.getXXPolicyItem().create(xxPolicyItem); + + List<String> accessTypes = Arrays.asList("create", "delete", "configure", "alter_configs", "describe", "describe_configs", "consume", "publish", "kafka_admin","idempotent_write"); + for (int i = 0; i < accessTypes.size(); i++) { + XXAccessTypeDef xAccTypeDef = daoMgr.getXXAccessTypeDef().findByNameAndServiceId(accessTypes.get(i), + xxPolicy.getService()); + if (xAccTypeDef == null) { + throw new RuntimeException(accessTypes.get(i) + ": is not a valid access-type. policy='" + + xxPolicy.getName() + "' service='" + xxPolicy.getService() + "'"); + } + XXPolicyItemAccess xPolItemAcc = new XXPolicyItemAccess(); + xPolItemAcc.setIsAllowed(Boolean.TRUE); + xPolItemAcc.setType(xAccTypeDef.getId()); + xPolItemAcc.setOrder(i); + xPolItemAcc.setAddedByUserId(currentUserId); + xPolItemAcc.setUpdatedByUserId(currentUserId); + xPolItemAcc.setPolicyitemid(createdXXPolicyItem.getId()); + daoMgr.getXXPolicyItemAccess().create(xPolItemAcc); + } + + for (int i = 0; i < DEFAULT_POLICY_USERS.size(); i++) { + String user = DEFAULT_POLICY_USERS.get(i); + if (StringUtils.isBlank(user)) { + continue; + } + XXUser xxUser = daoMgr.getXXUser().findByUserName(user); + if (xxUser == null) { + throw new RuntimeException(user + ": user does not exist. 
policy='" + xxPolicy.getName() + + "' service='" + xxPolicy.getService() + "' user='" + user + "'"); + } + XXPolicyItemUserPerm xUserPerm = new XXPolicyItemUserPerm(); + xUserPerm.setUserId(xxUser.getId()); + xUserPerm.setPolicyItemId(createdXXPolicyItem.getId()); + xUserPerm.setOrder(i); + xUserPerm.setAddedByUserId(currentUserId); + xUserPerm.setUpdatedByUserId(currentUserId); + daoMgr.getXXPolicyItemUserPerm().create(xUserPerm); + } + + String policyResourceName = KAFKA_RESOURCE_CLUSTER; + if ("all - delegationtoken".equals(newResource)) { + policyResourceName = KAFKA_RESOURCE_DELEGATIONTOKEN; + } + XXResourceDef xResDef = daoMgr.getXXResourceDef().findByNameAndPolicyId(policyResourceName, + createdPolicy.getId()); + if (xResDef == null) { + throw new RuntimeException(policyResourceName + ": is not a valid resource-type. policy='" + + createdPolicy.getName() + "' service='" + createdPolicy.getService() + "'"); + } + + XXPolicyResource xPolRes = new XXPolicyResource(); + + xPolRes.setAddedByUserId(currentUserId); + xPolRes.setUpdatedByUserId(currentUserId); + xPolRes.setIsExcludes(Boolean.FALSE); + xPolRes.setIsRecursive(Boolean.FALSE); + xPolRes.setPolicyId(createdPolicy.getId()); + xPolRes.setResDefId(xResDef.getId()); + xPolRes = daoMgr.getXXPolicyResource().create(xPolRes); + + XXPolicyResourceMap xPolResMap = new XXPolicyResourceMap(); + xPolResMap.setResourceId(xPolRes.getId()); + xPolResMap.setValue("*"); + xPolResMap.setOrder(resourceMapOrder); + xPolResMap.setAddedByUserId(currentUserId); + xPolResMap.setUpdatedByUserId(currentUserId); + daoMgr.getXXPolicyResourceMap().create(xPolResMap); + resourceMapOrder++; + logger.info("Creating policy for service id : " + xxService.getId()); + } + } + logger.info("<== createDefaultPolicyForNewResources "); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ranger/blob/b6d17302/src/main/assembly/plugin-kafka.xml ---------------------------------------------------------------------- diff --git 
a/src/main/assembly/plugin-kafka.xml b/src/main/assembly/plugin-kafka.xml index 97ff8ad..7c55128 100644 --- a/src/main/assembly/plugin-kafka.xml +++ b/src/main/assembly/plugin-kafka.xml @@ -62,7 +62,6 @@ </include> <include>commons-lang:commons-lang</include> <include>commons-io:commons-io</include> - <include>com.google.guava:guava:jar:${google.guava.version}</include> <include>org.apache.httpcomponents:httpclient:jar:${httpcomponents.httpclient.version} </include> <include>org.apache.httpcomponents:httpcore:jar:${httpcomponents.httpcore.version}
