Repository: incubator-ranger
Updated Branches:
  refs/heads/ranger-0.5 9cbff669b -> 4993b50a5


RANGER-740: Kafka Authorizer interface has added close() method. Ranger
should also implement it


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/4993b50a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/4993b50a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/4993b50a

Branch: refs/heads/ranger-0.5
Commit: 4993b50a58396d9422421bda4c1e11d1706afa33
Parents: 9cbff66
Author: Don Bosco Durai <[email protected]>
Authored: Tue Nov 24 12:52:19 2015 -0800
Committer: Velmurugan Periasamy <[email protected]>
Committed: Tue Nov 24 16:53:06 2015 -0500

----------------------------------------------------------------------
 .../kafka/authorizer/RangerKafkaAuthorizer.java | 33 ++++++++++++++++----
 1 file changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4993b50a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 08ff928..29c2ceb 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -26,7 +26,9 @@ import javax.security.auth.Subject;
 
 import kafka.security.auth.Acl;
 import kafka.security.auth.Authorizer;
+
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
 import kafka.security.auth.*;
 import kafka.server.KafkaConfig;
 import kafka.common.security.LoginManager;
@@ -99,8 +101,26 @@ public class RangerKafkaAuthorizer implements Authorizer {
                }
        }
 
+       /*
+        * (non-Javadoc)
+        * 
+        * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
+        */
        @Override
-       public boolean authorize(Session session, Operation operation, Resource resource) {
+       public void close() {
+               logger.info("close() called on authorizer.");
+               try {
+                       if (rangerPlugin != null) {
+                               rangerPlugin.cleanup();
+                       }
+               } catch (Throwable t) {
+                       logger.error("Error closing RangerPlugin.", t);
+               }
+       }
+
+       @Override
+       public boolean authorize(Session session, Operation operation,
+                       Resource resource) {
 
                if (rangerPlugin == null) {
                        MiscUtil.logErrorMessageByInterval(logger,
@@ -124,7 +144,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
                String ip = session.host();
 
                // skip leading slash
-               if(StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
+               if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
                        ip = ip.substring(1);
                }
 
@@ -250,10 +270,12 @@ public class RangerKafkaAuthorizer implements Authorizer {
         * (non-Javadoc)
         * 
         * @see
-        * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal)
+        * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
+        * )
         */
        @Override
-       public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) {
+       public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
+                       KafkaPrincipal principal) {
                scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
                logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka");
                return aclList;
@@ -262,8 +284,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
        /*
         * (non-Javadoc)
         * 
-        * @see
-        * kafka.security.auth.Authorizer#getAcls()
+        * @see kafka.security.auth.Authorizer#getAcls()
         */
        @Override
        public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {

Reply via email to