Repository: kafka
Updated Branches:
  refs/heads/0.11.0 81a657ec4 -> 8acd77f15


MINOR: onControllerResignation should be invoked if triggerControllerMove is 
called

Also update the test to be simpler since we can use a mock event to simulate 
the issue
more easily (thanks Jun for the suggestion). This should fix two issues:

1. A transient test failure due to an NPE in 
ControllerFailoverTest.testMetadataUpdate:

```text
Caused by: java.lang.NullPointerException
        at 
kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:338)
        at 
kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:975)
        at 
kafka.controller.ControllerFailoverTest.testMetadataUpdate(ControllerFailoverTest.scala:141)
```

The test was creating an additional thread without doing the appropriate 
synchronization (this may have become more of an issue after the Controller 
was changed to be single-threaded and its locking was reworked).

2. Setting `activeControllerId.set(-1)` in `triggerControllerMove` causes 
`Reelect` not to invoke `onControllerResignation`. Among other things, this 
causes an `IllegalStateException` to be thrown when `KafkaScheduler.startup` is 
invoked for the second time without the corresponding `shutdown`. We now simply 
call `onControllerResignation` as part of `triggerControllerMove`.

Finally, I included a few clean-ups:

1. No longer update the broker state in `onControllerFailover`. This is no 
longer needed
since we removed the `RunningAsController` state (KAFKA-3761).
2. Trivial clean-ups in KafkaController
3. Removed unused parameter in `ZkUtils.getPartitionLeaderAndIsrForTopics`

Author: Ismael Juma <[email protected]>

Reviewers: Jun Rao <[email protected]>

Closes #2935 from ijuma/on-controller-resignation-if-trigger-controller-move

(cherry picked from commit 6021618f9dafa3478104575d307e7bcd2cb4cca9)
Signed-off-by: Jun Rao <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8acd77f1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8acd77f1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8acd77f1

