Repository: incubator-ranger Updated Branches: refs/heads/ranger-0.5 9cbff669b -> 4993b50a5
RANGER-740: Kafka Authorizer interface has added close() method. Ranger should also implement it Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/4993b50a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/4993b50a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/4993b50a Branch: refs/heads/ranger-0.5 Commit: 4993b50a58396d9422421bda4c1e11d1706afa33 Parents: 9cbff66 Author: Don Bosco Durai <[email protected]> Authored: Tue Nov 24 12:52:19 2015 -0800 Committer: Velmurugan Periasamy <[email protected]> Committed: Tue Nov 24 16:53:06 2015 -0500 ---------------------------------------------------------------------- .../kafka/authorizer/RangerKafkaAuthorizer.java | 33 ++++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4993b50a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java ---------------------------------------------------------------------- diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index 08ff928..29c2ceb 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -26,7 +26,9 @@ import javax.security.auth.Subject; import kafka.security.auth.Acl; import kafka.security.auth.Authorizer; + import org.apache.kafka.common.security.auth.KafkaPrincipal; + import kafka.security.auth.*; import kafka.server.KafkaConfig; import kafka.common.security.LoginManager; @@ -99,8 +101,26 @@ public class 
RangerKafkaAuthorizer implements Authorizer { } } + /* + * (non-Javadoc) + * + * @see kafka.security.auth.Authorizer#close() + */ @Override - public boolean authorize(Session session, Operation operation, Resource resource) { + public void close() { + logger.info("close() called on authorizer."); + try { + if (rangerPlugin != null) { + rangerPlugin.cleanup(); + } + } catch (Throwable t) { + logger.error("Error closing RangerPlugin.", t); + } + } + + @Override + public boolean authorize(Session session, Operation operation, + Resource resource) { if (rangerPlugin == null) { MiscUtil.logErrorMessageByInterval(logger, @@ -124,7 +144,7 @@ public class RangerKafkaAuthorizer implements Authorizer { String ip = session.host(); // skip leading slash - if(StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') { + if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') { ip = ip.substring(1); } @@ -250,10 +270,12 @@ public class RangerKafkaAuthorizer implements Authorizer { * (non-Javadoc) * * @see - * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal) + * kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal + * ) */ @Override - public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(KafkaPrincipal principal) { + public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls( + KafkaPrincipal principal) { scala.collection.immutable.Map<Resource, Set<Acl>> aclList = new scala.collection.immutable.HashMap<Resource, Set<Acl>>(); logger.error("getAcls(KafkaPrincipal) is not supported by Ranger for Kafka"); return aclList; @@ -262,8 +284,7 @@ public class RangerKafkaAuthorizer implements Authorizer { /* * (non-Javadoc) * - * @see - * kafka.security.auth.Authorizer#getAcls() + * @see kafka.security.auth.Authorizer#getAcls() */ @Override public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
