akatona84 commented on a change in pull request #133:
URL: https://github.com/apache/ranger/pull/133#discussion_r816786753



##########
File path: plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
##########
@@ -43,286 +60,256 @@
 import org.slf4j.LoggerFactory;
 
 public class RangerKafkaAuthorizer implements Authorizer {
-       private static final Logger logger = LoggerFactory
-                       .getLogger(RangerKafkaAuthorizer.class);
-       private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = 
RangerPerfTracer.getPerfLogger("kafkaauth.request");
-
-       public static final String KEY_TOPIC = "topic";
-       public static final String KEY_CLUSTER = "cluster";
-       public static final String KEY_CONSUMER_GROUP = "consumergroup";
-       public static final String KEY_TRANSACTIONALID = "transactionalid";
-       public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
-
-       public static final String ACCESS_TYPE_READ = "consume";
-       public static final String ACCESS_TYPE_WRITE = "publish";
-       public static final String ACCESS_TYPE_CREATE = "create";
-       public static final String ACCESS_TYPE_DELETE = "delete";
-       public static final String ACCESS_TYPE_CONFIGURE = "configure";
-       public static final String ACCESS_TYPE_DESCRIBE = "describe";
-       public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = 
"describe_configs";
-       public static final String ACCESS_TYPE_ALTER_CONFIGS    = 
"alter_configs";
-       public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = 
"idempotent_write";
-       public static final String ACCESS_TYPE_CLUSTER_ACTION   = 
"cluster_action";
-
-       private static volatile RangerBasePlugin rangerPlugin = null;
-       RangerKafkaAuditHandler auditHandler = null;
-
-       public RangerKafkaAuthorizer() {
-       }
-
-       /*
-        * (non-Javadoc)
-        *
-        * @see kafka.security.auth.Authorizer#configure(Map<String, Object>)
-        */
-       @Override
-       public void configure(Map<String, ?> configs) {
-               RangerBasePlugin me = rangerPlugin;
-               if (me == null) {
-                       synchronized(RangerKafkaAuthorizer.class) {
-                               me = rangerPlugin;
-                               if (me == null) {
-                                       try {
-                                               // Possible to override JAAS 
configuration which is used by Ranger, otherwise
-                                               // SASL_PLAINTEXT is used, 
which force Kafka to use 'sasl_plaintext.KafkaServer',
-                                               // if it's not defined, then it 
reverts to 'KafkaServer' configuration.
-                                               final Object jaasContext = 
configs.get("ranger.jaas.context");
-                                               final String listenerName = 
(jaasContext instanceof String
-                                                               && 
StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext
-                                                                               
: SecurityProtocol.SASL_PLAINTEXT.name();
-                                               final String saslMechanism = 
SaslConfigs.GSSAPI_MECHANISM;
-                                               JaasContext context = 
JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, 
configs);
-                                               
MiscUtil.setUGIFromJAASConfig(context.name());
-                                               logger.info("LoginUser=" + 
MiscUtil.getUGILoginUser());
-                                       } catch (Throwable t) {
-                                               logger.error("Error getting 
principal.", t);
-                                       }
-                                       me = rangerPlugin = new 
RangerBasePlugin("kafka", "kafka");
-                               }
-                       }
-               }
-               logger.info("Calling plugin.init()");
-               rangerPlugin.init();
-               auditHandler = new RangerKafkaAuditHandler();
-               rangerPlugin.setResultProcessor(auditHandler);
-       }
-
-       @Override
-       public void close() {
-               logger.info("close() called on authorizer.");
-               try {
-                       if (rangerPlugin != null) {
-                               rangerPlugin.cleanup();
-                       }
-               } catch (Throwable t) {
-                       logger.error("Error closing RangerPlugin.", t);
-               }
-       }
-
-       @Override
-       public boolean authorize(Session session, Operation operation,
-                       Resource resource) {
-
-               if (rangerPlugin == null) {
-                       MiscUtil.logErrorMessageByInterval(logger,
-                                       "Authorizer is still not initialized");
-                       return false;
-               }
-
-               RangerPerfTracer perf = null;
-
-               
if(RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) {
-                       perf = 
RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, 
"RangerKafkaAuthorizer.authorize(resource=" + resource + ")");
-               }
-               String userName = null;
-               if (session.principal() != null) {
-                       userName = session.principal().getName();
-               }
-               java.util.Set<String> userGroups = MiscUtil
-                               .getGroupsForRequestUser(userName);
-               String ip = session.clientAddress().getHostAddress();
-
-               // skip leading slash
-               if (StringUtils.isNotEmpty(ip) && ip.charAt(0) == '/') {
-                       ip = ip.substring(1);
-               }
-
-               Date eventTime = new Date();
-               String accessType = mapToRangerAccessType(operation);
-               boolean validationFailed = false;
-               String validationStr = "";
-
-               if (accessType == null) {
-                       if (MiscUtil.logErrorMessageByInterval(logger,
-                                       "Unsupported access type. operation=" + 
operation)) {
-                               logger.error("Unsupported access type. 
session=" + session
-                                               + ", operation=" + operation + 
", resource=" + resource);
-                       }
-                       validationFailed = true;
-                       validationStr += "Unsupported access type. operation=" 
+ operation;
-               }
-               String action = accessType;
-
-               RangerAccessRequestImpl rangerRequest = new 
RangerAccessRequestImpl();
-               rangerRequest.setUser(userName);
-               rangerRequest.setUserGroups(userGroups);
-               rangerRequest.setClientIPAddress(ip);
-               rangerRequest.setAccessTime(eventTime);
-
-               RangerAccessResourceImpl rangerResource = new 
RangerAccessResourceImpl();
-               rangerRequest.setResource(rangerResource);
-               rangerRequest.setAccessType(accessType);
-               rangerRequest.setAction(action);
-               rangerRequest.setRequestData(resource.name());
-
-               if (resource.resourceType().equals(Topic$.MODULE$)) {
-                       rangerResource.setValue(KEY_TOPIC, resource.name());
-               } else if (resource.resourceType().equals(Cluster$.MODULE$)) {
-                       rangerResource.setValue(KEY_CLUSTER, resource.name());
-               } else if (resource.resourceType().equals(Group$.MODULE$)) {
-                       rangerResource.setValue(KEY_CONSUMER_GROUP, 
resource.name());
-               } else if 
(resource.resourceType().equals(TransactionalId$.MODULE$)) {
-                       rangerResource.setValue(KEY_TRANSACTIONALID, 
resource.name());
-               } else if 
(resource.resourceType().equals(DelegationToken$.MODULE$)) {
-                       rangerResource.setValue(KEY_DELEGATIONTOKEN, 
resource.name());
-               } else {
-                       logger.error("Unsupported resourceType=" + 
resource.resourceType());
-                       validationFailed = true;
-               }
-
-               boolean returnValue = false;
-               if (validationFailed) {
-                       MiscUtil.logErrorMessageByInterval(logger, validationStr
-                                       + ", request=" + rangerRequest);
-               } else {
-
-                       try {
-                               RangerAccessResult result = rangerPlugin
-                                               .isAccessAllowed(rangerRequest);
-                               if (result == null) {
-                                       logger.error("Ranger Plugin returned 
null. Returning false");
-                               } else {
-                                       returnValue = result.getIsAllowed();
-                               }
-                       } catch (Throwable t) {
-                               logger.error("Error while calling 
isAccessAllowed(). request="
-                                               + rangerRequest, t);
-                       } finally {
-                               auditHandler.flushAudit();
-                       }
-               }
-               RangerPerfTracer.log(perf);
-
-               if (logger.isDebugEnabled()) {
-                       logger.debug("rangerRequest=" + rangerRequest + ", 
return="
-                                       + returnValue);
-               }
-               return returnValue;
-       }
-
-       /*
-        * (non-Javadoc)
-        *
-        * @see
-        * 
kafka.security.auth.Authorizer#addAcls(scala.collection.immutable.Set,
-        * kafka.security.auth.Resource)
-        */
-       @Override
-       public void addAcls(Set<Acl> acls, Resource resource) {
-               logger.error("addAcls(Set<Acl>, Resource) is not supported by 
Ranger for Kafka");
-       }
-
-       /*
-        * (non-Javadoc)
-        *
-        * @see
-        * 
kafka.security.auth.Authorizer#removeAcls(scala.collection.immutable.Set,
-        * kafka.security.auth.Resource)
-        */
-       @Override
-       public boolean removeAcls(Set<Acl> acls, Resource resource) {
-               logger.error("removeAcls(Set<Acl>, Resource) is not supported 
by Ranger for Kafka");
-               return false;
-       }
-
-       /*
-        * (non-Javadoc)
-        *
-        * @see
-        * 
kafka.security.auth.Authorizer#removeAcls(kafka.security.auth.Resource)
-        */
-       @Override
-       public boolean removeAcls(Resource resource) {
-               logger.error("removeAcls(Resource) is not supported by Ranger 
for Kafka");
-               return false;
-       }
-
-       /*
-        * (non-Javadoc)
-        *
-        * @see 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.Resource)
-        */
-       @Override
-       public Set<Acl> getAcls(Resource resource) {
-               Set<Acl> aclList = new HashSet<Acl>();
-               logger.error("getAcls(Resource) is not supported by Ranger for 
Kafka");
-
-               return aclList;
-       }
-
-       /*
-        * (non-Javadoc)
-        *
-        * @see
-        * 
kafka.security.auth.Authorizer#getAcls(kafka.security.auth.KafkaPrincipal
-        * )
-        */
-       @Override
-       public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(
-                       KafkaPrincipal principal) {
-               scala.collection.immutable.Map<Resource, Set<Acl>> aclList = 
new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-               logger.error("getAcls(KafkaPrincipal) is not supported by 
Ranger for Kafka");
-               return aclList;
-       }
-
-       /*
-        * (non-Javadoc)
-        *
-        * @see kafka.security.auth.Authorizer#getAcls()
-        */
-       @Override
-       public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
-               scala.collection.immutable.Map<Resource, Set<Acl>> aclList = 
new scala.collection.immutable.HashMap<Resource, Set<Acl>>();
-               logger.error("getAcls() is not supported by Ranger for Kafka");
-               return aclList;
-       }
-
-       /**
-        * @param operation
-        * @return
-        */
-       private String mapToRangerAccessType(Operation operation) {
-               if (operation.equals(Read$.MODULE$)) {
-                       return ACCESS_TYPE_READ;
-               } else if (operation.equals(Write$.MODULE$)) {
-                       return ACCESS_TYPE_WRITE;
-               } else if (operation.equals(Alter$.MODULE$)) {
-                       return ACCESS_TYPE_CONFIGURE;
-               } else if (operation.equals(Describe$.MODULE$)) {
-                       return ACCESS_TYPE_DESCRIBE;
-               } else if (operation.equals(ClusterAction$.MODULE$)) {
-                       return ACCESS_TYPE_CLUSTER_ACTION;
-               } else if (operation.equals(Create$.MODULE$)) {
-                       return ACCESS_TYPE_CREATE;
-               } else if (operation.equals(Delete$.MODULE$)) {
-                       return ACCESS_TYPE_DELETE;
-               } else if (operation.equals(DescribeConfigs$.MODULE$)) {
-                       return ACCESS_TYPE_DESCRIBE_CONFIGS;
-               } else if (operation.equals(AlterConfigs$.MODULE$)) {
-                       return ACCESS_TYPE_ALTER_CONFIGS;
-               } else if (operation.equals(IdempotentWrite$.MODULE$)) {
-                       return ACCESS_TYPE_IDEMPOTENT_WRITE;
-               }
-               return null;
-       }
+  public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs";
+  public static final String KEY_TOPIC = "topic";
+  public static final String KEY_CLUSTER = "cluster";
+  public static final String KEY_CONSUMER_GROUP = "consumergroup";
+  public static final String KEY_TRANSACTIONALID = "transactionalid";
+  public static final String KEY_DELEGATIONTOKEN = "delegationtoken";
+  public static final String ACCESS_TYPE_READ = "consume";
+  public static final String ACCESS_TYPE_WRITE = "publish";
+  public static final String ACCESS_TYPE_CREATE = "create";
+  public static final String ACCESS_TYPE_DELETE = "delete";
+  public static final String ACCESS_TYPE_CONFIGURE = "configure";
+  public static final String ACCESS_TYPE_DESCRIBE = "describe";
+  public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs";
+  public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action";
+  public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write";
+
+  private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = 
RangerPerfTracer.getPerfLogger("kafkaauth.request");
+  private static final Logger logger = 
LoggerFactory.getLogger(RangerKafkaAuthorizer.class);
+
+  private static volatile RangerBasePlugin rangerPlugin = null;
+  RangerKafkaAuditHandler auditHandler = null;
+
+  public RangerKafkaAuthorizer() {
+  }
+
+  private static String mapToRangerAccessType(AclOperation operation) {
+    switch (operation) {
+      case READ:
+        return ACCESS_TYPE_READ;
+      case WRITE:
+        return ACCESS_TYPE_WRITE;
+      case ALTER:
+        return ACCESS_TYPE_CONFIGURE;
+      case DESCRIBE:
+        return ACCESS_TYPE_DESCRIBE;
+      case CLUSTER_ACTION:
+        return ACCESS_TYPE_CLUSTER_ACTION;
+      case CREATE:
+        return ACCESS_TYPE_CREATE;
+      case DELETE:
+        return ACCESS_TYPE_DELETE;
+      case DESCRIBE_CONFIGS:
+        return ACCESS_TYPE_DESCRIBE_CONFIGS;
+      case ALTER_CONFIGS:
+        return ACCESS_TYPE_ALTER_CONFIGS;
+      case IDEMPOTENT_WRITE:
+        return ACCESS_TYPE_IDEMPOTENT_WRITE;
+      case UNKNOWN:
+      case ANY:
+      case ALL:
+      default:
+        return null;
+    }
+  }
+
+  private static String mapToResourceType(ResourceType resourceType) {
+    switch (resourceType) {
+      case TOPIC:
+        return KEY_TOPIC;
+      case CLUSTER:
+        return KEY_CLUSTER;
+      case GROUP:
+        return KEY_CONSUMER_GROUP;
+      case TRANSACTIONAL_ID:
+        return KEY_TRANSACTIONALID;
+      case DELEGATION_TOKEN:
+        return KEY_DELEGATIONTOKEN;
+      case ANY:
+      case UNKNOWN:
+      default:
+        return null;
+    }
+  }
+
+  private static RangerAccessResourceImpl getRangerAccessResource(String 
resourceTypeKey, String resourceName) {
+    RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl();
+    rangerResource.setValue(resourceTypeKey, resourceName);
+    return rangerResource;
+  }
+
+  private static RangerAccessRequestImpl getRangerAccessRequest(String 
userName,
+                                                                Set<String> 
userGroups,
+                                                                String ip,
+                                                                Date eventTime,
+                                                                String 
resourceTypeKey,
+                                                                String 
resourceName,
+                                                                String 
accessType) {
+    RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl();
+    rangerRequest.setResource(getRangerAccessResource(resourceTypeKey, 
resourceName));
+    rangerRequest.setUser(userName);
+    rangerRequest.setUserGroups(userGroups);
+    rangerRequest.setClientIPAddress(ip);
+    rangerRequest.setAccessTime(eventTime);
+    rangerRequest.setAccessType(accessType);
+    rangerRequest.setAction(accessType);
+    rangerRequest.setRequestData(resourceName);
+    return rangerRequest;
+  }
+
+  private static List<AuthorizationResult> denyAll(List<Action> actions) {
+    return actions.stream().map(a -> 
AuthorizationResult.DENIED).collect(Collectors.toList());
+  }
+
+  private static List<AuthorizationResult> mapResults(List<Action> actions, 
Collection<RangerAccessResult> results) {
+    if (CollectionUtils.isEmpty(results)) {
+      logger.error("Ranger Plugin returned null or empty. Returning Denied for 
all");
+      return denyAll(actions);
+    }
+    return results.stream()
+        .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED 
: AuthorizationResult.DENIED)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void close() {
+    logger.info("close() called on authorizer.");
+    try {
+      if (rangerPlugin != null) {
+        rangerPlugin.cleanup();
+      }
+    } catch (Throwable t) {
+      logger.error("Error closing RangerPlugin.", t);
+    }
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    RangerBasePlugin me = rangerPlugin;
+    if (me == null) {
+      synchronized (RangerKafkaAuthorizer.class) {
+        me = rangerPlugin;
+        if (me == null) {
+          try {
+            // Possible to override JAAS configuration which is used by 
Ranger, otherwise
+            // SASL_PLAINTEXT is used, which force Kafka to use 
'sasl_plaintext.KafkaServer',
+            // if it's not defined, then it reverts to 'KafkaServer' 
configuration.
+            final Object jaasContext = configs.get("ranger.jaas.context");
+            final String listenerName = (jaasContext instanceof String
+                && StringUtils.isNotEmpty((String) jaasContext)) ? (String) 
jaasContext
+                : SecurityProtocol.SASL_PLAINTEXT.name();
+            final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            JaasContext context = JaasContext.loadServerContext(new 
ListenerName(listenerName), saslMechanism, configs);
+            MiscUtil.setUGIFromJAASConfig(context.name());
+            UserGroupInformation loginUser = MiscUtil.getUGILoginUser();
+            logger.info("LoginUser={}", loginUser);
+          } catch (Throwable t) {
+            logger.error("Error getting principal.", t);
+          }
+          rangerPlugin = new RangerBasePlugin("kafka", "kafka");
+        }
+      }
+    }
+    logger.info("Calling plugin.init()");
+    rangerPlugin.init();

Review comment:
       Yeah, the plugin is only created once, so there is no need to re-init it, IMO.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to