This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push: new 8e6ffd2 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5679) 8e6ffd2 is described below commit 8e6ffd2bb8f6a54b4d2298b6faf66a1035875bef Author: Manikumar Reddy O <manikumar.re...@gmail.com> AuthorDate: Wed Sep 26 06:51:14 2018 +0530 KAFKA-7216: Ignore unknown ResourceTypes while loading acl cache (#5679) Reviewers: Jun Rao <jun...@gmail.com> --- .../kafka/security/auth/SimpleAclAuthorizer.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 3a00226..d4ec4f2 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.collection.JavaConverters._ import org.apache.log4j.Logger -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} object SimpleAclAuthorizer { //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in @@ -235,12 +235,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging { inWriteLock(lock) { val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath) for (rType <- resourceTypes) { - val resourceType = ResourceType.fromString(rType) - val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name - val resourceNames = zkUtils.getChildren(resourceTypePath) - for (resourceName <- resourceNames) { - val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString)) - updateCache(new Resource(resourceType, resourceName), versionedAcls) + val resourceType = Try(ResourceType.fromString(rType)) + resourceType match { + case Success(resourceTypeObj) => { + val resourceTypePath = 
SimpleAclAuthorizer.AclZkPath + "/" + resourceTypeObj.name + val resourceNames = zkUtils.getChildren(resourceTypePath) + for (resourceName <- resourceNames) { + val versionedAcls = getAclsFromZk(Resource(resourceTypeObj, resourceName.toString)) + updateCache(new Resource(resourceTypeObj, resourceName), versionedAcls) + } + } + case Failure(f) => warn(s"Ignoring unknown ResourceType: $rType") } } }