kafka-927; Integrate controlled shutdown into kafka shutdown hook; patched by 
Sriram Subramanian; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f387ae4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f387ae4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f387ae4

Branch: refs/heads/trunk
Commit: 4f387ae43544c422b1845b3da5ab09aee8e4acd0
Parents: 6584276
Author: Sriram Subramanian <sri...@gmail.com>
Authored: Mon Jun 3 16:08:37 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Mon Jun 3 16:08:37 2013 -0700

----------------------------------------------------------------------
 .../kafka/api/ControlledShutdownRequest.scala   |  73 ++++++++++
 .../kafka/api/ControlledShutdownResponse.scala  |  70 ++++++++++
 core/src/main/scala/kafka/api/RequestKeys.scala |   4 +-
 .../main/scala/kafka/cluster/Partition.scala    |  11 +-
 .../kafka/controller/KafkaController.scala      |  94 +++++--------
 .../src/main/scala/kafka/server/KafkaApis.scala |  13 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  14 +-
 .../main/scala/kafka/server/KafkaServer.scala   | 112 +++++++++++++--
 .../scala/kafka/server/ReplicaManager.scala     |   3 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   6 +-
 .../kafka/integration/RollingBounceTest.scala   | 140 +++++++++++++++++++
 .../server/HighwatermarkPersistenceTest.scala   |   5 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   3 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   8 +-
 14 files changed, 472 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
