Repository: kafka Updated Branches: refs/heads/trunk 5db147c1d -> c44c898ec
KAFKA-3045; ZkNodeChangeNotificationListener shouldn't log InterruptedException as error Author: Dong Lin <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #731 from lindong28/KAFKA-3045 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c44c898e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c44c898e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c44c898e Branch: refs/heads/trunk Commit: c44c898ec835bb16950137f831cbc18c6b8b82f5 Parents: 5db147c Author: Dong Lin <[email protected]> Authored: Tue Jan 5 12:03:19 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Tue Jan 5 12:03:19 2016 -0800 ---------------------------------------------------------------------- .../ZkNodeChangeNotificationListener.scala | 36 ++++++++++++++------ .../security/auth/SimpleAclAuthorizer.scala | 7 ++-- 2 files changed, 29 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c44c898e/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala index a600d5d..91b4fb9 100644 --- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala +++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala @@ -16,8 +16,11 @@ */ package kafka.common +import java.util.concurrent.atomic.AtomicBoolean + import kafka.utils.{Time, SystemTime, ZkUtils, Logging} -import org.I0Itec.zkclient.{IZkChildListener, ZkClient} +import org.I0Itec.zkclient.exception.ZkInterruptedException +import org.I0Itec.zkclient.IZkChildListener import scala.collection.JavaConverters._ /** @@ -37,7 +40,7 @@ trait NotificationHandler { * The caller/user of this class should ensure that they use zkClient.subscribeStateChanges and call processAllNotifications * method of this class from ZkStateChangeListener's handleNewSession() method. This is necessary to ensure that if zk session * is terminated and reestablished any missed notification will be processed immediately. - * @param zkClient + * @param zkUtils * @param seqNodeRoot * @param seqNodePrefix * @param notificationHandler @@ -51,6 +54,7 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, private val changeExpirationMs: Long = 15 * 60 * 1000, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L + private val isClosed = new AtomicBoolean(false) /** * create seqNodeRoot and begin watching for any new children nodes. @@ -61,6 +65,10 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, processAllNotifications() } + def close() = { + isClosed.set(true) + } + /** * Process all changes */ @@ -75,17 +83,23 @@ class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, private def processNotifications(notifications: Seq[String]) { if (notifications.nonEmpty) { info(s"Processing notification(s) to $seqNodeRoot") - val now = time.milliseconds - for (notification <- notifications) { - val changeId = changeNumber(notification) - if (changeId > lastExecutedChange) { - val changeZnode = seqNodeRoot + "/" + notification - val (data, stat) = zkUtils.readDataMaybeNull(changeZnode) - data map (notificationHandler.processNotification(_)) getOrElse(logger.warn(s"read null data from $changeZnode when processing notification $notification")) + try { + val now = time.milliseconds + for (notification <- notifications) { + val changeId = changeNumber(notification) + if (changeId > lastExecutedChange) { + val changeZnode = seqNodeRoot + "/" + notification + val (data, stat) = zkUtils.readDataMaybeNull(changeZnode) + data map (notificationHandler.processNotification(_)) getOrElse (logger.warn(s"read null data from $changeZnode when processing notification $notification")) + } + lastExecutedChange = changeId } - lastExecutedChange = changeId + purgeObsoleteNotifications(now, notifications) + } catch { + case e: ZkInterruptedException => + if (!isClosed.get) + throw e } - purgeObsoleteNotifications(now, notifications) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c44c898e/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 780bdf3..f716e16 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -26,7 +26,7 @@ import kafka.network.RequestChannel.Session import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ -import org.I0Itec.zkclient.{IZkStateListener, ZkClient} +import org.I0Itec.zkclient.IZkStateListener import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.collection.JavaConverters._ @@ -105,7 +105,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { loadCache() zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath) - aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler) + aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler) aclChangeListener.init() zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener) @@ -230,6 +230,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } def close() { + if (aclChangeListener != null) aclChangeListener.close() if (zkUtils != null) zkUtils.close() } @@ -269,7 +270,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString) } - object AclChangedNotificaitonHandler extends NotificationHandler { + object AclChangedNotificationHandler extends NotificationHandler { override def processNotification(notificationMessage: String) { val resource: Resource = Resource.fromString(notificationMessage)