Branch: refs/heads/0.11.0
Commit: 8acd77f15265214cb359586b94c8f1b3990e2c30
Parents: 81a657e
Author: Ismael Juma <[email protected]>
Authored: Tue May 30 16:59:33 2017 -0700
Committer: Jun Rao <[email protected]>
Committed: Tue May 30 16:59:44 2017 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   |   4 +-
 .../kafka/controller/KafkaController.scala      | 113 +++++++-------
 .../controller/PartitionStateMachine.scala      |   6 +-
 .../kafka/controller/ReplicaStateMachine.scala  |   4 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   3 +-
 .../controller/ControllerEventManagerTest.scala |  15 +-
 .../controller/ControllerFailoverTest.scala     | 148 +++++--------------
 .../controller/ControllerIntegrationTest.scala  |   6 +-
 .../kafka/controller/ControllerTestUtils.scala  |  35 +++++
 10 files changed, 137 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ea8d13b..8f98a8c 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -381,7 +381,9 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController) extends  Logging
     try {
       leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
         partitionStateInfos.foreach { case (topicPartition, state) =>
-          val typeOfRequest = if (broker == 
state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else 
"become-follower"
+          val typeOfRequest =
+            if (broker == 
state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader"
+            else "become-follower"
           stateChangeLogger.trace(("Controller %d epoch %d sending %s 
LeaderAndIsr request %s to broker %d " +
                                    "for partition 
[%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                                    
state.leaderIsrAndControllerEpoch, broker,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index dbce485..956d1ca 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -146,18 +146,20 @@ object KafkaController extends Logging {
   }
 }
 
-class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val 
brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: 
Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, 
metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with 
KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private val stateChangeLogger = KafkaController.stateChangeLogger
   val controllerContext = new ControllerContext(zkUtils)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
 
-  // have a separate scheduler for the controller to be able to start and stop 
independently of the
-  // kafka server
-  private val kafkaScheduler = new KafkaScheduler(1)
+  // have a separate scheduler for the controller to be able to start and stop 
independently of the kafka server
+  // visible for testing
+  private[controller] val kafkaScheduler = new KafkaScheduler(1)
 
-  private val eventManager = new 
ControllerEventManager(controllerContext.stats.rateAndTimeMetrics, _ => 
updateMetrics())
+  // visible for testing
+  private[controller] val eventManager = new 
ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
+    _ => updateMetrics())
 
   val topicDeletionManager = new TopicDeletionManager(this, eventManager)
   val offlinePartitionSelector = new 
OfflinePartitionLeaderSelector(controllerContext, config)
@@ -290,7 +292,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
   /**
    * This callback is invoked by the zookeeper leader elector when the current 
broker resigns as the controller. This is
    * required to clean up internal controller data structures
-   * Note:We need to resign as a controller out of the controller lock to 
avoid potential deadlock issue
    */
   def onControllerResignation() {
     debug("Controller resigning, broker id %d".format(config.brokerId))
@@ -318,9 +319,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
     replicaStateMachine.shutdown()
     deregisterBrokerChangeListener()
 
-    // reset controller context
     resetControllerContext()
-    brokerState.newState(RunningAsBroker)
 
     info("Broker %d resigned as the controller".format(config.brokerId))
   }
@@ -746,18 +745,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
   }
 
   def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = 
controllerContext.partitionReplicaAssignment.keySet) {
-    val leaderAndIsrInfo = 
zkUtils.getPartitionLeaderAndIsrForTopics(zkUtils.zkClient, topicAndPartitions)
-    for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
+    val leaderAndIsrInfo = 
zkUtils.getPartitionLeaderAndIsrForTopics(topicAndPartitions)
+    for ((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
       controllerContext.partitionLeadershipInfo.put(topicPartition, 
leaderIsrAndControllerEpoch)
   }
 
   private def areReplicasInIsr(topic: String, partition: Int, replicas: 
Seq[Int]): Boolean = {
-    zkUtils.getLeaderAndIsrForPartition(topic, partition) match {
-      case Some(leaderAndIsr) =>
-        val replicasNotInIsr = replicas.filterNot(r => 
leaderAndIsr.isr.contains(r))
-        replicasNotInIsr.isEmpty
-      case None => false
-    }
+    zkUtils.getLeaderAndIsrForPartition(topic, partition).map { leaderAndIsr =>
+      replicas.forall(leaderAndIsr.isr.contains)
+    }.getOrElse(false)
   }
 
   private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: 
TopicAndPartition,
@@ -824,22 +820,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
   }
 
   private def updateLeaderEpochAndSendRequest(topicAndPartition: 
TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: 
Seq[Int]) {
-    brokerRequestBatch.newBatch()
     updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) 
match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
         try {
+          brokerRequestBatch.newBatch()
           
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, 
topicAndPartition.topic,
             topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, 
newAssignedReplicas)
           brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
         } catch {
-          case e : IllegalStateException => {
-            // Resign if the controller is in an illegal state
-            error("Forcing the controller to resign")
-            brokerRequestBatch.clear()
-            triggerControllerMove()
-
-            throw e
-          }
+          case e: IllegalStateException =>
+            handleIllegalState(e)
         }
         stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr 
request %s with new assigned replica list %s " +
           "to leader %d for partition being reassigned 
%s").format(config.brokerId, controllerContext.epoch, 
updatedLeaderIsrAndControllerEpoch,
@@ -986,14 +976,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
       brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, 
partitions)
       brokerRequestBatch.sendRequestsToBrokers(epoch)
     } catch {
-      case e : IllegalStateException => {
-        // Resign if the controller is in an illegal state
-        error("Forcing the controller to resign")
-        brokerRequestBatch.clear()
-        triggerControllerMove()
-
-        throw e
-      }
+      case e: IllegalStateException =>
+        handleIllegalState(e)
     }
   }
 
@@ -1425,39 +1409,32 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
           controllerContext.partitionsOnBroker(id)
             .map(topicAndPartition => (topicAndPartition, 
controllerContext.partitionReplicaAssignment(topicAndPartition).size))
 
