Repository: ranger Updated Branches: refs/heads/ranger-1.0 b64b66e37 -> 50957cdf3
RANGER-2117:RangerKafkaAuthorizer to support new resources and operations which are in Apache Kafka 1.0.0 Signed-off-by: rmani <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/50957cdf Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/50957cdf Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/50957cdf Branch: refs/heads/ranger-1.0 Commit: 50957cdf33b313ac62fd1bdd6e9058ebf1895bfa Parents: b64b66e Author: rmani <[email protected]> Authored: Mon Jun 4 14:46:40 2018 -0700 Committer: rmani <[email protected]> Committed: Tue Jun 5 11:21:44 2018 -0700 ---------------------------------------------------------------------- .../service-defs/ranger-servicedef-kafka.json | 37 ++- .../kafka/authorizer/RangerKafkaAuthorizer.java | 12 + .../KafkaRangerAuthorizerGSSTest.java | 22 ++ .../src/test/resources/kafka-policies.json | 146 +++++++++++- .../PatchForKafkaServiceDefUpdate_J10015.java | 236 +++++++++++++++++++ 5 files changed, 449 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json ---------------------------------------------------------------------- diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json index 839d780..ca3e0fe 100644 --- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json +++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json @@ -24,6 +24,20 @@ "uiHint":"", "label":"Topic", "description":"Topic" + }, + { + "itemId":2, + "name":"transactionalid", + "type":"string", + "level":1, + "excludesSupported":true, + "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher", + "matcherOptions":{ + "wildCard":true, + "ignoreCase":true + }, + "label":"Transactional Id", + "description":"Transactional Id" } ], @@ -68,7 +82,10 @@ { "itemId":9, "name":"delete", - "label":"Delete" + "label":"Delete", + "impliedGrants":[ + "describe" + ] }, { "itemId":7, @@ -83,6 +100,24 @@ "delete" ] + }, + { + "itemId":10, + "name":"idempotent_write", + "label":"Idempotent Write" + }, + { + "itemId":11, + "name":"describe_configs", + "label":"Describe Configs" + }, + { + "itemId":12, + "name":"alter_configs", + "label":"Alter Configs", + "impliedGrants":[ + "describe_configs" + ] } ], "configs":[ http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index 630d1af..b5d151e 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -57,6 +57,7 @@ public class RangerKafkaAuthorizer implements Authorizer { public static final String KEY_TOPIC = "topic"; public static final String KEY_CLUSTER = "cluster"; public static final String KEY_CONSUMER_GROUP = "consumer_group"; + public static final String KEY_TRANSACTIONALID = "transactionalid"; public static final String ACCESS_TYPE_READ = "consume"; public static final String ACCESS_TYPE_WRITE = "publish"; @@ -65,6 +66,9 @@ public class RangerKafkaAuthorizer implements Authorizer { public static final String ACCESS_TYPE_CONFIGURE = "configure"; public static final String ACCESS_TYPE_DESCRIBE = "describe"; public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin"; + public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs"; + public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs"; + public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write"; private static volatile RangerBasePlugin rangerPlugin = null; @@ -198,6 +202,8 @@ public class RangerKafkaAuthorizer implements Authorizer { // rangerResource.setValue(KEY_CLUSTER, resource.name()); } else if (resource.resourceType().equals(Group$.MODULE$)) { rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name()); + } else if (resource.resourceType().equals(TransactionalId$.MODULE$)) { + rangerResource.setValue(KEY_TRANSACTIONALID,resource.name()); } else { logger.fatal("Unsupported resourceType=" + resource.resourceType()); validationFailed = true; @@ -327,6 +333,12 @@ public class RangerKafkaAuthorizer implements Authorizer { return ACCESS_TYPE_CREATE; } else if (operation.equals(Delete$.MODULE$)) { return ACCESS_TYPE_DELETE; + } else if (operation.equals(DescribeConfigs$.MODULE$)) { + return ACCESS_TYPE_DESCRIBE_CONFIGS; + } else if (operation.equals(AlterConfigs$.MODULE$)) { + return ACCESS_TYPE_ALTER_CONFIGS; + } else if (operation.equals(IdempotentWrite$.MODULE$)) { + return ACCESS_TYPE_IDEMPOTENT_WRITE; } return null; } http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java index 23b9299..2624478 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java @@ -316,4 +316,26 @@ public class KafkaRangerAuthorizerGSSTest { producer.close(); } + + @Test + public void testAuthorizedIdempotentWrite() throws Exception { + // Create the Producer + Properties producerProps = new Properties(); + producerProps.put("bootstrap.servers", "localhost:" + port); + producerProps.put("acks", "all"); + producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + producerProps.put("sasl.mechanism", "GSSAPI"); + producerProps.put("sasl.kerberos.service.name", "kafka"); + producerProps.put("enable.idempotence", "true"); + + final Producer<String, String> producer = new KafkaProducer<>(producerProps); + + // Send a message + Future<RecordMetadata> record = + producer.send(new ProducerRecord<String, String>("test", "somekey", "somevalue")); + producer.flush(); + producer.close(); + } } http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/plugin-kafka/src/test/resources/kafka-policies.json ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/test/resources/kafka-policies.json b/plugin-kafka/src/test/resources/kafka-policies.json index d0e469a..0c07604 100644 --- a/plugin-kafka/src/test/resources/kafka-policies.json +++ b/plugin-kafka/src/test/resources/kafka-policies.json @@ -49,10 +49,22 @@ { "type": "kafka_admin", "isAllowed": true + }, + { + "type": "idempotent_write", + "isAllowed": true + }, + { + "type": "describe_configs", + "isAllowed": true + }, + { + "type": "alter_configs", + "isAllowed": true } ], "users": [ - "admin" + "admin","kafka" ], "groups": [ "IT" @@ -99,6 +111,18 @@ { "type": "describe", "isAllowed": true + }, + { + "type": "idempotent_write", + "isAllowed": true + }, + { + "type": "describe_configs", + "isAllowed": true + }, + { + "type": "alter_configs", + "isAllowed": true } ], "users": [], @@ -120,6 +144,64 @@ }, { "service": "cl1_kafka", + "name": "ClusterLevelPolicy", + "policyType": 0, + "description": "", + "isAuditEnabled": true, + "resources": { + "topic": { + "values": [ + "test" + ], + "isExcludes": false, + "isRecursive": false + } + }, + "policyItems": [ + { + "accesses": [ + { + "type": "publish", + "isAllowed": true + }, + { + "type": "consume", + "isAllowed": true + }, + { + "type": "describe", + "isAllowed": true + }, + { + "type": "idempotent_write", + "isAllowed": true + }, + { + "type": "describe_configs", + "isAllowed": true + }, + { + "type": "alter_configs", + "isAllowed": true + } + ], + "users": ["kafka"], + "groups": [], + "conditions": [], + "delegateAdmin": false + } + ], + "denyPolicyItems": [], + "allowExceptions": [], + "denyExceptions": [], + "dataMaskPolicyItems": [], + "rowFilterPolicyItems": [], + "id": 25, + "isEnabled": true, + "version": 1 + }, + { + "service": "cl1_kafka", "name": "DevPolicy", "policyType": 0, "description": "", @@ -220,6 +302,26 @@ "uiHint": "", "label": "Topic", "description": "Topic" + }, + { + "itemId":2, + "name":"transactionalid", + "type":"string", + "level":1, + "mandatory":true, + "lookupSupported":false, + "recursiveSupported":false, + "excludesSupported":true, + "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher", + "matcherOptions":{ + "wildCard":true, + "ignoreCase":true + }, + "validationRegEx":"", + "validationMessage":"", + "uiHint":"", + "label":"Transactional Id", + "description":"Transactional Id" } ], "accessTypes": [ @@ -263,7 +365,7 @@ "itemId": 9, "name": "delete", "label": "Delete", - "impliedGrants": [] + "impliedGrants": ["describe"] }, { "itemId": 7, @@ -277,6 +379,24 @@ "create", "delete" ] + }, + { + "itemId":10, + "name":"idempotent_write", + "label":"Idempotent Write" + }, + { + "itemId":11, + "name":"describe_configs", + "label":"Describe Configs" + }, + { + "itemId":12, + "name":"alter_configs", + "label":"Alter Configs", + "impliedGrants":[ + "describe_configs" + ] } ], "policyConditions": [ @@ -976,7 +1096,9 @@ "itemId": 9018, "name": "kafka:delete", "label": "Delete", - "impliedGrants": [] + "impliedGrants": [ + "kafka:describe" + ] }, { "itemId": 9016, @@ -992,6 +1114,24 @@ ] }, { + "itemId":9019, + "name":"idempotent_write", + "label":"Idempotent Write" + }, + { + "itemId":9020, + "name":"describe_configs", + "label":"Describe Configs" + }, + { + "itemId":9021, + "name":"alter_configs", + "label":"Alter Configs", + "impliedGrants":[ + "describe_configs" + ] + }, + { "itemId": 11012, "name": "atlas:read", "label": "read", http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10015.java ---------------------------------------------------------------------- diff --git a/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10015.java b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10015.java new file mode 100644 index 0000000..cb829a7 --- /dev/null +++ b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10015.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.patch; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.apache.ranger.biz.RangerBizUtil; +import org.apache.ranger.biz.ServiceDBStore; +import org.apache.ranger.common.JSONUtil; +import org.apache.ranger.common.RangerValidatorFactory; +import org.apache.ranger.common.StringUtil; +import org.apache.ranger.db.RangerDaoManager; +import org.apache.ranger.entity.XXServiceDef; +import org.apache.ranger.plugin.model.RangerServiceDef; +import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator; +import org.apache.ranger.plugin.model.validation.RangerValidator.Action; +import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil; +import org.apache.ranger.service.RangerPolicyService; +import org.apache.ranger.service.XPermMapService; +import org.apache.ranger.service.XPolicyService; +import org.apache.ranger.util.CLIUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +public class PatchForKafkaServiceDefUpdate_J10015 extends BaseLoader { + private static final Logger logger = Logger.getLogger(PatchForKafkaServiceDefUpdate_J10015.class); + public static final String SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME = "kafka"; + public static final String TRANSACTIONALID_RESOURCE_NAME ="transactionalid"; + + @Autowired + RangerDaoManager daoMgr; + + @Autowired + ServiceDBStore svcDBStore; + + @Autowired + JSONUtil jsonUtil; + + @Autowired + RangerPolicyService policyService; + + @Autowired + StringUtil stringUtil; + + @Autowired + XPolicyService xPolService; + + @Autowired + XPermMapService xPermMapService; + + @Autowired + RangerBizUtil bizUtil; + + @Autowired + RangerValidatorFactory validatorFactory; + + @Autowired + ServiceDBStore svcStore; + + public static void main(String[] args) { + logger.info("main()"); + try { + PatchForKafkaServiceDefUpdate_J10015 loader = (PatchForKafkaServiceDefUpdate_J10015) CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10015.class); + loader.init(); + while (loader.isMoreToProcess()) { + loader.load(); + } + logger.info("Load complete. Exiting!!!"); + System.exit(0); + } catch (Exception e) { + logger.error("Error loading", e); + System.exit(1); + } + } + + @Override + public void init() throws Exception { + // Do Nothing + } + + @Override + public void execLoad() { + logger.info("==> PatchForKafkaServiceDefUpdate_J10015.execLoad()"); + try { + updateHiveServiceDef(); + } catch (Exception e) { + logger.error("Error while applying PatchForKafkaServiceDefUpdate_J10015...", e); + } + logger.info("<== PatchForKafkaServiceDefUpdate_J10015.execLoad()"); + } + + @Override + public void printStats() { + logger.info("PatchForKafkaServiceDefUpdate_J10015 "); + } + + private void updateHiveServiceDef(){ + RangerServiceDef ret = null; + RangerServiceDef embeddedKafkaServiceDef = null; + RangerServiceDef dbKafkaServiceDef = null; + List<RangerServiceDef.RangerResourceDef> embeddedKafkaResourceDefs = null; + List<RangerServiceDef.RangerAccessTypeDef> embeddedKafkaAccessTypes = null; + XXServiceDef xXServiceDefObj = null; + try{ + embeddedKafkaServiceDef=EmbeddedServiceDefsUtil.instance().getEmbeddedServiceDef(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME); + if(embeddedKafkaServiceDef!=null){ + + xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME); + Map<String, String> serviceDefOptionsPreUpdate=null; + String jsonStrPreUpdate=null; + if(xXServiceDefObj!=null) { + jsonStrPreUpdate=xXServiceDefObj.getDefOptions(); + serviceDefOptionsPreUpdate=jsonStringToMap(jsonStrPreUpdate); + xXServiceDefObj=null; + } + dbKafkaServiceDef=svcDBStore.getServiceDefByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME); + + if(dbKafkaServiceDef!=null){ + embeddedKafkaResourceDefs = embeddedKafkaServiceDef.getResources(); + embeddedKafkaAccessTypes = embeddedKafkaServiceDef.getAccessTypes(); + + if (checkNewKafkaresourcePresent(embeddedKafkaResourceDefs)) { + // This is to check if URL def is added to the resource definition, if so update the resource def and accessType def + if (embeddedKafkaResourceDefs != null) { + dbKafkaServiceDef.setResources(embeddedKafkaResourceDefs); + } + if (embeddedKafkaAccessTypes != null) { + if(!embeddedKafkaAccessTypes.toString().equalsIgnoreCase(dbKafkaServiceDef.getAccessTypes().toString())) { + dbKafkaServiceDef.setAccessTypes(embeddedKafkaAccessTypes); + } + } + } + + RangerServiceDefValidator validator = validatorFactory.getServiceDefValidator(svcStore); + validator.validate(dbKafkaServiceDef, Action.UPDATE); + + ret = svcStore.updateServiceDef(dbKafkaServiceDef); + if(ret==null){ + logger.error("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def"); + throw new RuntimeException("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def"); + } + xXServiceDefObj = daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME); + if(xXServiceDefObj!=null) { + String jsonStrPostUpdate=xXServiceDefObj.getDefOptions(); + Map<String, String> serviceDefOptionsPostUpdate=jsonStringToMap(jsonStrPostUpdate); + if (serviceDefOptionsPostUpdate != null && serviceDefOptionsPostUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) { + if(serviceDefOptionsPreUpdate == null || !serviceDefOptionsPreUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES)) { + String preUpdateValue = serviceDefOptionsPreUpdate == null ? null : serviceDefOptionsPreUpdate.get(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES); + if (preUpdateValue == null) { + serviceDefOptionsPostUpdate.remove(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES); + } else { + serviceDefOptionsPostUpdate.put(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES, preUpdateValue); + } + xXServiceDefObj.setDefOptions(mapToJsonString(serviceDefOptionsPostUpdate)); + daoMgr.getXXServiceDef().update(xXServiceDefObj); + } + } + } + } + } + }catch(Exception e) + { + logger.error("Error while updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def", e); + } + } + + private boolean checkNewKafkaresourcePresent(List<RangerServiceDef.RangerResourceDef> resourceDefs) { + boolean ret = false; + for(RangerServiceDef.RangerResourceDef resourceDef : resourceDefs) { + if (TRANSACTIONALID_RESOURCE_NAME.equals(resourceDef.getName()) ) { + ret = true ; + break; + } + } + return ret; + } + + private String mapToJsonString(Map<String, String> map) { + String ret = null; + if(map != null) { + try { + ret = jsonUtil.readMapToString(map); + } catch(Exception excp) { + logger.warn("mapToJsonString() failed to convert map: " + map, excp); + } + } + return ret; + } + + protected Map<String, String> jsonStringToMap(String jsonStr) { + Map<String, String> ret = null; + if(!StringUtils.isEmpty(jsonStr)) { + try { + ret = jsonUtil.jsonToMap(jsonStr); + } catch(Exception excp) { + // fallback to earlier format: "name1=value1;name2=value2" + for(String optionString : jsonStr.split(";")) { + if(StringUtils.isEmpty(optionString)) { + continue; + } + String[] nvArr = optionString.split("="); + String name = (nvArr != null && nvArr.length > 0) ? nvArr[0].trim() : null; + String value = (nvArr != null && nvArr.length > 1) ? nvArr[1].trim() : null; + if(StringUtils.isEmpty(name)) { + continue; + } + if(ret == null) { + ret = new HashMap<String, String>(); + } + ret.put(name, value); + } + } + } + return ret; + } +} \ No newline at end of file