new file mode 100644
index 0000000..ad6a20d
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import collection.mutable.ListBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.network.RequestChannel.Response
+import kafka.utils.Logging
+
+object ControlledShutdownRequest extends Logging {
+  val CurrentVersion = 0.shortValue
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): ControlledShutdownRequest = {
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val brokerId = buffer.getInt
+    new ControlledShutdownRequest(versionId, correlationId, brokerId)
+  }
+}
+
+case class ControlledShutdownRequest(val versionId: Short,
+                                     override val correlationId: Int,
+                                     val brokerId: Int)
+  extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey), 
correlationId){
+
+  def this(correlationId: Int, brokerId: Int) =
+    this(ControlledShutdownRequest.CurrentVersion, correlationId, brokerId)
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    buffer.putInt(brokerId)
+  }
+
+  def sizeInBytes(): Int = {
+    2 +  /* version id */
+      4 + /* correlation id */
+      4 /* broker id */
+  }
+
+  override def toString(): String = {
+    val controlledShutdownRequest = new StringBuilder
+    controlledShutdownRequest.append("Name: " + this.getClass.getSimpleName)
+    controlledShutdownRequest.append("; Version: " + versionId)
+    controlledShutdownRequest.append("; CorrelationId: " + correlationId)
+    controlledShutdownRequest.append("; BrokerId: " + brokerId)
+    controlledShutdownRequest.toString()
+  }
+
+  override  def handleError(e: Throwable, requestChannel: RequestChannel, 
request: RequestChannel.Request): Unit = {
+    val errorResponse = ControlledShutdownResponse(correlationId, 
ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]), 
Set.empty[TopicAndPartition])
+    requestChannel.sendResponse(new Response(request, new 
BoundedByteBufferSend(errorResponse)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
new file mode 100644
index 0000000..b7c8448
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import collection.mutable.HashMap
+import collection.immutable.Map
+import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.api.ApiUtils._
+
+
+object ControlledShutdownResponse {
+  def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    val numEntries = buffer.getInt
+
+    var partitionsRemaining = Set[TopicAndPartition]()
+    for (i<- 0 until numEntries){
+      val topic = readShortString(buffer)
+      val partition = buffer.getInt
+      partitionsRemaining += new TopicAndPartition(topic, partition)
+    }
+    new ControlledShutdownResponse(correlationId, errorCode, 
partitionsRemaining)
+  }
+}
+
+
+case class ControlledShutdownResponse(override val correlationId: Int,
+                                      val errorCode: Short = 
ErrorMapping.NoError,
+                                      val partitionsRemaining: 
Set[TopicAndPartition])
+  extends RequestOrResponse(correlationId = correlationId) {
+  def sizeInBytes(): Int ={
+    var size =
+      4 /* correlation id */ +
+        2 /* error code */ +
+        4 /* number of responses */
+    for (topicAndPartition <- partitionsRemaining) {
+      size +=
+        2 + topicAndPartition.topic.length /* topic */ +
+        4 /* partition */
+    }
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    buffer.putInt(partitionsRemaining.size)
+    for (topicAndPartition:TopicAndPartition <- partitionsRemaining){
+      writeShortString(buffer, topicAndPartition.topic)
+      buffer.putInt(topicAndPartition.partition)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala 
b/core/src/main/scala/kafka/api/RequestKeys.scala
index 541cf84..e2ce9bd 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -28,6 +28,7 @@ object RequestKeys {
   val LeaderAndIsrKey: Short = 4
   val StopReplicaKey: Short = 5
   val UpdateMetadataKey: Short = 6
+  val ControlledShutdownKey: Short = 7
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => 
RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -36,7 +37,8 @@ object RequestKeys {
         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
         StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
-        UpdateMetadataKey -> ("UpdateMetadata", 
UpdateMetadataRequest.readFrom))
+        UpdateMetadataKey -> ("UpdateMetadata", 
UpdateMetadataRequest.readFrom),
+        ControlledShutdownKey -> ("ControlledShutdown", 
ControlledShutdownRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 02d2c44..88fc8dd 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -195,8 +195,15 @@ class Partition(val topic: String,
           leaderEpoch = leaderAndIsr.leaderEpoch
           zkVersion = leaderAndIsr.zkVersion
           leaderReplicaIdOpt = Some(newLeaderBrokerId)
-          // start fetcher thread to current leader
-          replicaFetcherManager.addFetcher(topic, partitionId, 
localReplica.logEndOffset, leaderBroker)
+          if (!replicaManager.isShuttingDown.get()) {
+            // start fetcher thread to current leader if we are not shutting 
down
+            replicaFetcherManager.addFetcher(topic, partitionId, 
localReplica.logEndOffset, leaderBroker)
+          }
+          else {
+            stateChangeLogger.trace("Broker %d ignored the become-follower 
state change with correlation id %d from " +
+              " controller %d epoch %d since it is shutting down"
+                .format(localBrokerId, correlationId, controllerId, 
leaderIsrAndControllerEpoch.controllerEpoch))
+          }
         case None => // leader went down
           stateChangeLogger.trace("Broker %d aborted the become-follower state 
change with correlation id %d from " +
             " controller %d epoch %d since leader %d for partition [%s,%d] is 
unavailable during the state change operation"

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index a4e96cc..5ac38fd 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -70,7 +70,7 @@ class ControllerContext(val zkClient: ZkClient,
 }
 
 trait KafkaControllerMBean {
-  def shutdownBroker(id: Int): Int
+  def shutdownBroker(id: Int): Set[TopicAndPartition]
 }
 
 object KafkaController {
@@ -118,17 +118,18 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
   def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, 
config.hostName, config.port)
 
   /**
-   * JMX operation to initiate clean shutdown of a broker. On clean shutdown,
-   * the controller first determines the partitions that the shutting down
-   * broker leads, and moves leadership of those partitions to another broker
-   * that is in that partition's ISR. When all partitions have been moved, the
-   * broker process can be stopped normally (i.e., by sending it a SIGTERM or
-   * SIGINT) and no data loss should be observed.
+   * On clean shutdown, the controller first determines the partitions that the
+   * shutting down broker leads, and moves leadership of those partitions to 
another broker
+   * that is in that partition's ISR.
    *
    * @param id Id of the broker to shutdown.
    * @return The number of partitions that the broker still leads.
    */
-  def shutdownBroker(id: Int) = {
+  def shutdownBroker(id: Int) : Set[TopicAndPartition] = {
+
+    if (!isActive()) {
+      throw new ControllerMovedException("Controller moved to another broker. 
Aborting controlled shutdown")
+    }
 
     controllerContext.brokerShutdownLock synchronized {
       info("Shutting down broker " + id)
@@ -151,68 +152,40 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
         }
       }
 
-      def replicatedPartitionsBrokerLeads() = 
controllerContext.controllerLock.synchronized {
-        trace("All leaders = " + 
controllerContext.partitionLeadershipInfo.mkString(","))
-        controllerContext.partitionLeadershipInfo.filter {
-          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
-            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && 
controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
-        }.map(_._1)
-      }
-
-      val partitionsToMove = replicatedPartitionsBrokerLeads().toSet
-
-      debug("Partitions to move leadership from broker %d: %s".format(id, 
partitionsToMove.mkString(",")))
-
-      partitionsToMove.foreach { topicAndPartition =>
-        val (topic, partition) = topicAndPartition.asTuple
-        // move leadership serially to relinquish lock.
+      allPartitionsAndReplicationFactorOnBroker.foreach {
+        case(topicAndPartition, replicationFactor) =>
+        // Move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
           
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { 
currLeaderIsrAndControllerEpoch =>
             if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
+              // If the broker leads the topic partition, transition the 
leader and update isr. Updates zk and
+              // notifies all affected brokers
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), 
OnlinePartition,
                 controlledShutdownPartitionLeaderSelector)
-              val newLeaderIsrAndControllerEpoch = 
controllerContext.partitionLeadershipInfo(topicAndPartition)
-
-              // mark replica offline only if leadership was moved successfully
-              if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != 
currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
-                
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, 
partition, id)), OfflineReplica)
-            } else
-              debug("Partition %s moved from leader %d to new leader %d during 
shutdown."
-                .format(topicAndPartition, id, 
currLeaderIsrAndControllerEpoch.leaderAndIsr.leader))
+            }
+            else {
+              // Stop the replica first. The state change below initiates ZK 
changes which should take some time
+              // before which the stop replica request should be completed (in 
most cases)
+              brokerRequestBatch.newBatch()
+              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), 
topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
+              brokerRequestBatch.sendRequestsToBrokers(epoch, 
controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+
+              // If the broker is a follower, updates the isr in ZK and 
notifies the current leader
+              
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+                topicAndPartition.partition, id)), OfflineReplica)
+            }
           }
         }
       }
 
