This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new aaf8e02 KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745) aaf8e02 is described below commit aaf8e0240355d77777eb3828583f61af14dc3c3b Author: Jun Rao <jun...@gmail.com> AuthorDate: Fri Oct 12 10:11:54 2018 -0700 KAFKA-7482: LeaderAndIsrRequest should be sent to the shutting down broker (#5745) Reviewers: Dong Lin <lindon...@gmail.com> --- core/src/main/scala/kafka/controller/PartitionStateMachine.scala | 6 +++--- .../scala/unit/kafka/controller/PartitionStateMachineTest.scala | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 663ee8d..e4f0532 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -441,12 +441,12 @@ class PartitionStateMachine(config: KafkaConfig, Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = { leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) => val assignment = controllerContext.partitionReplicaAssignment(partition) - val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition)) + val liveOrShuttingDownReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true)) val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr - val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveReplicas.toSet, shuttingDownBrokers) + val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveOrShuttingDownReplicas.toSet, shuttingDownBrokers) val newIsr = isr.filter(replica => !controllerContext.shuttingDownBrokerIds.contains(replica)) val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)) - (partition, newLeaderAndIsrOpt, liveReplicas) + (partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas) } } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index b89632e..3370b54 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -192,7 +192,9 @@ class PartitionStateMachineTest extends JUnitSuite { val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch, controllerContext.epochZkVersion)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) - EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), + + // The leaderAndIsr request should be sent to both brokers, including the shutting down one + EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId, otherBrokerId), partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) @@ -455,5 +457,4 @@ class PartitionStateMachineTest extends JUnitSuite { topicDeletionManager.enqueueTopicsForDeletion(Set(topic)) assertEquals(s"There should be no offline partition(s)", 0, partitionStateMachine.offlinePartitionCount) } - }