Repository: incubator-ranger Updated Branches: refs/heads/master 0b725f044 -> e47756ced
RANGER-737: updated Ranger Kafka plugin for recent changes in Kafka authorizer Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/e47756ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/e47756ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/e47756ce Branch: refs/heads/master Commit: e47756ced5b9307e4e0c29543847d9ba0f6fad2b Parents: 0b725f0 Author: Madhan Neethiraj <[email protected]> Authored: Thu Nov 19 11:16:35 2015 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Nov 19 23:09:42 2015 -0800 ---------------------------------------------------------------------- .../kafka/authorizer/RangerKafkaAuthorizer.java | 68 +++++++++++++------- .../services/kafka/RangerServiceKafka.java | 37 +++++++---- .../kafka/client/ServiceKafkaClient.java | 42 ++++++++---- pom.xml | 5 +- ranger-kafka-plugin-shim/.gitignore | 1 + .../kafka/authorizer/RangerKafkaAuthorizer.java | 65 +++++++++++++------ 6 files changed, 146 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index c5e955d..08ff928 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -20,14 +20,14 @@ package org.apache.ranger.authorization.kafka.authorizer; import java.util.Date; +import 
java.util.Map; + import javax.security.auth.Subject; import kafka.security.auth.Acl; import kafka.security.auth.Authorizer; -import kafka.security.auth.KafkaPrincipal; -import kafka.security.auth.Operation; -import kafka.security.auth.Resource; -import kafka.security.auth.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import kafka.security.auth.*; import kafka.server.KafkaConfig; import kafka.common.security.LoginManager; import kafka.network.RequestChannel.Session; @@ -73,11 +73,10 @@ public class RangerKafkaAuthorizer implements Authorizer { /* * (non-Javadoc) * - * @see kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig) + * @see kafka.security.auth.Authorizer#configure(Map<String, Object>) */ @Override - public void initialize(KafkaConfig kafkaConfig) { - + public void configure(Map<String, ?> configs) { if (rangerPlugin == null) { try { Subject subject = LoginManager.subject(); @@ -110,7 +109,7 @@ public class RangerKafkaAuthorizer implements Authorizer { } // TODO: If resource type if consumer group, then allow it by default - if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) { + if (resource.resourceType().equals(Group$.MODULE$)) { return true; } @@ -124,6 +123,11 @@ public class RangerKafkaAuthorizer implements Authorizer { .getGroupsForRequestUser(userName); String ip = session.host(); + // skip leading slash + if(StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') { + ip = ip.substring(1); + } + Date eventTime = StringUtil.getUTCDate(); String accessType = mapToRangerAccessType(operation); boolean validationFailed = false; @@ -152,12 +156,12 @@ public class RangerKafkaAuthorizer implements Authorizer { rangerRequest.setAction(action); rangerRequest.setRequestData(resource.name()); - if (resource.resourceType().equals(ResourceType.TOPIC)) { + if (resource.resourceType().equals(Topic$.MODULE$)) { rangerResource.setValue(KEY_TOPIC, resource.name()); - } else if 
(resource.resourceType().equals(ResourceType.CLUSTER)) { + } else if (resource.resourceType().equals(Cluster$.MODULE$)) { // CLUSTER should go as null // rangerResource.setValue(KEY_CLUSTER, resource.name()); - } else if (resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) { + } else if (resource.resourceType().equals(Group$.MODULE$)) { rangerResource.setValue(KEY_CONSUMER_GROUP, resource.name()); } else { logger.fatal("Unsupported resourceType=" + resource.resourceType()); @@ -201,7 +205,7 @@ public class RangerKafkaAuthorizer implements Authorizer { */ @Override public void addAcls(Set<Acl> acls, Resource resource) { - logger.error("addAcls() is not supported by Ranger for Kafka"); + logger.error("addAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka"); } /* @@ -213,7 +217,7 @@ public class RangerKafkaAuthorizer implements Authorizer { */ @Override public boolean removeAcls(Set<Acl> acls, Resource resource) { - logger.error("removeAcls() is not supported by Ranger for Kafka"); + logger.error("removeAcls(Set<Acl>, Resource) is not supported by Ranger for Kafka"); return false; } @@ -225,7 +229,7 @@ public class RangerKafkaAuthorizer implements Authorizer { */ @Override public boolean removeAcls(Resource resource) { - logger.error("removeAcls() is not supported by Ranger for Kafka"); + logger.error("removeAcls(Resource) is not supported by Ranger for Kafka"); return false; } @@ -237,7 +241,7 @@ public class RangerKafkaAuthorizer implements Authorizer { @Override public Set<Acl> getAcls(Resource resource) { Set<Acl> aclList = new HashSet<Acl>(); - logger.error("getAcls() is not supported by Ranger for Kafka"); + logger.error("getAcls(Resource) is not supported by Ranger for Kafka"); return aclList; } @@ -246,12 +250,24 @@ public class RangerKafkaAuthorizer implements Authorizer { * (non-Javadoc) * * @see - * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal - * ) + * 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal) */ @Override - public Set<Acl> getAcls(KafkaPrincipal principal) { - Set<Acl> aclList = new HashSet<Acl>(); + public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) { + scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>(); + logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka"); + return aclList; + } + + /* + * (non-Javadoc) + * + * @see + * kafka.security.auth.Authorizer#getAcls() + */ + @Override + public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() { + scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>(); logger.error("getAcls() is not supported by Ranger for Kafka"); return aclList; } @@ -261,16 +277,20 @@ public class RangerKafkaAuthorizer implements Authorizer { * @return */ private String mapToRangerAccessType(Operation operation) { - if (operation.equals(Operation.READ)) { + if (operation.equals(Read$.MODULE$)) { return ACCESS_TYPE_READ; - } else if (operation.equals(Operation.WRITE)) { + } else if (operation.equals(Write$.MODULE$)) { return ACCESS_TYPE_WRITE; - } else if (operation.equals(Operation.ALTER)) { + } else if (operation.equals(Alter$.MODULE$)) { return ACCESS_TYPE_CONFIGURE; - } else if (operation.equals(Operation.DESCRIBE)) { + } else if (operation.equals(Describe$.MODULE$)) { return ACCESS_TYPE_DESCRIBE; - } else if (operation.equals(Operation.CLUSTER_ACTION)) { + } else if (operation.equals(ClusterAction$.MODULE$)) { return ACCESS_TYPE_KAFKA_ADMIN; + } else if (operation.equals(Create$.MODULE$)) { + return ACCESS_TYPE_CREATE; + } else if (operation.equals(Delete$.MODULE$)) { + return ACCESS_TYPE_DELETE; } return null; } 
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java index ea6d316..8a82b2f 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java @@ -31,7 +31,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class RangerServiceKafka extends RangerBaseService { - private static final Log LOG = LogFactory.getLog(RangerServiceKafka.class); public RangerServiceKafka() { @@ -46,33 +45,45 @@ public class RangerServiceKafka extends RangerBaseService { @Override public HashMap<String, Object> validateConfig() throws Exception { HashMap<String, Object> ret = new HashMap<String, Object>(); - String serviceName = getServiceName(); + if (LOG.isDebugEnabled()) { - LOG.debug("==> RangerServiceKafka.validateConfig Service: (" - + serviceName + " )"); + LOG.debug("==> RangerServiceKafka.validateConfig(" + serviceName + ")"); } + if (configs != null) { try { - ret = ServiceKafkaConnectionMgr.testConnection(serviceName, - configs); + ret = ServiceKafkaConnectionMgr.testConnection(serviceName, configs); } catch (Exception e) { LOG.error("<== RangerServiceKafka.validateConfig Error:" + e); throw e; } } + if (LOG.isDebugEnabled()) { - LOG.debug("<== RangerServiceKafka.validateConfig Response : (" + ret - + " )"); + LOG.debug("<== RangerServiceKafka.validateConfig(" + serviceName + "): ret=" + ret); } + return ret; } @Override - public List<String> lookupResource(ResourceLookupContext context) - throws Exception { + public List<String> lookupResource(ResourceLookupContext 
context) throws Exception { + List<String> ret = null; + + if (LOG.isDebugEnabled()) { + LOG.debug("==> RangerServiceKafka.lookupResource(" + serviceName + ")"); + } - ServiceKafkaClient serviceKafkaClient = ServiceKafkaConnectionMgr - .getKafkaClient(serviceName, configs); - return serviceKafkaClient.getResources(context); + if(configs != null) { + ServiceKafkaClient serviceKafkaClient = ServiceKafkaConnectionMgr.getKafkaClient(serviceName, configs); + + ret = serviceKafkaClient.getResources(context); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== RangerServiceKafka.lookupResource(" + serviceName + "): ret=" + ret); + } + + return ret; } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java index 0698bf6..f5c04fe 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java @@ -28,8 +28,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import kafka.utils.ZkUtils; - -import org.I0Itec.zkclient.ZkClient; +import kafka.utils.ZkUtils$; +import org.apache.kafka.common.security.JaasUtils; +import org.I0Itec.zkclient.*; import org.apache.log4j.Logger; import org.apache.ranger.plugin.client.BaseClient; import org.apache.ranger.plugin.service.ResourceLookupContext; @@ -79,31 +80,48 @@ public class ServiceKafkaClient { return responseData; } - public List<String> getTopicList(List<String> ignoreTopicList) - throws Exception { + private List<String> getTopicList(List<String> ignoreTopicList) throws Exception { + List<String> ret = 
new ArrayList<String>(); - List<String> list = new ArrayList<String>(); + int sessionTimeout = 5000; + int connectionTimeout = 10000; + ZkClient zkClient = null; + ZkConnection zkConnection = null; - ZkClient zkClient = new ZkClient(zookeeperConnect); try { - Seq<String> topicList = ZkUtils.getChildrenParentMayNotExist( - zkClient, ZkUtils.BrokerTopicsPath()); + zkClient = ZkUtils$.MODULE$.createZkClient(zookeeperConnect, sessionTimeout, connectionTimeout); + zkConnection = new ZkConnection(zookeeperConnect, sessionTimeout); + + boolean zkSecurityEnabled = JaasUtils.isZkSecurityEnabled(); + ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, true); + Seq<String> topicList = zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath()); Iterator<String> iter = topicList.iterator(); while (iter.hasNext()) { String topic = iter.next(); if (ignoreTopicList == null || !ignoreTopicList.contains(topic)) { - list.add(topic); + ret.add(topic); } } } finally { try { - zkClient.close(); + if(zkClient != null) { + zkClient.close(); + } } catch (Exception ex) { - LOG.error("Error closing zookeeper", ex); + LOG.error("Error closing zkClient", ex); + } + + try { + if(zkConnection != null) { + zkConnection.close(); + } + + } catch(Exception ex) { + LOG.error("Error closing zkConnection", ex); } } - return list; + return ret; } /** http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d60fca4..1b183b4 100644 --- a/pom.xml +++ b/pom.xml @@ -159,7 +159,7 @@ <jersey-client.version>2.6</jersey-client.version> <junit.version>4.11</junit.version> <kafka.version>0.8.2.0</kafka.version> - <!-- <kafka.version>0.8.2.2.3.2.0-2950</kafka.version> --> + <!-- <kafka.version>0.8.2.2.3.4.0-3288</kafka.version> --> <mockito.version>1.8.4</mockito.version> <hamcrest-version>1.3</hamcrest-version> <knox.gateway.version>0.6.0</knox.gateway.version> @@ -233,7 +233,8 
@@ <profile> <id>kafka-security</id> <modules> - <module>plugin-kafka</module> + <module>plugin-kafka</module> + <module>ranger-kafka-plugin-shim</module> </modules> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/ranger-kafka-plugin-shim/.gitignore ---------------------------------------------------------------------- diff --git a/ranger-kafka-plugin-shim/.gitignore b/ranger-kafka-plugin-shim/.gitignore new file mode 100644 index 0000000..b83d222 --- /dev/null +++ b/ranger-kafka-plugin-shim/.gitignore @@ -0,0 +1 @@ +/target/ http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index d39cac2..0937835 100644 --- a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -19,6 +19,8 @@ package org.apache.ranger.authorization.kafka.authorizer; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.plugin.classloader.RangerPluginClassLoader; @@ -27,10 +29,9 @@ import scala.collection.immutable.Set; import kafka.network.RequestChannel.Session; import kafka.security.auth.Acl; import kafka.security.auth.Authorizer; -import kafka.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.KafkaPrincipal; import kafka.security.auth.Operation; import kafka.security.auth.Resource; -import kafka.server.KafkaConfig; 
//public class RangerKafkaAuthorizer extends Authorizer { @@ -82,31 +83,30 @@ public class RangerKafkaAuthorizer implements Authorizer { LOG.debug("<== RangerKafkaAuthorizer.init()"); } } - - + @Override - public void initialize(KafkaConfig kafkaConfig) { + public void configure(Map<String, ?> configs) { if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuthorizer.initialize()"); + LOG.debug("==> RangerKafkaAuthorizer.configure(Map<String, ?>)"); } try { activatePluginClassLoader(); - rangerKakfaAuthorizerImpl.initialize(kafkaConfig); + rangerKakfaAuthorizerImpl.configure(configs); } finally { deactivatePluginClassLoader(); } if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuthorizer.initialize()"); + LOG.debug("<== RangerKafkaAuthorizer.configure(Map<String, ?>)"); } } @Override public boolean authorize(Session session, Operation operation,Resource resource) { if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuthorizer.authorize()"); + LOG.debug("==> RangerKafkaAuthorizer.authorize(Session, Operation, Resource)"); } boolean ret = false; @@ -120,7 +120,7 @@ public class RangerKafkaAuthorizer implements Authorizer { } if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuthorizer.authorize()"); + LOG.debug("<== RangerKafkaAuthorizer.authorize(Session, Operation, Resource)"); } return ret; @@ -129,7 +129,7 @@ public class RangerKafkaAuthorizer implements Authorizer { @Override public void addAcls(Set<Acl> acls, Resource resource) { if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuthorizer.addAcls()"); + LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)"); } try { @@ -141,14 +141,14 @@ public class RangerKafkaAuthorizer implements Authorizer { } if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuthorizer.addAcls()"); + LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, Resource)"); } } @Override public boolean removeAcls(Set<Acl> acls, Resource resource) { if(LOG.isDebugEnabled()) { - LOG.debug("==> 
RangerKafkaAuthorizer.removeAcls()"); + LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)"); } boolean ret = false; try { @@ -160,7 +160,7 @@ public class RangerKafkaAuthorizer implements Authorizer { } if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuthorizer.removeAcls()"); + LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)"); } return ret; @@ -169,7 +169,7 @@ public class RangerKafkaAuthorizer implements Authorizer { @Override public boolean removeAcls(Resource resource) { if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuthorizer.removeAcls()"); + LOG.debug("==> RangerKafkaAuthorizer.removeAcls(Resource)"); } boolean ret = false; try { @@ -181,7 +181,7 @@ public class RangerKafkaAuthorizer implements Authorizer { } if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuthorizer.removeAcls()"); + LOG.debug("<== RangerKafkaAuthorizer.removeAcls(Resource)"); } return ret; @@ -190,7 +190,7 @@ public class RangerKafkaAuthorizer implements Authorizer { @Override public Set<Acl> getAcls(Resource resource) { if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuthorizer.getAcls()"); + LOG.debug("==> RangerKafkaAuthorizer.getAcls(Resource)"); } Set<Acl> ret = null; @@ -204,19 +204,19 @@ public class RangerKafkaAuthorizer implements Authorizer { } if(LOG.isDebugEnabled()) { - LOG.debug("<== RangerKafkaAuthorizer.getAcls()"); + LOG.debug("<== RangerKafkaAuthorizer.getAcls(Resource)"); } return ret; } @Override - public Set<Acl> getAcls(KafkaPrincipal principal) { + public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) { if(LOG.isDebugEnabled()) { - LOG.debug("==> RangerKafkaAuthorizer.getAcls()"); + LOG.debug("==> RangerKafkaAuthorizer.getAcls(KafkaPrincipal)"); } - Set<Acl> ret = null; + scala.collection.immutable.Map<Resource, Set<Acl>> ret = null; try { activatePluginClassLoader(); @@ -227,6 +227,29 @@ public class RangerKafkaAuthorizer implements Authorizer { } 
if(LOG.isDebugEnabled()) { + LOG.debug("<== RangerKafkaAuthorizer.getAcls(KafkaPrincipal)"); + } + + return ret; + } + + @Override + public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() { + if(LOG.isDebugEnabled()) { + LOG.debug("==> RangerKafkaAuthorizer.getAcls()"); + } + + scala.collection.immutable.Map<Resource, Set<Acl>> ret = null; + + try { + activatePluginClassLoader(); + + ret = rangerKakfaAuthorizerImpl.getAcls(); + } finally { + deactivatePluginClassLoader(); + } + + if(LOG.isDebugEnabled()) { LOG.debug("<== RangerKafkaAuthorizer.getAcls()"); }