-      val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
-
-      /*
-      * Force the shutting down broker out of the ISR of partitions that it
-      * follows, and shutdown the corresponding replica fetcher threads.
-      * This is really an optimization, so no need to register any callback
-      * to wait until completion.
-      */
-      if (partitionsRemaining.size == 0) {
-        brokerRequestBatch.newBatch()
-        allPartitionsAndReplicationFactorOnBroker foreach {
-          case(topicAndPartition, replicationFactor) =>
-            val (topic, partition) = topicAndPartition.asTuple
-            if 
(controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
 != id) {
-              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), 
topic, partition, deletePartition = false)
-              removeReplicaFromIsr(topic, partition, id) match {
-                case Some(updatedLeaderIsrAndControllerEpoch) =>
-                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
-                    
Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
-                    updatedLeaderIsrAndControllerEpoch, 
controllerContext.partitionReplicaAssignment(topicAndPartition))
-                case None =>
-                // ignore
-              }
-            }
-        }
-        brokerRequestBatch.sendRequestsToBrokers(epoch, 
controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+      def replicatedPartitionsBrokerLeads() = 
controllerContext.controllerLock.synchronized {
+        trace("All leaders = " + 
controllerContext.partitionLeadershipInfo.mkString(","))
+        controllerContext.partitionLeadershipInfo.filter {
+          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && 
controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+        }.map(_._1)
       }
-
-      debug("Remaining partitions to move from broker %d: %s".format(id, 
partitionsRemaining.mkString(",")))
-      partitionsRemaining.size
+      replicatedPartitionsBrokerLeads().toSet
     }
   }
 
@@ -487,6 +460,7 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = 
ZkUtils.getReplicaAssignmentForTopics(zkClient, 
controllerContext.allTopics.toSeq)
     controllerContext.partitionLeadershipInfo = new 
mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
+    controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // update the leader and isr cache for all existing partitions from 
Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index dd88ccd..208e3ef 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -30,6 +30,7 @@ import kafka.common._
 import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
 import kafka.cluster.Broker
