This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 8e6ffd2  KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5679)
8e6ffd2 is described below

commit 8e6ffd2bb8f6a54b4d2298b6faf66a1035875bef
Author: Manikumar Reddy O <manikumar.re...@gmail.com>
AuthorDate: Wed Sep 26 06:51:14 2018 +0530

    KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5679)
    
    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../kafka/security/auth/SimpleAclAuthorizer.scala     | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 3a00226..d4ec4f2 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import scala.collection.JavaConverters._
 import org.apache.log4j.Logger
 
-import scala.util.Random
+import scala.util.{Failure, Random, Success, Try}
 
 object SimpleAclAuthorizer {
   //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in
@@ -235,12 +235,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     inWriteLock(lock) {
       val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath)
       for (rType <- resourceTypes) {
-        val resourceType = ResourceType.fromString(rType)
-        val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
-        val resourceNames = zkUtils.getChildren(resourceTypePath)
-        for (resourceName <- resourceNames) {
-          val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString))
-          updateCache(new Resource(resourceType, resourceName), versionedAcls)
+        val resourceType = Try(ResourceType.fromString(rType))
+        resourceType match {
+          case Success(resourceTypeObj) => {
+            val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceTypeObj.name
+            val resourceNames = zkUtils.getChildren(resourceTypePath)
+            for (resourceName <- resourceNames) {
+              val versionedAcls = getAclsFromZk(Resource(resourceTypeObj, resourceName.toString))
+              updateCache(new Resource(resourceTypeObj, resourceName), versionedAcls)
+            }
+          }
+          case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType")
         }
       }
     }

Reply via email to