Repository: incubator-ranger
Updated Branches:
  refs/heads/master 0b725f044 -> e47756ced


RANGER-737: updated Ranger Kakfa plugin for recent changes in Kafka authorizer


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/e47756ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/e47756ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/e47756ce

Branch: refs/heads/master
Commit: e47756ced5b9307e4e0c29543847d9ba0f6fad2b
Parents: 0b725f0
Author: Madhan Neethiraj <[email protected]>
Authored: Thu Nov 19 11:16:35 2015 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Thu Nov 19 23:09:42 2015 -0800

----------------------------------------------------------------------
 .../kafka/authorizer/RangerKafkaAuthorizer.java | 68 +++++++++++++-------
 .../services/kafka/RangerServiceKafka.java      | 37 +++++++----
 .../kafka/client/ServiceKafkaClient.java        | 42 ++++++++----
 pom.xml                                         |  5 +-
 ranger-kafka-plugin-shim/.gitignore             |  1 +
 .../kafka/authorizer/RangerKafkaAuthorizer.java | 65 +++++++++++++------
 6 files changed, 146 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index c5e955d..08ff928 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -20,14 +20,14 @@
 package org.apache.ranger.authorization.kafka.authorizer;
 
 import java.util.Date;
+import java.util.Map;
+
 import javax.security.auth.Subject;
 
 import kafka.security.auth.Acl;
 import kafka.security.auth.Authorizer;
-import kafka.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
-import kafka.security.auth.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import kafka.security.auth.*;
 import kafka.server.KafkaConfig;
 import kafka.common.security.LoginManager;
 import kafka.network.RequestChannel.Session;