-      allPartitionsAndReplicationFactorOnBroker.foreach {
-        case(topicAndPartition, replicationFactor) =>
-          
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { 
currLeaderIsrAndControllerEpoch =>
-            if (replicationFactor > 1) {
-              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
-                // If the broker leads the topic partition, transition the 
leader and update isr. Updates zk and
-                // notifies all affected brokers
-                
partitionStateMachine.handleStateChanges(Set(topicAndPartition), 
OnlinePartition,
-                  controlledShutdownPartitionLeaderSelector)
-              } else {
-                // Stop the replica first. The state change below initiates ZK 
changes which should take some time
-                // before which the stop replica request should be completed 
(in most cases)
-                try {
-                  brokerRequestBatch.newBatch()
-                  brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), 
topicAndPartition.topic,
-                    topicAndPartition.partition, deletePartition = false)
-                  brokerRequestBatch.sendRequestsToBrokers(epoch)
-                } catch {
-                  case e : IllegalStateException => {
-                    // Resign if the controller is in an illegal state
-                    error("Forcing the controller to resign")
-                    brokerRequestBatch.clear()
-                    triggerControllerMove()
-
-                    throw e
-                  }
-                }
-                // If the broker is a follower, updates the isr in ZK and 
notifies the current leader
-                
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                  topicAndPartition.partition, id)), OfflineReplica)
+      allPartitionsAndReplicationFactorOnBroker.foreach { case 
(topicAndPartition, replicationFactor) =>
+        
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { 
currLeaderIsrAndControllerEpoch =>
+          if (replicationFactor > 1) {
+            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
+              // If the broker leads the topic partition, transition the 
leader and update isr. Updates zk and
+              // notifies all affected brokers
+              partitionStateMachine.handleStateChanges(Set(topicAndPartition), 
OnlinePartition,
+                controlledShutdownPartitionLeaderSelector)
+            } else {
+              // Stop the replica first. The state change below initiates ZK 
changes which should take some time
+              // before which the stop replica request should be completed (in 
most cases)
+              try {
+                brokerRequestBatch.newBatch()
+                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), 
topicAndPartition.topic,
+                  topicAndPartition.partition, deletePartition = false)
+                brokerRequestBatch.sendRequestsToBrokers(epoch)
+              } catch {
+                case e: IllegalStateException =>
+                  handleIllegalState(e)
               }
+              // If the broker is a follower, updates the isr in ZK and 
notifies the current leader
+              
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+                topicAndPartition.partition, id)), OfflineReplica)
             }
           }
+        }
       }
       def replicatedPartitionsBrokerLeads() = {
         trace("All leaders = " + 
controllerContext.partitionLeadershipInfo.mkString(","))
@@ -1557,7 +1534,17 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
       }
   }
 
