http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 2027ec8..f39b9a1 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -37,6 +37,7 @@ import kafka.utils.ZkUtils._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection} +import org.apache.kafka.common.security.JaasUtils import org.apache.zookeeper.Watcher.Event.KeeperState import scala.collection._ @@ -88,8 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private val isShuttingDown = new AtomicBoolean(false) private val rebalanceLock = new Object private var fetcher: Option[ConsumerFetcherManager] = None - private var zkClient: ZkClient = null - private var zkConnection : ZkConnection = null + private var zkUtils: ZkUtils = null private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]] private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long] private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]] @@ -173,21 +173,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def createFetcher() { if (enableFetcher) - fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient)) + fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkUtils)) } private def connectZk() { info("Connecting to zookeeper instance at " + config.zkConnect) - val (client, connection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) - zkClient = client - zkConnection = connection + zkUtils = ZkUtils(config.zkConnect, + config.zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, + JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))) } // Blocks until the offset manager is located and a channel is established to it. private def ensureOffsetManagerConnected() { if (config.offsetsStorage == "kafka") { if (offsetsChannel == null || !offsetsChannel.isConnected) - offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient, + offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkUtils, config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs) debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port)) @@ -213,9 +214,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, sendShutdownToAllQueues() if (config.autoCommitEnable) commitOffsets(true) - if (zkClient != null) { - zkClient.close() - zkClient = null + if (zkUtils != null) { + zkUtils.close() + zkUtils = null } if (offsetsChannel != null) offsetsChannel.disconnect() @@ -266,7 +267,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs. consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, - zkConnection.getZookeeper) + zkUtils.zkConnection.getZookeeper, + false) zkWatchedEphemeral.create() info("end registering consumer " + consumerIdString + " in ZK") @@ -296,7 +298,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) { if (checkpointedZkOffsets.get(topicPartition) != offset) { val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) - updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString) + zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString) checkpointedZkOffsets.put(topicPartition, offset) zkCommitMeter.mark() } @@ -404,7 +406,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = { val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic) - val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1 + val offsetString = zkUtils.readDataMaybeNull(dirs.consumerOffsetDir + "/" + topicPartition.partition)._1 offsetString match { case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong)) case None => (topicPartition, OffsetMetadataAndError.NoOffset) @@ -599,7 +601,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def deletePartitionOwnershipFromZK(topic: String, partition: Int) { val topicDirs = new ZKGroupTopicDirs(group, topic) val znode = topicDirs.consumerOwnerDir + "/" + partition - deletePath(zkClient, znode) + zkUtils.deletePath(znode) debug("Consumer " + consumerIdString + " releasing " + znode) } @@ -630,7 +632,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, var done = false var cluster: Cluster = null try { - cluster = getCluster(zkClient) + cluster = zkUtils.getCluster() done = rebalance(cluster) } catch { case e: Throwable => @@ -660,14 +662,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def rebalance(cluster: Cluster): Boolean = { val myTopicThreadIdsMap = TopicCount.constructTopicCount( - group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic - val brokers = getAllBrokersInCluster(zkClient) + group, consumerIdString, zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic + val brokers = zkUtils.getAllBrokersInCluster() if (brokers.size == 0) { // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started. // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers // are up. warn("no brokers found when trying to rebalance.") - zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener) + zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener) true } else { @@ -690,7 +692,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, ) } releasePartitionOwnership(topicRegistry) - val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) + val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkUtils) val globalPartitionAssignment = partitionAssignor.assign(assignmentContext) val partitionAssignment = globalPartitionAssignment.get(assignmentContext.consumerId) val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( @@ -713,7 +715,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, }) /** - * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt + * move the partition ownership here, since that can be used to indicate a truly successful re-balancing attempt * A rebalancing attempt is completed successfully only after the fetchers have been started correctly */ if(reflectPartitionOwnershipDecision(partitionAssignment)) { @@ -832,9 +834,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val topic = partitionOwner._1.topic val partition = partitionOwner._1.partition val consumerThreadId = partitionOwner._2 - val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition) + val partitionOwnerPath = zkUtils.getConsumerPartitionOwnerPath(group, topic, partition) try { - createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId.toString) + zkUtils.createEphemeralPathExpectConflict(partitionOwnerPath, consumerThreadId.toString) info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic) successfullyOwnedPartitions ::= (topic, partition) true @@ -951,14 +953,14 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, }) // listener to consumer and partition changes - zkClient.subscribeStateChanges(sessionExpirationListener) + zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener) - zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) + zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) topicStreamsMap.foreach { topicAndStreams => // register on broker partition path changes val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 - zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener) + zkUtils.zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener) } // explicitly trigger load balancing for this consumer @@ -988,11 +990,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // bootstrap with existing topics private var wildcardTopics = - getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) + zkUtils.getChildrenParentMayNotExist(BrokerTopicsPath) .filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics)) private val wildcardTopicCount = TopicCount.constructTopicCount( - consumerIdString, topicFilter, numStreams, zkClient, config.excludeInternalTopics) + consumerIdString, topicFilter, numStreams, zkUtils, config.excludeInternalTopics) val dirs = new ZKGroupDirs(config.groupId) registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount) @@ -1002,7 +1004,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, * Topic events will trigger subsequent synced rebalances. */ info("Creating topic event watcher for topics " + topicFilter) - wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkClient, this) + wildcardTopicWatcher = new ZookeeperTopicEventWatcher(zkUtils, this) def handleTopicEvent(allTopics: Seq[String]) { debug("Handling topic event")
http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index f74823b..0cd22f0 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -22,7 +22,7 @@ import kafka.utils.{ZkUtils, Logging} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState -class ZookeeperTopicEventWatcher(val zkClient: ZkClient, +class ZookeeperTopicEventWatcher(val zkUtils: ZkUtils, val eventHandler: TopicEventHandler[String]) extends Logging { val lock = new Object() @@ -31,24 +31,24 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, private def startWatchingTopicEvents() { val topicEventListener = new ZkTopicEventListener() - ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath) + zkUtils.makeSurePersistentPathExists(ZkUtils.BrokerTopicsPath) - zkClient.subscribeStateChanges( + zkUtils.zkClient.subscribeStateChanges( new ZkSessionExpireListener(topicEventListener)) - val topics = zkClient.subscribeChildChanges( + val topics = zkUtils.zkClient.subscribeChildChanges( ZkUtils.BrokerTopicsPath, topicEventListener).toList // call to bootstrap topic list topicEventListener.handleChildChange(ZkUtils.BrokerTopicsPath, topics) } - private def stopWatchingTopicEvents() { zkClient.unsubscribeAll() } + private def stopWatchingTopicEvents() { zkUtils.zkClient.unsubscribeAll() } def shutdown() { lock.synchronized { info("Shutting down topic event watcher.") - if (zkClient != null) { + if (zkUtils != null) { stopWatchingTopicEvents() } else { @@ -63,8 +63,8 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, def handleChildChange(parent: String, children: java.util.List[String]) { lock.synchronized { try { - if (zkClient != null) { - val latestTopics = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList + if (zkUtils != null) { + val latestTopics = zkUtils.zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList debug("all topics: %s".format(latestTopics)) eventHandler.handleTopicEvent(latestTopics) } @@ -87,9 +87,9 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, @throws(classOf[Exception]) def handleNewSession() { lock.synchronized { - if (zkClient != null) { + if (zkUtils != null) { info("ZK expired: resubscribing topic event listener to topic registry") - zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) + zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener) } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a7b44ca..0a1a684 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -43,8 +43,7 @@ import java.util.concurrent.locks.ReentrantLock import kafka.server._ import kafka.common.TopicAndPartition -class ControllerContext(val zkClient: ZkClient, - val zkConnection: ZkConnection, +class ControllerContext(val zkUtils: ZkUtils, val zkSessionTimeout: Int) { var controllerChannelManager: ControllerChannelManager = null val controllerLock: ReentrantLock = new ReentrantLock() @@ -154,11 +153,11 @@ object KafkaController extends Logging { } } -class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection: ZkConnection, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { +class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger - val controllerContext = new ControllerContext(zkClient, zkConnection, config.zkSessionTimeoutMs) + val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs) val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, @@ -321,7 +320,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection //read controller epoch from zk readControllerEpochFromZookeeper() // increment the controller epoch - incrementControllerEpoch(zkClient) + incrementControllerEpoch(zkUtils.zkClient) // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks registerReassignedPartitionsListener() registerIsrChangeNotificationListener() @@ -613,7 +612,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection reassignedReplicas.toSet) reassignedPartitionContext.isrChangeListener = isrChangeListener // register listener on the leader and isr path to wait until they catch up with the current leader - zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) + zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) } def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, @@ -703,7 +702,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection def incrementControllerEpoch(zkClient: ZkClient) = { try { var newControllerEpoch = controllerContext.epoch + 1 - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient, + val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists( ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion) if(!updateSucceeded) throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure") @@ -732,14 +731,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection } private def registerSessionExpirationListener() = { - zkClient.subscribeStateChanges(new SessionExpirationListener()) + zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener()) } private def initializeControllerContext() { // update controller cache with delete topic information - controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet - controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet - controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq) + controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet + controllerContext.allTopics = zkUtils.getAllTopics().toSet + controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq) controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int] // update the leader and isr cache for all existing partitions from Zookeeper @@ -756,7 +755,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection private def initializePreferredReplicaElection() { // initialize preferred replica election state - val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient) + val partitionsUndergoingPreferredReplicaElection = zkUtils.getPartitionsUndergoingPreferredReplicaElection() // check if they are already completed or topic was deleted val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition => val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition) @@ -774,7 +773,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection private def initializePartitionReassignment() { // read the partitions being reassigned from zookeeper path /admin/reassign_partitions - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) + val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned() // check if they are already completed or topic was deleted val reassignedPartitions = partitionsBeingReassigned.filter { partition => val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1) @@ -793,7 +792,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection } private def initializeTopicDeletion() { - val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet + val topicsQueuedForDeletion = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) => replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic) val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic) @@ -822,13 +821,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection } def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) { - val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions) + val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions) for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch) } private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = { - getLeaderAndIsrForPartition(zkClient, topic, partition) match { + zkUtils.getLeaderAndIsrForPartition(topic, partition) match { case Some(leaderAndIsr) => val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r)) replicasNotInIsr.isEmpty @@ -930,42 +929,42 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection private def registerIsrChangeNotificationListener() = { debug("Registering IsrChangeNotificationListener") - zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) + zkUtils.zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) } private def deregisterIsrChangeNotificationListener() = { debug("De-registering IsrChangeNotificationListener") - zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) + zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) } private def registerReassignedPartitionsListener() = { - zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) + zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } private def deregisterReassignedPartitionsListener() = { - zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) + zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } private def registerPreferredReplicaElectionListener() { - zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) + zkUtils.zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) } private def deregisterPreferredReplicaElectionListener() { - zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) + zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener) } private def deregisterReassignedPartitionsIsrChangeListeners() { controllerContext.partitionsBeingReassigned.foreach { case (topicAndPartition, reassignedPartitionsContext) => - val zkPartitionPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition) - zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener) + val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition) + zkUtils.zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener) } } private def readControllerEpochFromZookeeper() { // initialize the controller epoch and zk version by reading from zookeeper - if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) { - val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath) + if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) { + val epochData = controllerContext.zkUtils.readData(ZkUtils.ControllerEpochPath) controllerContext.epoch = epochData._1.toInt controllerContext.epochZkVersion = epochData._2.getVersion info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion)) @@ -975,15 +974,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) { if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) { // stop watching the ISR changes for this partition - zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), + zkUtils.zkClient.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener) } // read the current list of reassigned partitions from zookeeper - val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) + val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned() // remove this partition from that list val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition // write the new list to zookeeper - ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) + zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas)) // update the cache. NO-OP if the partition's reassignment was never started controllerContext.partitionsBeingReassigned.remove(topicAndPartition) } @@ -991,9 +990,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition, newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) { try { - val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic) - val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2))) - ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap) + val zkPath = getTopicPath(topicAndPartition.topic) + val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2))) + zkUtils.updatePersistentPath(zkPath, jsonPartitionMap) debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap)) } catch { case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic)) @@ -1014,7 +1013,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection } } if (!isTriggeredByAutoRebalance) - ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath) + zkUtils.deletePath(ZkUtils.PreferredReplicaLeaderElectionPath) controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved } @@ -1057,7 +1056,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection var zkWriteCompleteOrUnnecessary = false while (!zkWriteCompleteOrUnnecessary) { // refresh leader and isr from zookeeper again - val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) + val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match { case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr @@ -1074,7 +1073,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can // eventually be restored as the leader. - if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkClient, + if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition)) newIsr = leaderAndIsr.isr @@ -1083,7 +1082,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1, newIsr, leaderAndIsr.zkVersion + 1) // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition, + val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion) newLeaderAndIsr.zkVersion = newVersion @@ -1120,7 +1119,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection var zkWriteCompleteOrUnnecessary = false while (!zkWriteCompleteOrUnnecessary) { // refresh leader and isr from zookeeper again - val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) + val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match { case Some(leaderIsrAndEpoch) => val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr @@ -1134,7 +1133,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, leaderAndIsr.isr, leaderAndIsr.zkVersion + 1) // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, + val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion) newLeaderAndIsr.zkVersion = newVersion @@ -1245,7 +1244,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection */ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging { this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: " - val zkClient = controller.controllerContext.zkClient + val zkUtils = controller.controllerContext.zkUtils val controllerContext = controller.controllerContext /** @@ -1256,7 +1255,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL def handleDataChange(dataPath: String, data: Object) { debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s" .format(dataPath, data)) - val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString) + val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString) val partitionsToBeReassigned = inLock(controllerContext.controllerLock) { partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1)) } @@ -1288,7 +1287,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: reassignedReplicas: Set[Int]) extends IZkDataListener with Logging { this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: " - val zkClient = controller.controllerContext.zkClient + val zkUtils = controller.controllerContext.zkUtils val controllerContext = controller.controllerContext /** @@ -1305,7 +1304,7 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: controllerContext.partitionsBeingReassigned.get(topicAndPartition) match { case Some(reassignedPartitionContext) => // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object - val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) + val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition) newLeaderAndIsrOpt match { case Some(leaderAndIsr) => // check if new replicas have joined ISR val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet @@ -1359,7 +1358,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil processUpdateNotifications(topicAndPartitions) } finally { // delete processed children - childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient, + childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath( ZkUtils.IsrChangeNotificationPath + "/" + x)) } } @@ -1373,7 +1372,7 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil private def getTopicAndPartition(child: String): Set[TopicAndPartition] = { val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child - val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode) + val (jsonOpt, stat) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode) if (jsonOpt.isDefined) { val json = Json.parseFull(jsonOpt.get) @@ -1410,7 +1409,7 @@ object IsrChangeNotificationListener { */ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging { this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: " - val zkClient = controller.controllerContext.zkClient + val zkUtils = controller.controllerContext.zkUtils val controllerContext = controller.controllerContext /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 4ebeb5a..5eed382 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -61,7 +61,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi case true => // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration // for unclean leader election. - if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkClient, + if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils, ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { throw new NoReplicaOnlineException(("No broker in ISR for partition " + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 675a807..73b173e 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -22,7 +22,8 @@ import collection.mutable.Buffer import java.util.concurrent.atomic.AtomicBoolean import kafka.api.LeaderAndIsr import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} -import kafka.utils.{Logging, ZkUtils, ReplicationUtils} +import kafka.utils.{Logging, ReplicationUtils} +import kafka.utils.ZkUtils._ import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener} import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.controller.Callbacks.CallbackBuilder @@ -43,7 +44,7 @@ import kafka.utils.CoreUtils._ class PartitionStateMachine(controller: KafkaController) extends Logging { private val controllerContext = controller.controllerContext private val controllerId = controller.config.brokerId - private val zkClient = controllerContext.zkClient + private val zkUtils = controllerContext.zkUtils private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) private val hasStarted = new AtomicBoolean(false) @@ -83,7 +84,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { deregisterTopicChangeListener() addPartitionsListener.foreach { case (topic, listener) => - zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener) + zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener) } addPartitionsListener.clear() if(controller.config.deleteTopicEnable) @@ -289,9 +290,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { controller.epoch) debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch)) try { - ZkUtils.createPersistentPath(controllerContext.zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), - ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch)) + zkUtils.createPersistentPath( + getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), + zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch)) // NOTE: the above write can fail only if the current controller lost its zk session and the new controller // took over and initialized this partition. This can happen if the current controller went into a long // GC pause @@ -301,7 +302,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } catch { case e: ZkNodeExistsException => // read the controller epoch - val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, + val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic, topicAndPartition.partition).get val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " + "exists with value %s and controller epoch %d") @@ -342,7 +343,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } // elect new leader or throw exception val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr) - val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition, + val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition, leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion) newLeaderAndIsr = leaderAndIsr newLeaderAndIsr.zkVersion = newVersion @@ -370,34 +371,34 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { } private def registerTopicChangeListener() = { - zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener) + zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener) } private def deregisterTopicChangeListener() = { - zkClient.unsubscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener) + zkUtils.zkClient.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener) } def registerPartitionChangeListener(topic: String) = { addPartitionsListener.put(topic, new AddPartitionsListener(topic)) - zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic)) + zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic)) } def deregisterPartitionChangeListener(topic: String) = { - zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic)) + zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic)) addPartitionsListener.remove(topic) } private def registerDeleteTopicListener() = { - zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener) + zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener) } private def deregisterDeleteTopicListener() = { - zkClient.unsubscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener) + zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener) } private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = { val topicAndPartition = TopicAndPartition(topic, partition) - ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match { + ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) match { case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch case None => val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state" @@ -426,7 +427,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val deletedTopics = controllerContext.allTopics -- currentChildren controllerContext.allTopics = currentChildren - val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq) + val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq) controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1.topic)) controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment) @@ -449,7 +450,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { */ class DeleteTopicsListener() extends IZkChildListener with Logging { this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: " - val zkClient = controllerContext.zkClient + val zkUtils = controllerContext.zkUtils /** * Invoked when a topic is being deleted @@ -466,7 +467,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t)) if(nonExistentTopics.size > 0) { warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(",")) - nonExistentTopics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic))) + nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic))) } topicsToBeDeleted --= nonExistentTopics if(topicsToBeDeleted.size > 0) { @@ -505,7 +506,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { inLock(controllerContext.controllerLock) { try { info("Add Partition triggered " + data.toString + " for path " + dataPath) - val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) + val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)) val partitionsToBeAdded = partitionReplicaAssignment.filter(p => !controllerContext.partitionReplicaAssignment.contains(p._1)) if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic)) http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index acad83a..32ed288 100755 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -47,7 +47,7 @@ import kafka.utils.CoreUtils._ class ReplicaStateMachine(controller: KafkaController) extends Logging { private val controllerContext = controller.controllerContext private val controllerId = controller.config.brokerId - private val zkClient = controllerContext.zkClient + private val zkUtils = controllerContext.zkUtils private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty private val brokerChangeListener = new BrokerChangeListener() private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller) @@ -171,7 +171,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case NewReplica => assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) // start replica as a follower to the current leader for its partition - val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) + val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) leaderIsrAndControllerEpochOpt match { case Some(leaderIsrAndControllerEpoch) => if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) @@ -313,11 +313,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { } private def registerBrokerChangeListener() = { - zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) + zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) } private def deregisterBrokerChangeListener() = { - zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) + zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener) } /** @@ -359,10 +359,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { try { val curBrokerIds = currentBrokerList.map(_.toInt).toSet val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds - val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)) + val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_)) val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get) val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds - controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) + controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get) info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s" .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(","))) newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_)) http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/controller/TopicDeletionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 9e39dd5..c6f80ac 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -23,8 +23,9 @@ import org.apache.kafka.common.requests.{StopReplicaResponse, AbstractRequestRes import collection.mutable import collection.JavaConverters._ -import kafka.utils.{ShutdownableThread, Logging, ZkUtils} +import kafka.utils.{ShutdownableThread, Logging} import kafka.utils.CoreUtils._ +import kafka.utils.ZkUtils._ import collection.Set import kafka.common.TopicAndPartition import java.util.concurrent.locks.ReentrantLock @@ -288,9 +289,10 @@ class TopicDeletionManager(controller: KafkaController, partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition) topicsToBeDeleted -= topic partitionsToBeDeleted.retain(_.topic != topic) - controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) - controllerContext.zkClient.deleteRecursive(ZkUtils.getEntityConfigPath(ConfigType.Topic, topic)) - controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic)) + val zkUtils = controllerContext.zkUtils + zkUtils.zkClient.deleteRecursive(getTopicPath(topic)) + zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic)) + zkUtils.zkClient.delete(getDeleteTopicPath(topic)) controllerContext.removeTopic(topic) } @@ -385,7 +387,7 @@ class TopicDeletionManager(controller: KafkaController, } class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) { - val zkClient = controllerContext.zkClient + val zkUtils = controllerContext.zkUtils override def doWork() { awaitTopicDeletionNotification() http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index 68ff4fc..bf23e9b 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -43,7 +43,7 @@ class ConsumerCoordinator(val brokerId: Int, val groupConfig: GroupManagerConfig, val offsetConfig: OffsetManagerConfig, private val offsetManager: OffsetManager, - zkClient: ZkClient) extends Logging { + zkUtils: ZkUtils) extends Logging { this.logIdent = "[ConsumerCoordinator " + brokerId + "]: " @@ -57,9 +57,9 @@ class ConsumerCoordinator(val brokerId: Int, groupConfig: GroupManagerConfig, offsetConfig: OffsetManagerConfig, replicaManager: ReplicaManager, - zkClient: ZkClient, + zkUtils: ZkUtils, scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig, - new OffsetManager(offsetConfig, replicaManager, zkClient, scheduler), zkClient) + new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler), zkUtils) def offsetsTopicConfigs: Properties = { val props = new Properties @@ -81,7 +81,7 @@ class ConsumerCoordinator(val brokerId: Int, info("Starting up.") heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId) - coordinatorMetadata = new CoordinatorMetadata(brokerId, zkClient, maybePrepareRebalance) + coordinatorMetadata = new CoordinatorMetadata(brokerId, zkUtils, maybePrepareRebalance) isActive.set(true) info("Startup complete.") } @@ -499,7 +499,7 @@ object ConsumerCoordinator { val OffsetsTopicName = "__consumer_offsets" def create(config: KafkaConfig, - zkClient: ZkClient, + zkUtils: ZkUtils, replicaManager: ReplicaManager, kafkaScheduler: KafkaScheduler): ConsumerCoordinator = { val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, @@ -513,11 +513,11 @@ object ConsumerCoordinator { val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs, consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs) - new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkClient, kafkaScheduler) + new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler) } def create(config: KafkaConfig, - zkClient: ZkClient, + zkUtils: ZkUtils, offsetManager: OffsetManager): ConsumerCoordinator = { val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, loadBufferSize = config.offsetsLoadBufferSize, @@ -530,6 +530,6 @@ object ConsumerCoordinator { val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs, consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs) - new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkClient) + new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkUtils) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala index 2920320..a33231a 100644 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala @@ -20,7 +20,7 @@ package kafka.coordinator import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.{threadsafe, ZkUtils, Logging} - +import kafka.utils.ZkUtils._ import org.I0Itec.zkclient.{ZkClient, IZkDataListener} import java.util.concurrent.locks.ReentrantReadWriteLock @@ -33,7 +33,7 @@ import scala.collection.mutable */ @threadsafe private[coordinator] class CoordinatorMetadata(brokerId: Int, - zkClient: ZkClient, + zkUtils: ZkUtils, maybePrepareRebalance: ConsumerGroupMetadata => Unit) { /** @@ -159,19 +159,19 @@ private[coordinator] class CoordinatorMetadata(brokerId: Int, } private def getTopicPartitionCountFromZK(topic: String) = { - val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic)) + val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic)) topicData(topic).size } private def registerTopicPartitionChangeListener(topic: String) { val listener = new TopicPartitionChangeListener topicPartitionChangeListeners.put(topic, listener) - zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), listener) + zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), listener) } private def deregisterTopicPartitionChangeListener(topic: String) { val listener = topicPartitionChangeListeners(topic) - zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener) + zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), listener) topicPartitionChangeListeners.remove(topic) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 2e5ee8d..9b4314e 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -27,6 +27,7 @@ import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import org.I0Itec.zkclient.{IZkStateListener, ZkClient} +import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.collection.JavaConverters._ import org.apache.log4j.Logger @@ -59,7 +60,7 @@ object SimpleAclAuthorizer { //notification node which gets updated with the resource name when acl on a resource is changed. val AclChangedZkPath = "/kafka-acl-changes" - //prefix of all the change notificiation sequence node. + //prefix of all the change notification sequence node. val AclChangedPrefix = "acl_changes_" } @@ -67,7 +68,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { private val authorizerLogger = Logger.getLogger("kafka.authorizer.logger") private var superUsers = Set.empty[KafkaPrincipal] private var shouldAllowEveryoneIfNoAclIsFound = false - private var zkClient: ZkClient = null + private var zkUtils: ZkUtils = null private var aclChangeListener: ZkNodeChangeNotificationListener = null private val aclCache = new scala.collection.mutable.HashMap[Resource, Set[Acl]] @@ -92,16 +93,19 @@ class SimpleAclAuthorizer extends Authorizer with Logging { val zkConnectionTimeoutMs = configs.getOrElse(SimpleAclAuthorizer.ZkConnectionTimeOutProp, kafkaConfig.zkConnectionTimeoutMs).toString.toInt val zkSessionTimeOutMs = configs.getOrElse(SimpleAclAuthorizer.ZkSessionTimeOutProp, kafkaConfig.zkSessionTimeoutMs).toString.toInt - zkClient = ZkUtils.createZkClient(zkUrl, zkConnectionTimeoutMs, zkSessionTimeOutMs) - ZkUtils.makeSurePersistentPathExists(zkClient, SimpleAclAuthorizer.AclZkPath) + zkUtils = ZkUtils(zkUrl, + zkConnectionTimeoutMs, + zkSessionTimeOutMs, + JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))) + zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath) loadCache() - ZkUtils.makeSurePersistentPathExists(zkClient, SimpleAclAuthorizer.AclChangedZkPath) - aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler) + zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath) + aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificaitonHandler) aclChangeListener.init() - zkClient.subscribeStateChanges(ZkStateChangeListener) + zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener) } override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { @@ -162,17 +166,17 @@ class SimpleAclAuthorizer extends Authorizer with Logging { val updatedAcls = getAcls(resource) ++ acls val path = toResourcePath(resource) - if (ZkUtils.pathExists(zkClient, path)) - ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls))) + if (zkUtils.pathExists(path)) + zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls))) else - ZkUtils.createPersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls))) + zkUtils.createPersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(updatedAcls))) updateAclChangedFlag(resource) } } override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = { - if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) { + if (zkUtils.pathExists(toResourcePath(resource))) { val existingAcls = getAcls(resource) val filteredAcls = existingAcls.filter((acl: Acl) => !aclsTobeRemoved.contains(acl)) @@ -180,9 +184,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging { if (aclNeedsRemoval) { val path: String = toResourcePath(resource) if (filteredAcls.nonEmpty) - ZkUtils.updatePersistentPath(zkClient, path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls))) + zkUtils.updatePersistentPath(path, Json.encode(Acl.toJsonCompatibleMap(filteredAcls))) else - ZkUtils.deletePath(zkClient, toResourcePath(resource)) + zkUtils.deletePath(toResourcePath(resource)) updateAclChangedFlag(resource) } @@ -192,8 +196,8 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } override def removeAcls(resource: Resource): Boolean = { - if (ZkUtils.pathExists(zkClient, toResourcePath(resource))) { - ZkUtils.deletePath(zkClient, toResourcePath(resource)) + if (zkUtils.pathExists(toResourcePath(resource))) { + zkUtils.deletePath(toResourcePath(resource)) updateAclChangedFlag(resource) true } else false @@ -206,7 +210,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } private def getAclsFromZk(resource: Resource): Set[Acl] = { - val aclJson = ZkUtils.readDataMaybeNull(zkClient, toResourcePath(resource))._1 + val aclJson = zkUtils.readDataMaybeNull(toResourcePath(resource))._1 aclJson.map(Acl.fromJson).getOrElse(Set.empty) } @@ -224,11 +228,11 @@ class SimpleAclAuthorizer extends Authorizer with Logging { private def loadCache() { var acls = Set.empty[Acl] - val resourceTypes = ZkUtils.getChildren(zkClient, SimpleAclAuthorizer.AclZkPath) + val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath) for (rType <- resourceTypes) { val resourceType = ResourceType.fromString(rType) val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name - val resourceNames = ZkUtils.getChildren(zkClient, resourceTypePath) + val resourceNames = zkUtils.getChildren(resourceTypePath) for (resourceName <- resourceNames) { acls = getAclsFromZk(Resource(resourceType, resourceName.toString)) updateCache(new Resource(resourceType, resourceName), acls) @@ -255,7 +259,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } private def updateAclChangedFlag(resource: Resource) { - ZkUtils.createSequentialPersistentPath(zkClient, SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString) + zkUtils.createSequentialPersistentPath(SimpleAclAuthorizer.AclChangedZkPath + "/" + SimpleAclAuthorizer.AclChangedPrefix, resource.toString) } object AclChangedNotificaitonHandler extends NotificationHandler { http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/DynamicConfigManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index 4da1833..d443a1f 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -70,7 +70,7 @@ object ConfigType { * on startup where a change might be missed between the initial config load and registering for change notifications. * */ -class DynamicConfigManager(private val zkClient: ZkClient, +class DynamicConfigManager(private val zkUtils: ZkUtils, private val configHandler : Map[String, ConfigHandler], private val changeExpirationMs: Long = 15*60*1000, private val time: Time = SystemTime) extends Logging { @@ -80,8 +80,8 @@ class DynamicConfigManager(private val zkClient: ZkClient, * Begin watching for config changes */ def startup() { - ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.EntityConfigChangesPath) - zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener) + zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath) + zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener) processAllConfigChanges() } @@ -89,7 +89,7 @@ class DynamicConfigManager(private val zkClient: ZkClient, * Process all config changes */ private def processAllConfigChanges() { - val configChanges = zkClient.getChildren(ZkUtils.EntityConfigChangesPath) + val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath) import JavaConversions._ processConfigChanges((configChanges: mutable.Buffer[String]).sorted) } @@ -107,7 +107,7 @@ class DynamicConfigManager(private val zkClient: ZkClient, if (changeId > lastExecutedChange) { val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification - val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode) + val (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode) processNotification(jsonOpt) } lastExecutedChange = changeId @@ -138,7 +138,7 @@ class DynamicConfigManager(private val zkClient: ZkClient, case Some(value: String) => value case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json) } - configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkClient, entityType, entity)) + configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)) case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" + "{\"version\" : 1," + @@ -151,12 +151,12 @@ class DynamicConfigManager(private val zkClient: ZkClient, private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { for(notification <- notifications.sorted) { - val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.EntityConfigChangesPath + "/" + notification) + val (jsonOpt, stat) = zkUtils.readDataMaybeNull(ZkUtils.EntityConfigChangesPath + "/" + notification) if(jsonOpt.isDefined) { val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification if (now - stat.getCtime > changeExpirationMs) { debug("Purging config change notification " + notification) - ZkUtils.deletePath(zkClient, changeZnode) + zkUtils.deletePath(changeZnode) } else { return } http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5715626..6acab8d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -44,7 +44,7 @@ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val coordinator: ConsumerCoordinator, val controller: KafkaController, - val zkClient: ZkClient, + val zkUtils: ZkUtils, val brokerId: Int, val config: KafkaConfig, val metadataCache: MetadataCache, @@ -221,7 +221,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) { (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) } else { - ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + + zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition, metaAndError.offset.toString) (topicAndPartition, ErrorMapping.NoError) } @@ -535,14 +535,14 @@ class KafkaApis(val requestChannel: RequestChannel, Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length) else config.offsetsTopicReplicationFactor.toInt - AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, + AdminUtils.createTopic(zkUtils, topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor)) } else { - AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor) + AdminUtils.createTopic(zkUtils, topic, config.numPartitions, config.defaultReplicationFactor) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" .format(topic, config.numPartitions, config.defaultReplicationFactor)) } @@ -624,7 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) { (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition) } else { - val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1 + val payloadOpt = zkUtils.readDataMaybeNull(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1 payloadOpt match { case Some(payload) => (topicAndPartition, OffsetMetadataAndError(payload.toLong)) case None => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition) http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaHealthcheck.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 16760d4..928ff43 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -35,14 +35,13 @@ import java.net.InetAddress */ class KafkaHealthcheck(private val brokerId: Int, private val advertisedEndpoints: Map[SecurityProtocol, EndPoint], - private val zkClient: ZkClient, - private val zkConnection: ZkConnection) extends Logging { + private val zkUtils: ZkUtils) extends Logging { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val sessionExpireListener = new SessionExpireListener def startup() { - zkClient.subscribeStateChanges(sessionExpireListener) + zkUtils.zkClient.subscribeStateChanges(sessionExpireListener) register() } @@ -62,7 +61,7 @@ class KafkaHealthcheck(private val brokerId: Int, // only PLAINTEXT is supported as default // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) - ZkUtils.registerBrokerInZk(zkClient, zkConnection, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort) + zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 99a3c12..f50c266 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -38,6 +38,7 @@ import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkRece import org.apache.kafka.common.protocol.{Errors, ApiKeys, SecurityProtocol} import org.apache.kafka.common.metrics.{JmxReporter, Metrics} import org.apache.kafka.common.requests.{ControlledShutdownResponse, ControlledShutdownRequest, RequestSend} +import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.ssl.SSLFactory import org.apache.kafka.common.utils.AppInfoParser @@ -128,8 +129,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr var kafkaHealthcheck: KafkaHealthcheck = null val metadataCache: MetadataCache = new MetadataCache(config.brokerId) - var zkClient: ZkClient = null - var zkConnection: ZkConnection = null + var zkUtils: ZkUtils = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap @@ -165,12 +165,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr kafkaScheduler.startup() /* setup zookeeper */ - val (client, connection) = initZk() - zkClient = client - zkConnection = connection + zkUtils = initZk() /* start log manager */ - logManager = createLogManager(zkClient, brokerState) + logManager = createLogManager(zkUtils.zkClient, brokerState) logManager.startup() /* generate brokerId */ @@ -181,16 +179,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr socketServer.startup() /* start replica manager */ - replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkClient, kafkaScheduler, logManager, + replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager, isShuttingDown) replicaManager.startup() /* start kafka controller */ - kafkaController = new KafkaController(config, zkClient, zkConnection, brokerState, kafkaMetricsTime, metrics, threadNamePrefix) + kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix) kafkaController.startup() /* start kafka coordinator */ - consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler) + consumerCoordinator = ConsumerCoordinator.create(config, zkUtils, replicaManager, kafkaScheduler) consumerCoordinator.startup() /* Get the authorizer and initialize it if one is specified.*/ @@ -204,7 +202,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, - kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer) + kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -213,7 +211,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* start dynamic config manager */ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager), ConfigType.Client -> new ClientIdConfigHandler) - dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) + dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers) dynamicConfigManager.startup() /* tell everyone we are alive */ @@ -223,7 +221,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr else (protocol, endpoint) } - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkClient, zkConnection) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils) kafkaHealthcheck.startup() /* register broker metrics */ @@ -245,7 +243,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr } } - private def initZk(): (ZkClient, ZkConnection) = { + private def initZk(): ZkUtils = { info("Connecting to zookeeper on " + config.zkConnect) val chroot = { @@ -257,15 +255,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr if (chroot.length > 1) { val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/")) - val zkClientForChrootCreation = ZkUtils.createZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) - ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot) + val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation, + config.zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, + JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))) + zkClientForChrootCreation.makeSurePersistentPathExists(chroot) info("Created zookeeper path " + chroot) - zkClientForChrootCreation.close() + zkClientForChrootCreation.zkClient.close() } - val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) - ZkUtils.setupCommonPaths(zkClient) - (zkClient, zkConnection) + val zkUtils = ZkUtils(config.zkConnect, + config.zkSessionTimeoutMs, + config.zkConnectionTimeoutMs, + JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM))) + zkUtils.setupCommonPaths() + zkUtils } @@ -334,8 +338,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr // Get the current controller info. This is to ensure we use the most recent info to issue the // controlled shutdown request - val controllerId = ZkUtils.getController(zkClient) - ZkUtils.getBrokerInfo(zkClient, controllerId) match { + val controllerId = zkUtils.getController() + zkUtils.getBrokerInfo(controllerId) match { case Some(broker) => // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous // attempt, connect to the most recent controller @@ -410,8 +414,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr // Get the current controller info. This is to ensure we use the most recent info to issue the // controlled shutdown request - val controllerId = ZkUtils.getController(zkClient) - ZkUtils.getBrokerInfo(zkClient, controllerId) match { + val controllerId = zkUtils.getController() + zkUtils.getBrokerInfo(controllerId) match { case Some(broker) => if (channel == null || prevController == null || !prevController.equals(broker)) { // if this is the first attempt or if the controller has changed, create a channel to the most recent @@ -524,8 +528,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr CoreUtils.swallow(consumerCoordinator.shutdown()) if(kafkaController != null) CoreUtils.swallow(kafkaController.shutdown()) - if(zkClient != null) - CoreUtils.swallow(zkClient.close()) + if(zkUtils != null) + CoreUtils.swallow(zkUtils.close()) if (metrics != null) CoreUtils.swallow(metrics.close()) @@ -559,7 +563,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr val defaultProps = KafkaServer.copyKafkaConfigToLog(config) val defaultLogConfig = LogConfig(defaultProps) - val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) + val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).mapValues(LogConfig.fromProps(defaultProps, _)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, dedupeBufferSize = config.logCleanerDedupeBufferSize, @@ -626,7 +630,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr private def generateBrokerId: Int = { try { - ZkUtils.getBrokerSequenceId(zkClient, config.maxReservedBrokerId) + zkUtils.getBrokerSequenceId(config.maxReservedBrokerId) } catch { case e: Exception => error("Failed to generate broker.id due to ", e) http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 0e613e7..bdc3bb6 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -87,7 +87,7 @@ object OffsetManagerConfig { class OffsetManager(val config: OffsetManagerConfig, replicaManager: ReplicaManager, - zkClient: ZkClient, + zkUtils: ZkUtils, scheduler: Scheduler) extends Logging with KafkaMetricsGroup { /* offsets and metadata cache */ @@ -449,7 +449,7 @@ class OffsetManager(val config: OffsetManagerConfig, */ private def getOffsetsTopicPartitionCount = { val topic = ConsumerCoordinator.OffsetsTopicName - val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic)) + val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic)) if (topicData(topic).nonEmpty) topicData(topic).size else http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 82a6001..0a17fd0 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -144,7 +144,7 @@ class ReplicaFetcherThread(name: String, // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. - if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkClient, + if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils, ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index c0fec67..1fc47f4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -99,7 +99,7 @@ class ReplicaManager(val config: KafkaConfig, metrics: Metrics, time: Time, jTime: JTime, - val zkClient: ZkClient, + val zkUtils: ZkUtils, scheduler: Scheduler, val logManager: LogManager, val isShuttingDown: AtomicBoolean, @@ -163,7 +163,7 @@ class ReplicaManager(val config: KafkaConfig, def maybePropagateIsrChanges() { isrChangeSet synchronized { if (isrChangeSet.nonEmpty) { - ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet) + ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet) isrChangeSet.clear() } }