urbandan commented on a change in pull request #133:
URL: https://github.com/apache/ranger/pull/133#discussion_r817599735



##########
File path: ranger-kafka-plugin-shim/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -19,272 +19,200 @@
 
 package org.apache.ranger.authorization.kafka.authorizer;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletionStage;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.authorizer.Action;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.authorizer.AuthorizationResult;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.ranger.plugin.classloader.RangerPluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.immutable.Set;
-import kafka.network.RequestChannel.Session;
-import kafka.security.auth.Acl;
-import kafka.security.auth.Authorizer;
-import org.apache.kafka.common.security.auth.KafkaPrincipal;
-import kafka.security.auth.Operation;
-import kafka.security.auth.Resource;
+public class RangerKafkaAuthorizer implements Authorizer {
+  private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static final String RANGER_PLUGIN_TYPE = "kafka";
+  private static final String RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
 
+  private Authorizer rangerKafkaAuthorizerImpl = null;
+  private RangerPluginClassLoader rangerPluginClassLoader = null;
 
-//public class RangerKafkaAuthorizer extends Authorizer {
-public class RangerKafkaAuthorizer implements Authorizer {
-       private static final Logger LOG  = 
LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
-
-       private static final String   RANGER_PLUGIN_TYPE                      = 
"kafka";
-       private static final String   RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME  = 
"org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
-
-       private Authorizer              rangerKakfaAuthorizerImpl = null;
-       private RangerPluginClassLoader rangerPluginClassLoader   = null;
-       
-       public RangerKafkaAuthorizer() {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-               }
-
-               this.init();
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
-               }
-       }
-       
-       private void init(){
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.init()");
-               }
-
-               try {
-                       
-                       rangerPluginClassLoader = 
RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
-                       
-                       @SuppressWarnings("unchecked")
-                       Class<Authorizer> cls = (Class<Authorizer>) 
Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, 
rangerPluginClassLoader);
-
-                       activatePluginClassLoader();
-
-                       rangerKakfaAuthorizerImpl = cls.newInstance();
-               } catch (Exception e) {
-                       // check what need to be done
-                       LOG.error("Error Enabling RangerKafkaPlugin", e);
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.init()");
-               }
-       }
-
-       @Override
-       public void configure(Map<String, ?> configs) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
RangerKafkaAuthorizer.configure(Map<String, ?>)");
-               }
-
-               try {
-                       activatePluginClassLoader();
-
-                       rangerKakfaAuthorizerImpl.configure(configs);
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
RangerKafkaAuthorizer.configure(Map<String, ?>)");
-               }
-       }
-
-       @Override
-       public void close() {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.close()");
-               }
-
-               try {
-                       activatePluginClassLoader();
-                       
-                       rangerKakfaAuthorizerImpl.close();
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.close()");
-               }
-               
-       }
-
-       @Override
-       public boolean authorize(Session session, Operation operation,Resource 
resource) {      
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug(String.format("==> 
RangerKafkaAuthorizer.authorize(Session=%s, Operation=%s, Resource=%s)", 
session, operation, resource));
-               }
-
-               boolean ret = false;
-               
-               try {
-                       activatePluginClassLoader();
-
-                       ret = rangerKakfaAuthorizerImpl.authorize(session, 
operation, resource);
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.authorize: " + 
ret);
-               }
-               
-               return ret;
-       }
-
-       @Override
-       public void addAcls(Set<Acl> acls, Resource resource) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.addAcls(Set<Acl>, 
Resource)");
-               }
-
-               try {
-                       activatePluginClassLoader();
-
-                       rangerKakfaAuthorizerImpl.addAcls(acls, resource);
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.addAcls(Set<Acl>, 
Resource)");
-               }
-       }
-
-       @Override
-       public boolean removeAcls(Set<Acl> acls, Resource resource) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-               }
-               boolean ret = false;
-               try {
-                       activatePluginClassLoader();
-
-                       ret = rangerKakfaAuthorizerImpl.removeAcls(acls, 
resource);
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
RangerKafkaAuthorizer.removeAcls(Set<Acl>, Resource)");
-               }
-               
-               return ret;
-       }
-
-       @Override
-       public boolean removeAcls(Resource resource) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
RangerKafkaAuthorizer.removeAcls(Resource)");
-               }
-               boolean ret = false;
-               try {
-                       activatePluginClassLoader();
-
-                       ret = rangerKakfaAuthorizerImpl.removeAcls(resource);
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
RangerKafkaAuthorizer.removeAcls(Resource)");
-               }
-
-               return ret;
-       }
-
-       @Override
-       public Set<Acl> getAcls(Resource resource) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
RangerKafkaAuthorizer.getAcls(Resource)");
-               }
-               
-               Set<Acl> ret = null;
-               
-               try {
-                       activatePluginClassLoader();
-
-                       ret = rangerKakfaAuthorizerImpl.getAcls(resource);
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
RangerKafkaAuthorizer.getAcls(Resource)");
-               }
-
-               return ret;
-       }
-
-       @Override
-       public scala.collection.immutable.Map<Resource, Set<Acl>> 
getAcls(KafkaPrincipal principal) {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> 
RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-               }
-
-               scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-               try {
-                       activatePluginClassLoader();
-
-                       ret = rangerKakfaAuthorizerImpl.getAcls(principal);
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
RangerKafkaAuthorizer.getAcls(KafkaPrincipal)");
-               }
-
-               return ret;
-       }
-
-       @Override
-       public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("==> RangerKafkaAuthorizer.getAcls()");
-               }
-
-               scala.collection.immutable.Map<Resource, Set<Acl>> ret = null;
-
-               try {
-                       activatePluginClassLoader();
-
-                       ret = rangerKakfaAuthorizerImpl.getAcls();
-               } finally {
-                       deactivatePluginClassLoader();
-               }
-
-               if(LOG.isDebugEnabled()) {
-                       LOG.debug("<== RangerKafkaAuthorizer.getAcls()");
-               }
-
-               return ret;
-       }
-       
-       private void activatePluginClassLoader() {
-               if(rangerPluginClassLoader != null) {
-                       rangerPluginClassLoader.activate();
-               }
-       }
-
-       private void deactivatePluginClassLoader() {
-               if(rangerPluginClassLoader != null) {
-                       rangerPluginClassLoader.deactivate();
-               }
-       }
-               
+  public RangerKafkaAuthorizer() {
+    logger.debug("==> RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+
+    this.init();
+
+    logger.debug("<== RangerKafkaAuthorizer.RangerKafkaAuthorizer()");
+  }
+
+  private void init() {
+    logger.debug("==> RangerKafkaAuthorizer.init()");
+
+    try {
+
+      rangerPluginClassLoader = RangerPluginClassLoader.getInstance(RANGER_PLUGIN_TYPE, this.getClass());
+
+      @SuppressWarnings("unchecked")
+      Class<Authorizer> cls = (Class<Authorizer>) Class.forName(RANGER_KAFKA_AUTHORIZER_IMPL_CLASSNAME, true, rangerPluginClassLoader);
+
+      activatePluginClassLoader();
+
+      rangerKafkaAuthorizerImpl = cls.newInstance();
+    } catch (Exception e) {
+      // check what need to be done

Review comment:
       sounds ok to me




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to