+import kafka.controller.KafkaController
 
 
 /**
@@ -38,7 +39,8 @@ import kafka.cluster.Broker
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val zkClient: ZkClient,
-                brokerId: Int) extends Logging {
+                brokerId: Int,
+                val controller: KafkaController) extends Logging {
 
   private val producerRequestPurgatory =
     new 
ProducerRequestPurgatory(replicaManager.config.producerPurgatoryPurgeIntervalRequests)
@@ -68,6 +70,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
         case RequestKeys.UpdateMetadataKey => 
handleUpdateMetadataRequest(request)
+        case RequestKeys.ControlledShutdownKey => 
handleControlledShutdownRequest(request)
         case requestId => throw new KafkaException("No mapping found for 
handler id " + requestId)
       }
     } catch {
@@ -126,6 +129,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, new 
BoundedByteBufferSend(updateMetadataResponse)))
   }
 
+  def handleControlledShutdownRequest(request: RequestChannel.Request) {
+    val controlledShutdownRequest = 
request.requestObj.asInstanceOf[ControlledShutdownRequest]
+    val partitionsRemaining = 
controller.shutdownBroker(controlledShutdownRequest.brokerId)
+    val controlledShutdownResponse = new 
ControlledShutdownResponse(controlledShutdownRequest.correlationId,
+      ErrorMapping.NoError, partitionsRemaining)
+    requestChannel.sendResponse(new Response(request, new 
BoundedByteBufferSend(controlledShutdownResponse)))
+  }
+
   /**
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 549b4b0..b774431 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -169,4 +169,16 @@ class KafkaConfig private (val props: 
VerifiableProperties) extends ZKConfig(pro
   /* the purge interval (in number of requests) of the producer request 
purgatory */
   val producerPurgatoryPurgeIntervalRequests = 
props.getInt("producer.purgatory.purge.interval.requests", 10000)
 
- }
+  /*********** Controlled shutdown configuration ***********/
+
+  /** Controlled shutdown can fail for multiple reasons. This determines the 
number of retries when such failure happens */
+  val controlledShutdownMaxRetries = 
props.getInt("controlled.shutdown.max.retries", 3)
+
+  /** Before each retry, the system needs time to recover from the state that 
caused the previous failure (Controller
+    * fail over, replica lag etc). This config determines the amount of time 
to wait before retrying. */
+  val controlledShutdownRetryBackoffMs = 
props.getInt("controlled.shutdown.retry.backoff.ms", 5000)
+
+  /* enable controlled shutdown of the server */
+  val controlledShutdownEnable = 
props.getBoolean("controlled.shutdown.enable", false)
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index b0348bb..a26de88 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -17,13 +17,15 @@
 
 package kafka.server
 