@@ -73,11 +73,10 @@ public class RangerKafkaAuthorizer implements Authorizer {
        /*
         * (non-Javadoc)
         * 
-        * @see 
kafka.security.auth.Authorizer#initialize(kafka.server.KafkaConfig)
+        * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
         */
        @Override
-       public void initialize(KafkaConfig kafkaConfig) {
-
+       public void configure(Map<String, ?> configs) {
                if (rangerPlugin == null) {
                        try {
                                Subject subject = LoginManager.subject();
@@ -110,7 +109,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
 
                // TODO: If resource type if consumer group, then allow it by 
default
-               if 
(resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+               if (resource.resourceType().equals(Group$.MODULE$)) {
                        return true;
                }
 
@@ -124,6 +123,11 @@ public class RangerKafkaAuthorizer implements Authorizer {
                                .getGroupsForRequestUser(userName);
                String ip = session.host();
 
+               // skip leading slash
+               if(StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
+                       ip = ip.substring(1);
+               }
+
                Date eventTime = StringUtil.getUTCDate();
                String accessType = mapToRangerAccessType(operation);
                boolean validationFailed = false;
@@ -152,12 +156,12 @@ public class RangerKafkaAuthorizer implements Authorizer {
                rangerRequest.setAction(action);
                rangerRequest.setRequestData(resource.name());
 
-               if (resource.resourceType().equals(ResourceType.TOPIC)) {
+               if (resource.resourceType().equals(Topic$.MODULE$)) {
                        rangerResource.setValue(KEY_TOPIC, resource.name());
-               } else if 
(resource.resourceType().equals(ResourceType.CLUSTER)) {
+               } else if (resource.resourceType().equals(Cluster$.MODULE$)) {
                        // CLUSTER should go as null
                        // rangerResource.setValue(KEY_CLUSTER, 
resource.name());
-               } else if 
(resource.resourceType().equals(ResourceType.CONSUMER_GROUP)) {
+               } else if (resource.resourceType().equals(Group$.MODULE$)) {
                        rangerResource.setValue(KEY_CONSUMER_GROUP, 
resource.name());
                } else {
                        logger.fatal("Unsupported resourceType=" + 
resource.resourceType());
@@ -201,7 +205,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
         */
        @Override
        public void addAcls(Set<Acl> acls, Resource resource) {
-               logger.error("addAcls() is not supported by Ranger for Kafka");
+               logger.error("addAcls(Set<Acl>, Resource) is not supported by 
Ranger for Kafka");
        }
 
        /*
@@ -213,7 +217,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
         */
        @Override
        public boolean removeAcls(Set<Acl> acls, Resource resource) {
-               logger.error("removeAcls() is not supported by Ranger for 
Kafka");
+               logger.error("removeAcls(Set<Acl>, Resource) is not supported 
by Ranger for Kafka");
                return false;
        }
 
@@ -225,7 +229,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
         */
        @Override
        public boolean removeAcls(Resource resource) {
-               logger.error("removeAcls() is not supported by Ranger for 
Kafka");
+               logger.error("removeAcls(Resource) is not supported by Ranger 
for Kafka");
                return false;
        }
 
@@ -237,7 +241,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
        @Override
        public Set<Acl> getAcls(Resource resource) {
                Set<Acl> aclList = new HashSet<Acl>();
-               logger.error("getAcls() is not supported by Ranger for Kafka");
+               logger.error("getAcls(Resource) is not supported by Ranger for 
Kafka");
 
                return aclList;
        }
@@ -246,12 +250,24 @@ public class RangerKafkaAuthorizer implements Authorizer {
         * (non-Javadoc)
         * 
         * @see
-        * 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-        * )
+        * 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal)
         */
        @Override
-       public Set<Acl> getAcls(KafkaPrincipal principal) {
-               Set<Acl> aclList = new HashSet<Acl>();
+       public scala.collection.immutable.Map<Resource, Set<Acl>> 
getAcls(KafkaPrincipal principal) {
+               scala.collection.immutable.Map<Resource, Set<Acl>> aclList = 
new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
+               logger.error("getAcls(KafkaPrincipal) is not supported by 
Ranger for Kafka");
+               return aclList;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * kafka.security.auth.Authorizer#getAcls()
+        */
+       @Override
+       public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
+               scala.collection.immutable.Map<Resource, Set<Acl>> aclList = 
new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
                logger.error("getAcls() is not supported by Ranger for Kafka");
                return aclList;
        }
@@ -261,16 +277,20 @@ public class RangerKafkaAuthorizer implements Authorizer {
         * @return
         */
        private String mapToRangerAccessType(Operation operation) {
-               if (operation.equals(Operation.READ)) {
+               if (operation.equals(Read$.MODULE$)) {
                        return ACCESS_TYPE_READ;
-               } else if (operation.equals(Operation.WRITE)) {
+               } else if (operation.equals(Write$.MODULE$)) {
                        return ACCESS_TYPE_WRITE;
-               } else if (operation.equals(Operation.ALTER)) {
+               } else if (operation.equals(Alter$.MODULE$)) {
                        return ACCESS_TYPE_CONFIGURE;
-               } else if (operation.equals(Operation.DESCRIBE)) {
+               } else if (operation.equals(Describe$.MODULE$)) {
                        return ACCESS_TYPE_DESCRIBE;
-               } else if (operation.equals(Operation.CLUSTER_ACTION)) {
+               } else if (operation.equals(ClusterAction$.MODULE$)) {
                        return ACCESS_TYPE_KAFKA_ADMIN;
+               } else if (operation.equals(Create$.MODULE$)) {
+                       return ACCESS_TYPE_CREATE;
+               } else if (operation.equals(Delete$.MODULE$)) {
+                       return ACCESS_TYPE_DELETE;
                }
                return null;
        }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
index ea6d316..8a82b2f 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java
@@ -31,7 +31,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class RangerServiceKafka extends RangerBaseService {
-
        private static final Log LOG = 
LogFactory.getLog(RangerServiceKafka.class);
 
        public RangerServiceKafka() {
@@ -46,33 +45,45 @@ public class RangerServiceKafka extends RangerBaseService {
        @Override
        public HashMap<String, Object> validateConfig() throws Exception {
                HashMap<String, Object> ret = new HashMap<String, Object>();
-               String serviceName = getServiceName();
+
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerServiceKafka.validateConfig 
Service: ("
-                                       + serviceName + " )");
+                       LOG.debug("==> RangerServiceKafka.validateConfig(" + 
serviceName + ")");
                }
+
                if (configs != null) {
                        try {
-                               ret = 
ServiceKafkaConnectionMgr.testConnection(serviceName,
-                                               configs);
+                               ret = 
ServiceKafkaConnectionMgr.testConnection(serviceName, configs);
                        } catch (Exception e) {
                                LOG.error("<== 
RangerServiceKafka.validateConfig Error:" + e);
                                throw e;
                        }
                }
+
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerServiceKafka.validateConfig 
Response : (" + ret
-                                       + " )");
+                       LOG.debug("<== RangerServiceKafka.validateConfig(" + 
serviceName + "): ret=" + ret);
                }
+
                return ret;
        }
 
        @Override
-       public List<String> lookupResource(ResourceLookupContext context)
-                       throws Exception {
+       public List<String> lookupResource(ResourceLookupContext context) 
throws Exception {
+               List<String> ret = null;
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerServiceKafka.lookupResource(" + 
serviceName + ")");
+               }
 
-               ServiceKafkaClient serviceKafkaClient = 
ServiceKafkaConnectionMgr
-                               .getKafkaClient(serviceName, configs);
-               return serviceKafkaClient.getResources(context);
+               if(configs != null) {
+                       ServiceKafkaClient serviceKafkaClient = 
ServiceKafkaConnectionMgr.getKafkaClient(serviceName, configs);
+
+                       ret = serviceKafkaClient.getResources(context);
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== RangerServiceKafka.lookupResource(" + 
serviceName + "): ret=" + ret);
+               }
+
+               return ret;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
----------------------------------------------------------------------
diff --git 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
index 0698bf6..f5c04fe 100644
--- 
a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
+++ 
b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java
@@ -28,8 +28,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import kafka.utils.ZkUtils;
-
-import org.I0Itec.zkclient.ZkClient;
+import kafka.utils.ZkUtils$;
+import org.apache.kafka.common.security.JaasUtils;
+import org.I0Itec.zkclient.*;
 import org.apache.log4j.Logger;
 import org.apache.ranger.plugin.client.BaseClient;
 import org.apache.ranger.plugin.service.ResourceLookupContext;
@@ -79,31 +80,48 @@ public class ServiceKafkaClient {
                return responseData;
        }
 
-       public List<String> getTopicList(List<String> ignoreTopicList)
-                       throws Exception {
+       private List<String> getTopicList(List<String> ignoreTopicList) throws 
Exception {
+               List<String> ret = new ArrayList<String>();
 
-               List<String> list = new ArrayList<String>();
+               int          sessionTimeout    = 5000;
+        int          connectionTimeout = 10000;
+               ZkClient     zkClient          = null;
+               ZkConnection zkConnection      = null;
 
-               ZkClient zkClient = new ZkClient(zookeeperConnect);
                try {
-                       Seq<String> topicList = 
ZkUtils.getChildrenParentMayNotExist(
-                                       zkClient, ZkUtils.BrokerTopicsPath());
+               zkClient     = 
ZkUtils$.MODULE$.createZkClient(zookeeperConnect, sessionTimeout, 
connectionTimeout);
+               zkConnection = new ZkConnection(zookeeperConnect, 
sessionTimeout);
+
+               boolean      zkSecurityEnabled = 
JaasUtils.isZkSecurityEnabled();
+               ZkUtils      zkUtils           = new ZkUtils(zkClient, 
zkConnection, true);
+               Seq<String>  topicList         = 
zkUtils.getChildrenParentMayNotExist(ZkUtils.BrokerTopicsPath());
 
                        Iterator<String> iter = topicList.iterator();
                        while (iter.hasNext()) {
                                String topic = iter.next();
                                if (ignoreTopicList == null || 
!ignoreTopicList.contains(topic)) {
-                                       list.add(topic);
+                                       ret.add(topic);
                                }
                        }
                } finally {
                        try {
-                               zkClient.close();
+                               if(zkClient != null) {
+                                       zkClient.close();
+                               }
                        } catch (Exception ex) {
-                               LOG.error("Error closing zookeeper", ex);
+                               LOG.error("Error closing zkClient", ex);
+                       }
+                       
+                       try {
+                               if(zkConnection != null) {
+                                       zkConnection.close();
+                               }
+                               
+                       } catch(Exception ex) {
+                               LOG.error("Error closing zkConnection", ex);
                        }
                }
-               return list;
+               return ret;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d60fca4..1b183b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@
                <jersey-client.version>2.6</jersey-client.version>
                <junit.version>4.11</junit.version>
                <kafka.version>0.8.2.0</kafka.version>
-               <!-- <kafka.version>0.8.2.2.3.2.0-2950</kafka.version> -->
+               <!-- <kafka.version>0.8.2.2.3.4.0-3288</kafka.version> -->
                <mockito.version>1.8.4</mockito.version>
                <hamcrest-version>1.3</hamcrest-version>
                <knox.gateway.version>0.6.0</knox.gateway.version>
@@ -233,7 +233,8 @@
       <profile>
           <id>kafka-security</id>
          <modules>
-                <module>plugin-kafka</module>         
+             <module>plugin-kafka</module>
+             <module>ranger-kafka-plugin-shim</module>
          </modules>
       </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/ranger-kafka-plugin-shim/.gitignore
----------------------------------------------------------------------
diff --git a/ranger-kafka-plugin-shim/.gitignore 
b/ranger-kafka-plugin-shim/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/ranger-kafka-plugin-shim/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/e47756ce/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
 
b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index d39cac2..0937835 100644
--- 
a/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ 
b/ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -19,6 +19,8 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
@@ -27,10 +29,9 @@ import scala.collection.immutable.Set;
 import kafka.network.RequestChannel.Session;
 import kafka.security.auth.Acl;
 import kafka.security.auth.Authorizer;
-import kafka.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import kafka.security.auth.Operation;
 import kafka.security.auth.Resource;
-import kafka.server.KafkaConfig;
 
 
 //public class RangerKafkaAuthorizer extends Authorizer {
@@ -82,31 +83,30 @@ public class RangerKafkaAuthorizer implements Authorizer {
                        LOG.debug("<== RangerKafkaAuthorizer.init()");
                }
        }
-       
-       
+
        @Override
-       public void initialize(KafkaConfig kafkaConfig) {
+       public void configure(Map<String, ?> configs) {
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.initialize()");
+                       LOG.debug("==> 
RangerKafkaAuthorizer.configure(Map<String, ?>)");
                }
 
                try {
                        activatePluginClassLoader();
 
-                       rangerKakfaAuthorizerImpl.initialize(kafkaConfig);
+                       rangerKakfaAuthorizerImpl.configure(configs);
                } finally {
                        deactivatePluginClassLoader();
                }
 
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.initialize()");
+                       LOG.debug("<== 
RangerKafkaAuthorizer.configure(Map<String, ?>)");
                }
        }
 
        @Override
        public boolean authorize(Session session, Operation operation,Resource 
resource) {      
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.authorize()");
+                       LOG.debug("==> RangerKafkaAuthorizer.authorize(Session, 
Operation, Resource)");
                }
 
                boolean ret = false;
@@ -120,7 +120,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
 
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.authorize()");
+                       LOG.debug("<== RangerKafkaAuthorizer.authorize(Session, 
Operation, Resource)");
                }
                
                return ret;
@@ -129,7 +129,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
        @Override
        public void addAcls(Set<Acl> acls, Resource resource) {
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.addAcls()");
+                       LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, 
Resource)");
                }
 
                try {
@@ -141,14 +141,14 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
 
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.addAcls()");
+                       LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, 
Resource)");
                }
        }
 
        @Override
        public boolean removeAcls(Set<Acl> acls, Resource resource) {
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.removeAcls()");
+                       LOG.debug("==> 
RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
                }
                boolean ret = false;
                try {
@@ -160,7 +160,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
 
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.removeAcls()");
+                       LOG.debug("<== 
RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
                }
                
                return ret;
@@ -169,7 +169,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
        @Override
        public boolean removeAcls(Resource resource) {
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.removeAcls()");
+                       LOG.debug("==> 
RangerKafkaAuthorizer.removeAcls(Resource)");
                }
                boolean ret = false;
                try {
@@ -181,7 +181,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
 
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.removeAcls()");
+                       LOG.debug("<== 
RangerKafkaAuthorizer.removeAcls(Resource)");
                }
 
                return ret;
@@ -190,7 +190,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
        @Override
        public Set<Acl> getAcls(Resource resource) {
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+                       LOG.debug("==> 
RangerKafkaAuthorizer.getAcls(Resource)");
                }
                
                Set<Acl> ret = null;
@@ -204,19 +204,19 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
 
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
+                       LOG.debug("<== 
RangerKafkaAuthorizer.getAcls(Resource)");
                }
 
                return ret;
        }
 
        @Override
-       public Set<Acl> getAcls(KafkaPrincipal principal) {
+       public scala.collection.immutable.Map<Resource, Set<Acl>> 
getAcls(KafkaPrincipal principal) {
                if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+                       LOG.debug("==> 
RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
                }
 
-               Set<Acl> ret = null;
+               scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
 
                try {
                        activatePluginClassLoader();
@@ -227,6 +227,29 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
 
                if(LOG.isDebugEnabled()) {
+                       LOG.debug("<== 
RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
+               }
+
+               return ret;
+       }
+
+       @Override
+       public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
+               if(LOG.isDebugEnabled()) {
+                       LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
+               }
+
+               scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
+
+               try {
+                       activatePluginClassLoader();
+
+                       ret = rangerKakfaAuthorizerImpl.getAcls();
+               } finally {
+                       deactivatePluginClassLoader();
+               }
+
+               if(LOG.isDebugEnabled()) {
                        LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
                }
 

Reply via email to