KAFKA-927; Integrate controlled shutdown into the Kafka shutdown hook; patched by Sriram Subramanian; reviewed by Neha Narkhede and Jun Rao
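[In outline, the patch below makes a shutting-down broker look up the current controller in ZooKeeper and send it a ControlledShutdownRequest, retrying with a configured backoff until the controller reports no remaining led partitions, after which the normal shutdown proceeds (see KafkaServer.controlledShutdown in the diff). The following is a condensed, standalone sketch of that retry loop, not the committed code: findController and sendShutdownRequest are hypothetical stand-ins for the ZkUtils lookup and the BlockingChannel round trip.]

    import java.util.concurrent.atomic.AtomicInteger

    object ControlledShutdownSketch {
      val correlationId = new AtomicInteger(0)

      // Hypothetical stand-ins for the ZK controller lookup and the
      // BlockingChannel send/receive done in KafkaServer.controlledShutdown().
      def findController(): Option[(String, Int)] = ???                 // host/port of current controller
      def sendShutdownRequest(host: String, port: Int, brokerId: Int): Set[String] = ???  // partitions still led

      // Mirrors the shape of the loop in the patch: retry up to
      // controlled.shutdown.max.retries times, sleeping
      // controlled.shutdown.retry.backoff.ms between attempts.
      def controlledShutdown(brokerId: Int, maxRetries: Int, backoffMs: Long): Boolean = {
        var remainingRetries = maxRetries
        while (remainingRetries > 0) {
          remainingRetries -= 1
          findController() match {
            case Some((host, port)) =>
              val partitionsRemaining = sendShutdownRequest(host, port, brokerId)
              if (partitionsRemaining.isEmpty)
                return true                      // controller moved all leaderships away
            case None =>                         // controller unknown; fall through and retry
          }
          Thread.sleep(backoffMs)                // let leadership moves / controller failover settle
        }
        false  // caller falls back to an unclean shutdown, as KafkaServer.shutdown() does
      }
    }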
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f387ae4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f387ae4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f387ae4 Branch: refs/heads/trunk Commit: 4f387ae43544c422b1845b3da5ab09aee8e4acd0 Parents: 6584276 Author: Sriram Subramanian <sri...@gmail.com> Authored: Mon Jun 3 16:08:37 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Mon Jun 3 16:08:37 2013 -0700 ---------------------------------------------------------------------- .../kafka/api/ControlledShutdownRequest.scala | 73 ++++++++++ .../kafka/api/ControlledShutdownResponse.scala | 70 ++++++++++ core/src/main/scala/kafka/api/RequestKeys.scala | 4 +- .../main/scala/kafka/cluster/Partition.scala | 11 +- .../kafka/controller/KafkaController.scala | 94 +++++-------- .../src/main/scala/kafka/server/KafkaApis.scala | 13 +- .../main/scala/kafka/server/KafkaConfig.scala | 14 +- .../main/scala/kafka/server/KafkaServer.scala | 112 +++++++++++++-- .../scala/kafka/server/ReplicaManager.scala | 3 +- .../test/scala/unit/kafka/admin/AdminTest.scala | 6 +- .../kafka/integration/RollingBounceTest.scala | 140 +++++++++++++++++++ .../server/HighwatermarkPersistenceTest.scala | 5 +- .../unit/kafka/server/ISRExpirationTest.scala | 3 +- .../unit/kafka/server/SimpleFetchTest.scala | 8 +- 14 files changed, 472 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala new file mode 100644 index 0000000..ad6a20d --- /dev/null +++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.api.ApiUtils._ +import collection.mutable.ListBuffer +import kafka.network.{BoundedByteBufferSend, RequestChannel} +import kafka.common.{TopicAndPartition, ErrorMapping} +import kafka.network.RequestChannel.Response +import kafka.utils.Logging + +object ControlledShutdownRequest extends Logging { + val CurrentVersion = 0.shortValue + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer): ControlledShutdownRequest = { + val versionId = buffer.getShort + val correlationId = buffer.getInt + val brokerId = buffer.getInt + new ControlledShutdownRequest(versionId, correlationId, brokerId) + } +} + +case class ControlledShutdownRequest(val versionId: Short, + override val correlationId: Int, + val brokerId: Int) + extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey), correlationId){ + + def this(correlationId: Int, brokerId: Int) = + this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId) + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + buffer.putInt(correlationId) + buffer.putInt(brokerId) + } + + def sizeInBytes(): Int = { + 2 + /* version id */ + 4 + /* correlation id */ + 4 /* broker id */ + } + + override def toString(): String = { + val controlledShutdownRequest = new StringBuilder + controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName) + controlledShutdownRequest.append("; Version: " + versionId) + controlledShutdownRequest.append("; CorrelationId: " + correlationId) + controlledShutdownRequest.append("; BrokerId: " + brokerId) + controlledShutdownRequest.toString() + } + + override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { + val errorResponse = ControlledShutdownResponse(correlationId, ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), Set.empty[TopicAndPartition]) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse))) + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala new file mode 100644 index 0000000..b7c8448 --- /dev/null +++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.api + +import java.nio.ByteBuffer +import collection.mutable.HashMap +import collection.immutable.Map +import kafka.common.{TopicAndPartition, ErrorMapping} +import kafka.api.ApiUtils._ + + +object ControlledShutdownResponse { + def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = { + val correlationId = buffer.getInt + val errorCode = buffer.getShort + val numEntries = buffer.getInt + + var partitionsRemaining = Set[TopicAndPartition]() + for (i<- 0 until numEntries){ + val topic = readShortString(buffer) + val partition = buffer.getInt + partitionsRemaining += new TopicAndPartition(topic, partition) + } + new ControlledShutdownResponse(correlationId, errorCode, partitionsRemaining) + } +} + + +case class ControlledShutdownResponse(override val correlationId: Int, + val errorCode: Short = ErrorMapping.NoError, + val partitionsRemaining: Set[TopicAndPartition]) + extends RequestOrResponse(correlationId = correlationId) { + def sizeInBytes(): Int ={ + var size = + 4 /* correlation id */ + + 2 /* error code */ + + 4 /* number of responses */ + for (topicAndPartition <- partitionsRemaining) { + size += + 2 + topicAndPartition.topic.length /* topic */ + + 4 /* partition */ + } + size + } + + def writeTo(buffer: ByteBuffer) { + buffer.putInt(correlationId) + buffer.putShort(errorCode) + buffer.putInt(partitionsRemaining.size) + for (topicAndPartition:TopicAndPartition <- partitionsRemaining){ + writeShortString(buffer, topicAndPartition.topic) + buffer.putInt(topicAndPartition.partition) + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/RequestKeys.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index 541cf84..e2ce9bd 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -28,6 +28,7 @@ object RequestKeys { val LeaderAndIsrKey: Short = 4 val StopReplicaKey: Short = 5 val UpdateMetadataKey: Short = 6 + val ControlledShutdownKey: Short = 7 val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), @@ -36,7 +37,8 @@ object RequestKeys { MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom), LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom), StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom), - UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom)) + UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom), + ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom)) def nameForKey(key: Short): String = { keyToNameAndDeserializerMap.get(key) match { http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 02d2c44..88fc8dd 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -195,8 +195,15 @@ class Partition(val topic: String, leaderEpoch = leaderAndIsr.leaderEpoch zkVersion = leaderAndIsr.zkVersion leaderReplicaIdOpt = Some(newLeaderBrokerId) - // start fetcher thread to current leader - 
replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) + if (!replicaManager.isShuttingDown.get()) { + // start fetcher thread to current leader if we are not shutting down + replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) + } + else { + stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d from " + + " controller %d epoch %d since it is shutting down" + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch)) + } case None => // leader went down stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " + " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation" http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a4e96cc..5ac38fd 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -70,7 +70,7 @@ class ControllerContext(val zkClient: ZkClient, } trait KafkaControllerMBean { - def shutdownBroker(id: Int): Int + def shutdownBroker(id: Int): Set[TopicAndPartition] } object KafkaController { @@ -118,17 +118,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port) /** - * JMX operation to initiate clean shutdown of a broker. On clean shutdown, - * the controller first determines the partitions that the shutting down - * broker leads, and moves leadership of those partitions to another broker - * that is in that partition's ISR. When all partitions have been moved, the - * broker process can be stopped normally (i.e., by sending it a SIGTERM or - * SIGINT) and no data loss should be observed. + * On clean shutdown, the controller first determines the partitions that the + * shutting down broker leads, and moves leadership of those partitions to another broker + * that is in that partition's ISR. * * @param id Id of the broker to shutdown. * @return The number of partitions that the broker still leads. */ - def shutdownBroker(id: Int) = { + def shutdownBroker(id: Int) : Set[TopicAndPartition] = { + + if (!isActive()) { + throw new ControllerMovedException("Controller moved to another broker. 
Aborting controlled shutdown") + } controllerContext.brokerShutdownLock synchronized { info("Shutting down broker " + id) @@ -151,68 +152,40 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } - def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized { - trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(",")) - controllerContext.partitionLeadershipInfo.filter { - case (topicAndPartition, leaderIsrAndControllerEpoch) => - leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 - }.map(_._1) - } - - val partitionsToMove = replicatedPartitionsBrokerLeads().toSet - - debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(","))) - - partitionsToMove.foreach { topicAndPartition => - val (topic, partition) = topicAndPartition.asTuple - // move leadership serially to relinquish lock. + allPartitionsAndReplicationFactorOnBroker.foreach { + case(topicAndPartition, replicationFactor) => + // Move leadership serially to relinquish lock. controllerContext.controllerLock synchronized { controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { + // If the broker leads the topic partition, transition the leader and update isr. Updates zk and + // notifies all affected brokers partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, controlledShutdownPartitionLeaderSelector) - val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition) - - // mark replica offline only if leadership was moved successfully - if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader) - replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica) - } else - debug("Partition %s moved from leader %d to new leader %d during shutdown." - .format(topicAndPartition, id, currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)) + } + else { + // Stop the replica first. The state change below initiates ZK changes which should take some time + // before which the stop replica request should be completed (in most cases) + brokerRequestBatch.newBatch() + brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) + + // If the broker is a follower, updates the isr in ZK and notifies the current leader + replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, + topicAndPartition.partition, id)), OfflineReplica) + } } } } - val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet - - /* - * Force the shutting down broker out of the ISR of partitions that it - * follows, and shutdown the corresponding replica fetcher threads. - * This is really an optimization, so no need to register any callback - * to wait until completion. 
- */ - if (partitionsRemaining.size == 0) { - brokerRequestBatch.newBatch() - allPartitionsAndReplicationFactorOnBroker foreach { - case(topicAndPartition, replicationFactor) => - val (topic, partition) = topicAndPartition.asTuple - if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) { - brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false) - removeReplicaFromIsr(topic, partition, id) match { - case Some(updatedLeaderIsrAndControllerEpoch) => - brokerRequestBatch.addLeaderAndIsrRequestForBrokers( - Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition, - updatedLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(topicAndPartition)) - case None => - // ignore - } - } - } - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) + def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized { + trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(",")) + controllerContext.partitionLeadershipInfo.filter { + case (topicAndPartition, leaderIsrAndControllerEpoch) => + leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 + }.map(_._1) } - - debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(","))) - partitionsRemaining.size + replicatedPartitionsBrokerLeads().toSet } } @@ -487,6 +460,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq) controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] + controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int] // update the leader and isr cache for all existing partitions from Zookeeper updateLeaderAndIsrCache() // start the channel manager http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index dd88ccd..208e3ef 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -30,6 +30,7 @@ import kafka.common._ import kafka.utils.{ZkUtils, Pool, SystemTime, Logging} import kafka.network.RequestChannel.Response import kafka.cluster.Broker +import kafka.controller.KafkaController /** @@ -38,7 +39,8 @@ import kafka.cluster.Broker class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val zkClient: ZkClient, - brokerId: Int) extends Logging { + brokerId: Int, + val controller: KafkaController) extends Logging { private val producerRequestPurgatory = new ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests) @@ -68,6 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) + case RequestKeys.ControlledShutdownKey => 
handleControlledShutdownRequest(request) case requestId => throw new KafkaException("No mapping found for handler id " + requestId) } } catch { @@ -126,6 +129,14 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) } + def handleControlledShutdownRequest(request: RequestChannel.Request) { + val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest] + val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) + val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, + ErrorMapping.NoError, partitionsRemaining) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) + } + /** * Check if a partitionData from a produce request can unblock any * DelayedFetch requests. http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 549b4b0..b774431 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -169,4 +169,16 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the purge interval (in number of requests) of the producer request purgatory */ val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000) - } + /*********** Controlled shutdown configuration ***********/ + + /** Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens */ + val controlledShutdownMaxRetries = props.getInt("controlled.shutdown.max.retries", 3) + + /** Before each retry, the system needs time to recover from the state that caused the previous failure (Controller + * fail over, replica lag etc). This config determines the amount of time to wait before retrying. */ + val controlledShutdownRetryBackoffMs = props.getInt("controlled.shutdown.retry.backoff.ms", 5000) + + /* enable controlled shutdown of the server */ + val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", false) + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b0348bb..a26de88 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -17,13 +17,15 @@ package kafka.server -import kafka.network.SocketServer +import kafka.network.{Receive, BlockingChannel, SocketServer} import kafka.log.LogManager import kafka.utils._ import java.util.concurrent._ -import atomic.AtomicBoolean -import org.I0Itec.zkclient.ZkClient +import atomic.{AtomicInteger, AtomicBoolean} import kafka.controller.{ControllerStats, KafkaController} +import kafka.cluster.Broker +import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest} +import kafka.common.ErrorMapping /** * Represents the lifecycle of a single Kafka broker. 
Handles all functionality required @@ -33,6 +35,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg this.logIdent = "[Kafka Server " + config.brokerId + "], " private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) + private var startupComplete = new AtomicBoolean(false); + val correlationId: AtomicInteger = new AtomicInteger(0) var socketServer: SocketServer = null var requestHandlerPool: KafkaRequestHandlerPool = null var logManager: LogManager = null @@ -41,14 +45,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var apis: KafkaApis = null var kafkaController: KafkaController = null val kafkaScheduler = new KafkaScheduler(4) - var zkClient: ZkClient = null + /** * Start up API for bringing up a single instance of the Kafka server. * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */ def startup() { - info("starting") + info("Starting") isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) @@ -79,10 +83,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg info("Connecting to ZK: " + config.zkConnect) - replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager) + replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager, isShuttingDown) kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient) - apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) Mx4jLoader.maybeLoad @@ -92,7 +96,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg kafkaController.startup() // register metrics beans registerStats() - info("started") + startupComplete.set(true); + info("Started") } /** @@ -105,13 +110,99 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg } /** + * Performs controlled shutdown + */ + private def controlledShutdown() { + if (startupComplete.get() && config.controlledShutdownEnable) { + // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period + // of time and try again for a configured number of retries. If all the attempt fails, we simply force + // the shutdown. + var remainingRetries = config.controlledShutdownMaxRetries + info("Starting controlled shutdown") + var channel : BlockingChannel = null; + var prevController : Broker = null + var shutdownSuceeded : Boolean =false + try { + while (!shutdownSuceeded && remainingRetries > 0) { + remainingRetries = remainingRetries - 1 + + // 1. Find the controller and establish a connection to it. + + // Get the current controller info. 
This is to ensure we use the most recent info to issue the + // controlled shutdown request + val controllerId = ZkUtils.getController(kafkaZookeeper.getZookeeperClient) + ZkUtils.getBrokerInfo(kafkaZookeeper.getZookeeperClient, controllerId) match { + case Some(broker) => + if (channel == null || prevController == null || !prevController.equals(broker)) { + // if this is the first attempt or if the controller has changed, create a channel to the most recent + // controller + if (channel != null) { + channel.disconnect() + } + channel = new BlockingChannel(broker.host, broker.port, + BlockingChannel.UseDefaultBufferSize, + BlockingChannel.UseDefaultBufferSize, + config.controllerSocketTimeoutMs) + channel.connect() + prevController = broker + } + case None=> + //ignore and try again + } + + // 2. issue a controlled shutdown to the controller + if (channel != null) { + var response: Receive = null + try { + // send the controlled shutdown request + val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId) + channel.send(request) + response = channel.receive() + val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer) + if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null && + shutdownResponse.partitionsRemaining.size == 0) { + shutdownSuceeded = true + info ("Controlled shutdown succeeded") + } + else { + info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(","))) + info("Error code from controller: %d".format(shutdownResponse.errorCode)) + } + } + catch { + case ioe: java.io.IOException => + channel.disconnect() + channel = null + // ignore and try again + } + } + if (!shutdownSuceeded) { + Thread.sleep(config.controlledShutdownRetryBackoffMs) + warn("Retrying controlled shutdown after the previous attempt failed...") + } + } + } + finally { + if (channel != null) { + channel.disconnect() + channel = null + } + } + if (!shutdownSuceeded) { + warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed") + } + } + } + + /** * Shutdown API for shutting down a single instance of the Kafka server. 
* Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread */ def shutdown() { - info("shutting down") + info("Shutting down") val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { + Utils.swallow(controlledShutdown()) if(kafkaZookeeper != null) Utils.swallow(kafkaZookeeper.shutdown()) if(socketServer != null) @@ -130,7 +221,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(kafkaController.shutdown()) shutdownLatch.countDown() - info("shut down completed") + startupComplete.set(false); + info("Shut down completed") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8e49b83..9d41e82 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -40,7 +40,8 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, - val logManager: LogManager) extends Logging with KafkaMetricsGroup { + val logManager: LogManager, + val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup { /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 private val localBrokerId = config.brokerId http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/admin/AdminTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 95e7218..0d8b70f 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -345,7 +345,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { // wait for the update metadata request to trickle to the brokers assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() => activeServers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000)) - assertEquals(0, partitionsRemaining) + assertEquals(0, partitionsRemaining.size) var partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition)) var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader assertEquals(0, leaderAfterShutdown) @@ -353,7 +353,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr) partitionsRemaining = controller.shutdownBroker(1) - assertEquals(0, partitionsRemaining) + assertEquals(0, partitionsRemaining.size) activeServers = servers.filter(s => s.config.brokerId == 0) partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition)) leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader @@ -361,7 +361,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) 
partitionsRemaining = controller.shutdownBroker(0) - assertEquals(1, partitionsRemaining) + assertEquals(1, partitionsRemaining.size) // leader doesn't change since all the replicas are shut down assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala new file mode 100644 index 0000000..26e9bd6 --- /dev/null +++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import kafka.admin.CreateTopicCommand +import kafka.utils.TestUtils._ +import junit.framework.Assert._ +import kafka.utils.{ZkUtils, Utils, TestUtils} +import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, ControllerChannelManager} +import kafka.cluster.Broker +import kafka.common.ErrorMapping +import kafka.api._ + +class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness { + val brokerId1 = 0 + val brokerId2 = 1 + val brokerId3 = 2 + val brokerId4 = 3 + + val port1 = TestUtils.choosePort() + val port2 = TestUtils.choosePort() + val port3 = TestUtils.choosePort() + val port4 = TestUtils.choosePort() + + val enableShutdown = true + val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1) + configProps1.put("controlled.shutdown.enable", "true") + val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2) + configProps2.put("controlled.shutdown.enable", "true") + val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3) + configProps3.put("controlled.shutdown.enable", "true") + val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4) + configProps4.put("controlled.shutdown.enable", "true") + configProps4.put("controlled.shutdown.retry.backoff.ms", "100") + + var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + + val partitionId = 0 + + override def setUp() { + super.setUp() + // start all the servers + val server1 = TestUtils.createServer(new KafkaConfig(configProps1)) + val server2 = TestUtils.createServer(new KafkaConfig(configProps2)) + val server3 = TestUtils.createServer(new KafkaConfig(configProps3)) + val server4 = TestUtils.createServer(new KafkaConfig(configProps4)) + + servers ++= List(server1, server2, server3, server4) + } + + override def tearDown() { + servers.map(server => 
server.shutdown()) + servers.map(server => Utils.rm(server.config.logDirs)) + super.tearDown() + } + + def testRollingBounce { + // start all the brokers + val topic1 = "new-topic1" + val topic2 = "new-topic2" + val topic3 = "new-topic3" + val topic4 = "new-topic4" + + // create topics with 1 partition, 2 replicas, one on each broker + CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1") + CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2") + CreateTopicCommand.createTopic(zkClient, topic3, 1, 2, "2:3") + CreateTopicCommand.createTopic(zkClient, topic4, 1, 2, "0:3") + + + // wait until leader is elected + var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partitionId, 500) + var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, partitionId, 500) + var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, partitionId, 500) + var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, partitionId, 500) + + debug("Leader for " + topic1 + " is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic2 + " is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic3 + "is elected to be: %s".format(leader1.getOrElse(-1))) + debug("Leader for " + topic4 + "is elected to be: %s".format(leader1.getOrElse(-1))) + + assertTrue("Leader should get elected", leader1.isDefined) + assertTrue("Leader should get elected", leader2.isDefined) + assertTrue("Leader should get elected", leader3.isDefined) + assertTrue("Leader should get elected", leader4.isDefined) + + assertTrue("Leader could be broker 0 or broker 1 for " + topic1, (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1)) + assertTrue("Leader could be broker 1 or broker 2 for " + topic2, (leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2)) + assertTrue("Leader could be broker 2 or broker 3 for " + topic3, (leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3)) + assertTrue("Leader could be broker 3 or broker 4 for " + topic4, (leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3)) + + // Do a rolling bounce and check if leader transitions happen correctly + + // Bring down the leader for the first topic + bounceServer(topic1, 0) + + // Bring down the leader for the second topic + bounceServer(topic2, 1) + + // Bring down the leader for the third topic + bounceServer(topic3, 2) + + // Bring down the leader for the fourth topic + bounceServer(topic4, 3) + } + + private def bounceServer(topic: String, startIndex: Int) { + var prevLeader = 0 + if (isLeaderLocalOnBroker(topic, partitionId, servers(startIndex))) { + servers(startIndex).shutdown() + prevLeader = startIndex + } + else { + servers((startIndex + 1) % 4).shutdown() + prevLeader = (startIndex + 1) % 4 + } + var newleader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500) + // Ensure the new leader is different from the old + assertTrue("Leader transition did not happen for " + topic, newleader.getOrElse(-1) != -1 && (newleader.getOrElse(-1) != prevLeader)) + // Start the server back up again + servers(prevLeader).startup() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index f30b097..2719055 
100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -25,6 +25,7 @@ import org.junit.Assert._ import kafka.common.KafkaException import kafka.cluster.Replica import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils} +import java.util.concurrent.atomic.AtomicBoolean class HighwatermarkPersistenceTest extends JUnit3Suite { @@ -47,7 +48,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { val scheduler = new KafkaScheduler(2) scheduler.startup // create replica manager - val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0)) + val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0), new AtomicBoolean(false)) replicaManager.startup() replicaManager.checkpointHighWatermarks() var fooPartition0Hw = hwmFor(replicaManager, topic, 0) @@ -86,7 +87,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { val scheduler = new KafkaScheduler(2) scheduler.startup // create replica manager - val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0)) + val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0), new AtomicBoolean(false)) replicaManager.startup() replicaManager.checkpointHighWatermarks() var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 6184f42..7026432 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -24,6 +24,7 @@ import org.easymock.EasyMock import kafka.log.Log import org.junit.Assert._ import kafka.utils._ +import java.util.concurrent.atomic.AtomicBoolean class IsrExpirationTest extends JUnit3Suite { @@ -80,7 +81,7 @@ class IsrExpirationTest extends JUnit3Suite { private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig, localLog: Log): Partition = { val leaderId=config.brokerId - val replicaManager = new ReplicaManager(config, time, null, null, null) + val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false)) val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1) val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog)) http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index c7dd8a7..23a8cb5 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -87,10 +87,12 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() EasyMock.replay(replicaManager) + val controller = 
EasyMock.createMock(classOf[kafka.controller.KafkaController]) + // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary) // don't provide replica or leader callbacks since they will not be tested here val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId) + val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, controller) // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log val goodFetch = new FetchRequestBuilder() @@ -184,8 +186,10 @@ class SimpleFetchTest extends JUnit3Suite { EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes() EasyMock.replay(replicaManager) + val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController]) + val requestChannel = new RequestChannel(2, 5) - val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId) + val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, controller) /** * This fetch, coming from a replica, requests all data at offset "15". Because the request is coming
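[For reference, a minimal standalone sketch of the wire format defined by ControlledShutdownRequest.writeTo and ControlledShutdownResponse.readFrom above, assuming the two-byte-length UTF-8 short-string encoding used by kafka.api.ApiUtils; this is an illustration against java.nio only, not the patch's code.]

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    object ControlledShutdownWireFormat {
      // Encode a request the way writeTo() does: 2-byte version,
      // 4-byte correlation id, 4-byte broker id — sizeInBytes() = 2 + 4 + 4 = 10.
      def encodeRequest(versionId: Short, correlationId: Int, brokerId: Int): ByteBuffer = {
        val buffer = ByteBuffer.allocate(2 + 4 + 4)
        buffer.putShort(versionId)
        buffer.putInt(correlationId)
        buffer.putInt(brokerId)
        buffer.flip()
        buffer
      }

      // Decode a response the way readFrom() does: correlation id, error code,
      // entry count, then (short-string topic, int partition) per remaining partition.
      def decodeResponse(buffer: ByteBuffer): (Int, Short, Set[(String, Int)]) = {
        val correlationId = buffer.getInt
        val errorCode = buffer.getShort
        val numEntries = buffer.getInt
        val remaining = (0 until numEntries).map { _ =>
          val topicLength = buffer.getShort.toInt
          val topicBytes = new Array[Byte](topicLength)
          buffer.get(topicBytes)
          (new String(topicBytes, StandardCharsets.UTF_8), buffer.getInt)
        }.toSet
        (correlationId, errorCode, remaining)
      }
    }

An empty partitionsRemaining set in the decoded response is exactly the success condition the broker's retry loop checks for before proceeding with shutdown.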