-import kafka.network.SocketServer
+import kafka.network.{Receive, BlockingChannel, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
 import java.util.concurrent._
-import atomic.AtomicBoolean
-import org.I0Itec.zkclient.ZkClient
+import atomic.{AtomicInteger, AtomicBoolean}
 import kafka.controller.{ControllerStats, KafkaController}
+import kafka.cluster.Broker
+import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
+import kafka.common.ErrorMapping
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all 
functionality required
@@ -33,6 +35,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
   this.logIdent = "[Kafka Server " + config.brokerId + "], "
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
+  private var startupComplete = new AtomicBoolean(false);
+  val correlationId: AtomicInteger = new AtomicInteger(0)
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
   var logManager: LogManager = null
@@ -41,14 +45,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
   var apis: KafkaApis = null
   var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(4)
-  var zkClient: ZkClient = null
+
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - 
KafkaRequestHandlers
    */
   def startup() {
-    info("starting")
+    info("Starting")
     isShuttingDown = new AtomicBoolean(false)
     shutdownLatch = new CountDownLatch(1)
 
@@ -79,10 +83,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
 
     info("Connecting to ZK: " + config.zkConnect)
 
-    replicaManager = new ReplicaManager(config, time, 
kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
+    replicaManager = new ReplicaManager(config, time, 
kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager, isShuttingDown)
 
     kafkaController = new KafkaController(config, 
kafkaZookeeper.getZookeeperClient)
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, 
kafkaZookeeper.getZookeeperClient, config.brokerId)
+    apis = new KafkaApis(socketServer.requestChannel, replicaManager, 
kafkaZookeeper.getZookeeperClient, config.brokerId, kafkaController)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, 
socketServer.requestChannel, apis, config.numIoThreads)
     Mx4jLoader.maybeLoad
 
@@ -92,7 +96,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
     kafkaController.startup()
     // register metrics beans
     registerStats()
-    info("started")
+    startupComplete.set(true);
+    info("Started")
   }
 
   /**
@@ -105,13 +110,99 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
   }
 
   /**
+   *  Performs controlled shutdown
+   */
+  private def controlledShutdown() {
+    if (startupComplete.get() && config.controlledShutdownEnable) {
+      // We request the controller to do a controlled shutdown. On failure, we 
backoff for a configured period
+      // of time and try again for a configured number of retries. If all the 
attempt fails, we simply force
+      // the shutdown.
+      var remainingRetries = config.controlledShutdownMaxRetries
+      info("Starting controlled shutdown")
+      var channel : BlockingChannel = null;
+      var prevController : Broker = null
+      var shutdownSuceeded : Boolean =false
+      try {
+        while (!shutdownSuceeded && remainingRetries > 0) {
+          remainingRetries = remainingRetries - 1
+
+          // 1. Find the controller and establish a connection to it.
+
+          // Get the current controller info. This is to ensure we use the 
most recent info to issue the
+          // controlled shutdown request
+          val controllerId = 
ZkUtils.getController(kafkaZookeeper.getZookeeperClient)
+          ZkUtils.getBrokerInfo(kafkaZookeeper.getZookeeperClient, 
controllerId) match {
+            case Some(broker) =>
+              if (channel == null || prevController == null || 
!prevController.equals(broker)) {
+                // if this is the first attempt or if the controller has 
changed, create a channel to the most recent
+                // controller
+                if (channel != null) {
+                  channel.disconnect()
+                }
+                channel = new BlockingChannel(broker.host, broker.port,
+                  BlockingChannel.UseDefaultBufferSize,
+                  BlockingChannel.UseDefaultBufferSize,
+                  config.controllerSocketTimeoutMs)
+                channel.connect()
+                prevController = broker
+              }
+            case None=>
+              //ignore and try again
+          }
+
+          // 2. issue a controlled shutdown to the controller
+          if (channel != null) {
+            var response: Receive = null
+            try {
+              // send the controlled shutdown request
+              val request = new 
ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
+              channel.send(request)
+              response = channel.receive()
+              val shutdownResponse = 
ControlledShutdownResponse.readFrom(response.buffer)
+              if (shutdownResponse.errorCode == ErrorMapping.NoError && 
shutdownResponse.partitionsRemaining != null &&
+                  shutdownResponse.partitionsRemaining.size == 0) {
+                shutdownSuceeded = true
+                info ("Controlled shutdown succeeded")
+              }
+              else {
+                info("Remaining partitions to move: 
%s".format(shutdownResponse.partitionsRemaining.mkString(",")))
+                info("Error code from controller: 
%d".format(shutdownResponse.errorCode))
+              }
+            }
+            catch {
+              case ioe: java.io.IOException =>
+                channel.disconnect()
+                channel = null
+                // ignore and try again
+            }
+          }
+          if (!shutdownSuceeded) {
+            Thread.sleep(config.controlledShutdownRetryBackoffMs)
+            warn("Retrying controlled shutdown after the previous attempt 
failed...")
+          }
+        }
+      }
+      finally {
+        if (channel != null) {
+          channel.disconnect()
+          channel = null
+        }
+      }
+      if (!shutdownSuceeded) {
+        warn("Proceeding to do an unclean shutdown as all the controlled 
shutdown attempts failed")
+      }
+    }
+  }
+
+  /**
    * Shutdown API for shutting down a single instance of the Kafka server.
    * Shuts down the LogManager, the SocketServer and the log cleaner scheduler 
thread
    */
   def shutdown() {
-    info("shutting down")
+    info("Shutting down")
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
+      Utils.swallow(controlledShutdown())
       if(kafkaZookeeper != null)
         Utils.swallow(kafkaZookeeper.shutdown())
       if(socketServer != null)
@@ -130,7 +221,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime) extends Logg
         Utils.swallow(kafkaController.shutdown())
 
       shutdownLatch.countDown()
-      info("shut down completed")
+      startupComplete.set(false);
+      info("Shut down completed")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8e49b83..9d41e82 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -40,7 +40,8 @@ class ReplicaManager(val config: KafkaConfig,
                      time: Time, 
                      val zkClient: ZkClient, 
                      kafkaScheduler: KafkaScheduler,
-                     val logManager: LogManager) extends Logging with 
KafkaMetricsGroup {
+                     val logManager: LogManager,
+                     val isShuttingDown: AtomicBoolean ) extends Logging with 
KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch 
- 1
   private val localBrokerId = config.brokerId

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala 
b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 95e7218..0d8b70f 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -345,7 +345,7 @@ class AdminTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", 
TestUtils.waitUntilTrue(() =>
         activeServers.foldLeft(true)(_ && 
_.apis.leaderCache(TopicAndPartition(topic, 
partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
-      assertEquals(0, partitionsRemaining)
+      assertEquals(0, partitionsRemaining.size)
       var partitionStateInfo = 
activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
       var leaderAfterShutdown = 
partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
@@ -353,7 +353,7 @@ class AdminTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging {
       assertEquals(List(0,1), 
partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
 
       partitionsRemaining = controller.shutdownBroker(1)
-      assertEquals(0, partitionsRemaining)
+      assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
       partitionStateInfo = 
activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
       leaderAfterShutdown = 
partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
@@ -361,7 +361,7 @@ class AdminTest extends JUnit3Suite with 
ZooKeeperTestHarness with Logging {
 
       assertTrue(servers.foldLeft(true)(_ && 
_.apis.leaderCache(TopicAndPartition(topic, 
partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
-      assertEquals(1, partitionsRemaining)
+      assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
       assertTrue(servers.foldLeft(true)(_ && 
_.apis.leaderCache(TopicAndPartition(topic, 
partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala 
b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
new file mode 100644
index 0000000..26e9bd6
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.admin.CreateTopicCommand
+import kafka.utils.TestUtils._
+import junit.framework.Assert._
+import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.controller.{ControllerContext, LeaderIsrAndControllerEpoch, 
ControllerChannelManager}
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+import kafka.api._
+
+class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val brokerId1 = 0
+  val brokerId2 = 1
+  val brokerId3 = 2
+  val brokerId4 = 3
+
+  val port1 = TestUtils.choosePort()
+  val port2 = TestUtils.choosePort()
+  val port3 = TestUtils.choosePort()
+  val port4 = TestUtils.choosePort()
+
+  val enableShutdown = true
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  configProps1.put("controlled.shutdown.enable", "true")
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  configProps2.put("controlled.shutdown.enable", "true")
+  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  configProps3.put("controlled.shutdown.enable", "true")
+  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+  configProps4.put("controlled.shutdown.enable", "true")
+  configProps4.put("controlled.shutdown.retry.backoff.ms", "100")
+
+  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+
+  val partitionId = 0
+
+  override def setUp() {
+    super.setUp()
+    // start all the servers
+    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+    val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
+    val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
+
+    servers ++= List(server1, server2, server3, server4)
+  }
+
+  override def tearDown() {
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDirs))
+    super.tearDown()
+  }
+
+  def testRollingBounce {
+    // start all the brokers
+    val topic1 = "new-topic1"
+    val topic2 = "new-topic2"
+    val topic3 = "new-topic3"
+    val topic4 = "new-topic4"
+
+    // create topics with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1")
+    CreateTopicCommand.createTopic(zkClient, topic2, 1, 2, "1:2")
+    CreateTopicCommand.createTopic(zkClient, topic3, 1, 2, "2:3")
+    CreateTopicCommand.createTopic(zkClient, topic4, 1, 2, "0:3")
+
+
+    // wait until leader is elected
+    var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 
partitionId, 500)
+    var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 
partitionId, 500)
+    var leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 
partitionId, 500)
+    var leader4 = waitUntilLeaderIsElectedOrChanged(zkClient, topic4, 
partitionId, 500)
+
+    debug("Leader for " + topic1  + " is elected to be: 
%s".format(leader1.getOrElse(-1)))
+    debug("Leader for " + topic2 + " is elected to be: 
%s".format(leader1.getOrElse(-1)))
+    debug("Leader for " + topic3 + "is elected to be: 
%s".format(leader1.getOrElse(-1)))
+    debug("Leader for " + topic4 + "is elected to be: 
%s".format(leader1.getOrElse(-1)))
+
+    assertTrue("Leader should get elected", leader1.isDefined)
+    assertTrue("Leader should get elected", leader2.isDefined)
+    assertTrue("Leader should get elected", leader3.isDefined)
+    assertTrue("Leader should get elected", leader4.isDefined)
+
+    assertTrue("Leader could be broker 0 or broker 1 for " + topic1, 
(leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertTrue("Leader could be broker 1 or broker 2 for " + topic2, 
(leader2.getOrElse(-1) == 1) || (leader1.getOrElse(-1) == 2))
+    assertTrue("Leader could be broker 2 or broker 3 for " + topic3, 
(leader3.getOrElse(-1) == 2) || (leader1.getOrElse(-1) == 3))
+    assertTrue("Leader could be broker 3 or broker 4 for " + topic4, 
(leader4.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 3))
+
+    // Do a rolling bounce and check if leader transitions happen correctly
+
+    // Bring down the leader for the first topic
+    bounceServer(topic1, 0)
+
+    // Bring down the leader for the second topic
+    bounceServer(topic2, 1)
+
+    // Bring down the leader for the third topic
+    bounceServer(topic3, 2)
+
+    // Bring down the leader for the fourth topic
+    bounceServer(topic4, 3)
+  }
+
+  private def bounceServer(topic: String, startIndex: Int) {
+    var prevLeader = 0
+    if (isLeaderLocalOnBroker(topic, partitionId, servers(startIndex))) {
+      servers(startIndex).shutdown()
+      prevLeader = startIndex
+    }
+    else {
+      servers((startIndex + 1) % 4).shutdown()
+      prevLeader = (startIndex + 1) % 4
+    }
+    var newleader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, 
partitionId, 1500)
+    // Ensure the new leader is different from the old
+    assertTrue("Leader transition did not happen for " + topic, 
newleader.getOrElse(-1) != -1 && (newleader.getOrElse(-1) != prevLeader))
+    // Start the server back up again
+    servers(prevLeader).startup()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index f30b097..2719055 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -25,6 +25,7 @@ import org.junit.Assert._
 import kafka.common.KafkaException
 import kafka.cluster.Replica
 import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
+import java.util.concurrent.atomic.AtomicBoolean
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
@@ -47,7 +48,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), 
zkClient, scheduler, logManagers(0))
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), 
zkClient, scheduler, logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
     replicaManager.checkpointHighWatermarks()
     var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
@@ -86,7 +87,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), 
zkClient, scheduler, logManagers(0))
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), 
zkClient, scheduler, logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
     replicaManager.checkpointHighWatermarks()
     var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 6184f42..7026432 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -24,6 +24,7 @@ import org.easymock.EasyMock
 import kafka.log.Log
 import org.junit.Assert._
 import kafka.utils._
+import java.util.concurrent.atomic.AtomicBoolean
 
 class IsrExpirationTest extends JUnit3Suite {
 
@@ -80,7 +81,7 @@ class IsrExpirationTest extends JUnit3Suite {
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: 
Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId=config.brokerId
-    val replicaManager = new ReplicaManager(config, time, null, null, null)
+    val replicaManager = new ReplicaManager(config, time, null, null, null, 
new AtomicBoolean(false))
     val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
     val leaderReplica = new Replica(leaderId, partition, time, 0, 
Some(localLog))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f387ae4/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index c7dd8a7..23a8cb5 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -87,10 +87,12 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, 
partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
+    val controller = 
EasyMock.createMock(classOf[kafka.controller.KafkaController])
+
     // start a request channel with 2 processors and a queue size of 5 (this 
is more or less arbitrary)
     // don't provide replica or leader callbacks since they will not be tested 
here
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 
configs.head.brokerId)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 
configs.head.brokerId, controller)
 
     // This request (from a follower) wants to read up to 2*HW but should only 
get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -184,8 +186,10 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, 
partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
+    val controller = 
EasyMock.createMock(classOf[kafka.controller.KafkaController])
+
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 
configs.head.brokerId)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 
configs.head.brokerId, controller)
 
     /**
      * This fetch, coming from a replica, requests all data at offset "15".  
Because the request is coming

Reply via email to