This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c0b89d  KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state. (#11255)
3c0b89d is described below

commit 3c0b89d9df7a46cceeaa08a6efd25bd2e49dcaaf
Author: David Mao <[email protected]>
AuthorDate: Thu Sep 2 15:44:12 2021 -0500

    KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state. (#11255)
    
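    The controller can skip sending an UpdateMetadataRequest during the broker
    failure callback if there are offline partitions and the offline brokers
    don't host any partitions. With this change, the controller sends the
    request explicitly whenever no partition has changed leader or ISR, so that
    the information about the new offline brokers is still propagated.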
    
    Reviewers: Jun Rao <[email protected]>
---
 .../scala/kafka/controller/KafkaController.scala   |  9 ++++---
 .../kafka/controller/PartitionStateMachine.scala   |  4 +--
 .../controller/ControllerIntegrationTest.scala     | 29 ++++++++++++++++++++++
 3 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 1d36f52..e99bf75 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -616,7 +616,7 @@ class KafkaController(val config: KafkaConfig,
     // trigger OfflinePartition state for all partitions whose current leader 
is one amongst the newOfflineReplicas
     
partitionStateMachine.handleStateChanges(partitionsWithOfflineLeader.toSeq, 
OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
-    partitionStateMachine.triggerOnlinePartitionStateChange()
+    val onlineStateChangeResults = 
partitionStateMachine.triggerOnlinePartitionStateChange()
     // trigger OfflineReplica state change for those newly offline replicas
     
replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, 
OfflineReplica)
 
@@ -628,9 +628,10 @@ class KafkaController(val config: KafkaConfig,
       topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
     }
 
-    // If replica failure did not require leader re-election, inform brokers 
of the offline brokers
-    // Note that during leader re-election, brokers update their metadata
-    if (partitionsWithOfflineLeader.isEmpty) {
+    // If no partition has changed leader or ISR, no UpdateMetadataRequest is 
sent through PartitionStateMachine
+    // and ReplicaStateMachine. In that case, we want to send an 
UpdateMetadataRequest explicitly to
+    // propagate the information about the new offline brokers.
+    if (newOfflineReplicasNotForDeletion.isEmpty && 
onlineStateChangeResults.values.forall(_.isLeft)) {
       
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 
Set.empty)
     }
   }
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e812a14..894d0e4 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -54,7 +54,7 @@ abstract class PartitionStateMachine(controllerContext: 
ControllerContext) exten
    * This API invokes the OnlinePartition state change on all partitions in 
either the NewPartition or OfflinePartition
    * state. This is called on a successful controller election and on broker 
changes
    */
-  def triggerOnlinePartitionStateChange(): Unit = {
+  def triggerOnlinePartitionStateChange(): Map[TopicPartition, 
Either[Throwable, LeaderAndIsr]] = {
     val partitions = 
controllerContext.partitionsInStates(Set(OfflinePartition, NewPartition))
     triggerOnlineStateChangeForPartitions(partitions)
   }
@@ -64,7 +64,7 @@ abstract class PartitionStateMachine(controllerContext: 
ControllerContext) exten
     triggerOnlineStateChangeForPartitions(partitions)
   }
 
-  private def triggerOnlineStateChangeForPartitions(partitions: 
collection.Set[TopicPartition]): Unit = {
+  private def triggerOnlineStateChangeForPartitions(partitions: 
collection.Set[TopicPartition]): Map[TopicPartition, Either[Throwable, 
LeaderAndIsr]] = {
     // try to move all partitions in NewPartition or OfflinePartition state to 
OnlinePartition state except partitions
     // that belong to topics to be deleted
     val partitionsToTrigger = partitions.filter { partition =>
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 815d0c6..bdc06a7 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -221,6 +221,35 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
   }
 
   @Test
+  def testMetadataPropagationOnBrokerShutdownWithNoReplicas(): Unit = {
+    servers = makeServers(3)
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val replicaBroker = servers.filter(e => e.config.brokerId != 
controllerId).head
+
+    val controllerBroker = servers.filter(e => e.config.brokerId == 
controllerId).head
+    val otherBroker = servers.filter(e => e.config.brokerId != controllerId &&
+      e.config.brokerId != replicaBroker.config.brokerId).head
+
+    val topic = "topic1"
+    val assignment = Map(0 -> Seq(replicaBroker.config.brokerId))
+
+    // Create topic
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+
+    // Shutdown the broker with replica
+    replicaBroker.shutdown()
+    replicaBroker.awaitShutdown()
+
+    // Shutdown the other broker
+    otherBroker.shutdown()
+    otherBroker.awaitShutdown()
+
+    // The controller should be the only alive broker
+    TestUtils.waitUntilBrokerMetadataIsPropagated(Seq(controllerBroker))
+  }
+
+  @Test
   def testTopicCreation(): Unit = {
     servers = makeServers(1)
     val tp = new TopicPartition("t", 0)

Reply via email to