+  // visible for testing
+  private[controller] def handleIllegalState(e: IllegalStateException): 
Nothing = {
+    // Resign if the controller is in an illegal state
+    error("Forcing the controller to resign")
+    brokerRequestBatch.clear()
+    triggerControllerMove()
+    throw e
+  }
+
   private def triggerControllerMove(): Unit = {
+    onControllerResignation()
     activeControllerId = -1
     controllerContext.zkUtils.deletePath(ZkUtils.ControllerPath)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 4cffc13..5751e17 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -79,9 +79,9 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
       brokerRequestBatch.newBatch()
       // try to move all partitions in NewPartition or OfflinePartition state 
to OnlinePartition state except partitions
       // that belong to topics to be deleted
-      for((topicAndPartition, partitionState) <- partitionState
+      for ((topicAndPartition, partitionState) <- partitionState
           if 
!controller.topicDeletionManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))
 {
-        if(partitionState.equals(OfflinePartition) || 
partitionState.equals(NewPartition))
+        if (partitionState.equals(OfflinePartition) || 
partitionState.equals(NewPartition))
           handleStateChange(topicAndPartition.topic, 
topicAndPartition.partition, OnlinePartition, 
controller.offlinePartitionSelector,
                             (new CallbackBuilder).build)
       }
@@ -111,7 +111,7 @@ class PartitionStateMachine(controller: KafkaController) 
extends Logging {
         handleStateChange(topicAndPartition.topic, 
topicAndPartition.partition, targetState, leaderSelector, callbacks)
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
-    }catch {
+    } catch {
       case e: Throwable => error("Error while moving some partitions to %s 
state".format(targetState), e)
       // TODO: It is not enough to bail out and log an error, it is important 
to trigger state changes for those partitions
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 0759ed4..60b9990 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -81,13 +81,13 @@ class ReplicaStateMachine(controller: KafkaController) 
extends Logging {
    */
   def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: 
ReplicaState,
                          callbacks: Callbacks = (new CallbackBuilder).build) {
-    if(replicas.nonEmpty) {
+    if (replicas.nonEmpty) {
       info("Invoking state change to %s for replicas %s".format(targetState, 
replicas.mkString(",")))
       try {
         brokerRequestBatch.newBatch()
         replicas.foreach(r => handleStateChange(r, targetState, callbacks))
         brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
-      }catch {
+      } catch {
         case e: Throwable => error("Error while moving some replicas to %s 
state".format(targetState), e)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index c7dac0d..0a87750 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -226,7 +226,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkUtils, brokerState, 
time, metrics, threadNamePrefix)
+        kafkaController = new KafkaController(config, zkUtils, time, metrics, 
threadNamePrefix)
         kafkaController.startup()
 
         adminManager = new AdminManager(config, metrics, metadataCache, 
zkUtils)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index fc78501..ac497c4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -701,8 +701,7 @@ class ZkUtils(val zkClient: ZkClient,
     cluster
   }
 
-  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, 
topicAndPartitions: Set[TopicAndPartition])
-  : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+  def getPartitionLeaderAndIsrForTopics(topicAndPartitions: 
Set[TopicAndPartition]): mutable.Map[TopicAndPartition, 
LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, 
LeaderIsrAndControllerEpoch]
     for(topicAndPartition <- topicAndPartitions) {
       ReplicationUtils.getLeaderIsrAndEpochForPartition(this, 
topicAndPartition.topic, topicAndPartition.partition) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index 727c4f3..ec9343e 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Timer
 import kafka.utils.TestUtils
-import org.easymock.{EasyMock, IAnswer}
 import org.junit.{After, Test}
 import org.junit.Assert.{assertEquals, fail}
 
@@ -60,21 +59,13 @@ class ControllerEventManagerTest {
 
     val initialTimerCount = timer(metricName).count
 
-    // `ControllerEvent` is sealed so we use EasyMock to create a subclass
-    val eventMock = EasyMock.createMock(classOf[ControllerEvent])
-    EasyMock.expect(eventMock.state).andReturn(controllerState)
-
     // Only return from `process()` once we have checked 
`controllerEventManager.state`
     val latch = new CountDownLatch(1)
-    EasyMock.expect(eventMock.process()).andAnswer(new IAnswer[Unit]() {
-      def answer(): Unit = {
-        latch.await()
-        process()
-      }
+    val eventMock = 
ControllerTestUtils.createMockControllerEvent(controllerState, { () =>
+      latch.await()
+      process()
     })
 
-    EasyMock.replay(eventMock)
-
     controllerEventManager.put(eventMock)
     TestUtils.waitUntilTrue(() => controllerEventManager.state == 
controllerState,
       s"Controller state is not $controllerState")

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 83a315f..7a91bef 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -18,18 +18,17 @@
 package kafka.controller
 
 import java.util.Properties
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.CountDownLatch
 
+import kafka.admin.AdminUtils
 import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.utils.Time
-import org.apache.log4j.{Level, Logger}
-import org.junit.{After, Ignore, Test}
-
-import scala.collection.mutable
+import org.apache.log4j.Logger
+import org.junit.{After, Test}
+import org.junit.Assert._
 
 class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
   val log = Logger.getLogger(classOf[ControllerFailoverTest])
@@ -54,119 +53,44 @@ class ControllerFailoverTest extends 
KafkaServerTestHarness with Logging {
    * See @link{https://issues.apache.org/jira/browse/KAFKA-2300}
    * for the background of this test case
    */
-  @Ignore // This needs to be reworked as described here: 
https://github.com/apache/kafka/pull/2935#discussion_r114374412
   @Test
-  def testMetadataUpdate() {
-    log.setLevel(Level.INFO)
-    var controller: KafkaServer = this.servers.head
-    // Find the current controller
-    val epochMap: mutable.Map[Int, Int] = mutable.Map.empty
-    for (server <- this.servers) {
-      epochMap += (server.config.brokerId -> server.kafkaController.epoch)
-      if(server.kafkaController.isActive) {
-        controller = server
-      }
+  def testHandleIllegalStateException() {
+    val initialController = 
servers.find(_.kafkaController.isActive).map(_.kafkaController).getOrElse {
+      fail("Could not find controller")
     }
+    val initialEpoch = initialController.epoch
     // Create topic with one partition
-    kafka.admin.AdminUtils.createTopic(controller.zkUtils, topic, 1, 1)
+    AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1)
     val topicPartition = TopicAndPartition("topic1", 0)
-    var partitions = 
controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
-    while (!partitions.contains(topicPartition)) {
-      partitions = 
controller.kafkaController.partitionStateMachine.partitionsInState(OnlinePartition)
-      Thread.sleep(100)
-    }
-    // Replace channel manager with our mock manager
-    
controller.kafkaController.controllerContext.controllerChannelManager.shutdown()
-    val channelManager = new 
MockChannelManager(controller.kafkaController.controllerContext, 
-                                                  
controller.kafkaController.config, metrics)
-    channelManager.startup()
-    controller.kafkaController.controllerContext.controllerChannelManager = 
channelManager
-    channelManager.shrinkBlockingQueue(0)
-    channelManager.stopSendThread(0)
-    // Spawn a new thread to block on the outgoing channel
-    // queue
-    val thread = new Thread(new Runnable {
-      def run() {
-        try {
-          controller.kafkaController.sendUpdateMetadataRequest(Seq(0), 
Set(topicPartition))
-          log.info("Queue state %d %d".format(channelManager.queueCapacity(0), 
channelManager.queueSize(0)))
-          controller.kafkaController.sendUpdateMetadataRequest(Seq(0), 
Set(topicPartition))
-          log.info("Queue state %d %d".format(channelManager.queueCapacity(0), 
channelManager.queueSize(0)))
-        } catch {
-          case _: Exception => log.info("Thread interrupted")
-        }
+    TestUtils.waitUntilTrue(() =>
+      
initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),
+      s"Partition $topicPartition did not transition to online state")
+
+    // Wait until we have verified that we have resigned
+    val latch = new CountDownLatch(1)
+    @volatile var expectedExceptionThrown = false
+    @volatile var unexpectedExceptionThrown: Option[Throwable] = None
+    val illegalStateEvent = 
ControllerTestUtils.createMockControllerEvent(ControllerState.BrokerChange, { 
() =>
+      try initialController.handleIllegalState(new 
IllegalStateException("Thrown for test purposes"))
+      catch {
+        case _: IllegalStateException => expectedExceptionThrown = true
+        case t: Throwable => unexpectedExceptionThrown = Some(t)
       }
+      latch.await()
     })
-    thread.setName("mythread")
-    thread.start()
-    while (thread.getState() != Thread.State.WAITING) {
-      Thread.sleep(100)
-    }
-    // Assume that the thread is WAITING because it is
-    // blocked on the queue, so interrupt and move forward
-    thread.interrupt()
-    thread.join()
-    channelManager.resumeSendThread(0)
-    // Wait and find current controller
-    var found = false
-    var counter = 0
-    while (!found && counter < 10) {
-      for (server <- this.servers) {
-        val previousEpoch = epochMap get server.config.brokerId match {
-          case Some(epoch) =>
-            epoch
-          case None =>
-            val msg = String.format("Missing element in epoch map %s", 
epochMap.mkString(", "))
-            throw new IllegalStateException(msg)
-        }
+    initialController.eventManager.put(illegalStateEvent)
+    // Check that we have shutdown the scheduler (via onControllerResigned)
+    TestUtils.waitUntilTrue(() => !initialController.kafkaScheduler.isStarted, 
"Scheduler was not shutdown")
+    TestUtils.waitUntilTrue(() => !initialController.isActive, "Controller did 
not become inactive")
+    latch.countDown()
+    assertTrue("IllegalStateException was not thrown", expectedExceptionThrown)
+    assertEquals("Unexpected exception thrown", None, 
unexpectedExceptionThrown)
 
-        if (server.kafkaController.isActive
-            && previousEpoch < server.kafkaController.epoch) {
-          controller = server
-          found = true
-        }
+    TestUtils.waitUntilTrue(() => {
+      servers.exists { server =>
+        server.kafkaController.isActive && server.kafkaController.epoch > 
initialController.epoch
       }
-      if (!found) {
-          Thread.sleep(100)
-          counter += 1
-      }
-    }
-    // Give it a shot to make sure that sending isn't blocking
-    try {
-      controller.kafkaController.sendUpdateMetadataRequest(Seq(0), 
Set(topicPartition))
-    } catch {
-      case e : Throwable => {
-        fail(e)
-      }
-    }
-  }
-}
-
-class MockChannelManager(private val controllerContext: ControllerContext, 
config: KafkaConfig, metrics: Metrics)
-  extends ControllerChannelManager(controllerContext, config, Time.SYSTEM, 
metrics) {
-
-  def stopSendThread(brokerId: Int) {
-    val requestThread = brokerStateInfo(brokerId).requestSendThread
-    requestThread.isRunning.set(false)
-    requestThread.interrupt
-    requestThread.join
-  }
-
-  def shrinkBlockingQueue(brokerId: Int) {
-    val messageQueue = new LinkedBlockingQueue[QueueItem](1)
-    val brokerInfo = this.brokerStateInfo(brokerId)
-    this.brokerStateInfo.put(brokerId, brokerInfo.copy(messageQueue = 
messageQueue))
-  }
-
-  def resumeSendThread (brokerId: Int) {
-    this.startRequestSendThread(0)
-  }
-
-  def queueCapacity(brokerId: Int): Int = {
-    this.brokerStateInfo(brokerId).messageQueue.remainingCapacity
-  }
+    }, "Failed to find controller")
 
-  def queueSize(brokerId: Int): Int = {
-    this.brokerStateInfo(brokerId).messageQueue.size
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 2df93c7..d5f2fe0 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -264,7 +264,7 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
     servers(otherBrokerId).shutdown()
     servers(otherBrokerId).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = 
zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      val leaderIsrAndControllerEpochMap = 
zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), 
KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, 
LeaderAndIsr.initialLeaderEpoch + 1) &&
         leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == 
List(otherBrokerId)
@@ -284,7 +284,7 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
     servers(1).shutdown()
     servers(1).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = 
zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      val leaderIsrAndControllerEpochMap = 
zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), 
KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, 
LeaderAndIsr.initialLeaderEpoch + 1) &&
         leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List.empty
@@ -301,7 +301,7 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
                                     leaderEpoch: Int,
                                     message: String): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val leaderIsrAndControllerEpochMap = 
zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      val leaderIsrAndControllerEpochMap = 
zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), 
controllerEpoch, leader, leaderEpoch)
     }, message)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8acd77f1/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
new file mode 100644
index 0000000..407297a
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import org.easymock.{EasyMock, IAnswer}
+
+object ControllerTestUtils {
+
+  /** Since ControllerEvent is sealed, return a subclass of ControllerEvent 
created with EasyMock */
+  def createMockControllerEvent(controllerState: ControllerState, process: () 
=> Unit): ControllerEvent = {
+    val mockEvent = EasyMock.createMock(classOf[ControllerEvent])
+    EasyMock.expect(mockEvent.state).andReturn(controllerState)
+    EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
+      def answer(): Unit = {
+        process()
+      }
+    })
+    EasyMock.replay(mockEvent)
+    mockEvent
+  }
+}

Reply via email to