This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aaf8e02  KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
aaf8e02 is described below

commit aaf8e0240355d77777eb3828583f61af14dc3c3b
Author: Jun Rao <jun...@gmail.com>
AuthorDate: Fri Oct 12 10:11:54 2018 -0700

    KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745)
    
    Reviewers: Dong Lin <lindon...@gmail.com>
---
 core/src/main/scala/kafka/controller/PartitionStateMachine.scala    | 6 +++---
 .../scala/unit/kafka/controller/PartitionStateMachineTest.scala     | 5 +++--
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 663ee8d..e4f0532 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -441,12 +441,12 @@ class PartitionStateMachine(config: KafkaConfig,
   Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
     leaderIsrAndControllerEpochs.map { case (partition, 
leaderIsrAndControllerEpoch) =>
       val assignment = controllerContext.partitionReplicaAssignment(partition)
-      val liveReplicas = assignment.filter(replica => 
controllerContext.isReplicaOnline(replica, partition))
+      val liveOrShuttingDownReplicas = assignment.filter(replica => 
controllerContext.isReplicaOnline(replica, partition, 
includeShuttingDownBrokers = true))
       val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-      val leaderOpt = 
PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment,
 isr, liveReplicas.toSet, shuttingDownBrokers)
+      val leaderOpt = 
PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment,
 isr, liveOrShuttingDownReplicas.toSet, shuttingDownBrokers)
       val newIsr = isr.filter(replica => 
!controllerContext.shuttingDownBrokerIds.contains(replica))
       val newLeaderAndIsrOpt = leaderOpt.map(leader => 
leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr))
-      (partition, newLeaderAndIsrOpt, liveReplicas)
+      (partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index b89632e..3370b54 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -192,7 +192,9 @@ class PartitionStateMachineTest extends JUnitSuite {
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2)
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> 
leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> 
updatedLeaderAndIsr), Seq.empty, Map.empty))
-    
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
+
+    // The leaderAndIsr request should be sent to both brokers, including the 
shutting down one
+    
EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId,
 otherBrokerId),
       partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, 
controllerEpoch), Seq(brokerId, otherBrokerId),
       isNew = false))
     
EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
@@ -455,5 +457,4 @@ class PartitionStateMachineTest extends JUnitSuite {
     topicDeletionManager.enqueueTopicsForDeletion(Set(topic))
     assertEquals(s"There should be no offline partition(s)", 0, 
partitionStateMachine.offlinePartitionCount)
   }
-
 }

Reply via email to