This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 341db99  KAFKA-6650: Allowing transition to OfflineReplica state for 
replicas without leadership info (#4825)
341db99 is described below

commit 341db990dc4e2acb207622c7fa254f07650742bc
Author: gitlw <lucasatu...@gmail.com>
AuthorDate: Mon Apr 16 17:16:08 2018 -0700

    KAFKA-6650: Allowing transition to OfflineReplica state for replicas 
without leadership info (#4825)
    
    A partially deleted topic can end up with some partitions having no
    leadership info. For such a partially deleted topic, a new controller
    should be able to finish the topic deletion by transitioning the rogue
    partition's replicas to OfflineReplica state. This patch adds logic to
    transition replicas whose partitions have no leadership info to the
    OfflineReplica state.
    
    Added a new test method to cover the partially deleted topic case.
    
    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../kafka/controller/ReplicaStateMachine.scala     | 11 +++++-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |  2 +-
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   | 43 +++++++++++++++++++---
 .../kafka/controller/ReplicaStateMachineTest.scala |  2 +-
 4 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index a2d04e6..5fafcc4 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -202,8 +202,10 @@ class ReplicaStateMachine(config: KafkaConfig,
           
controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), 
replica.topicPartition,
             deletePartition = false, (_, _) => ())
         }
-        val replicasToRemoveFromIsr = validReplicas.filter(replica => 
controllerContext.partitionLeadershipInfo.contains(replica.topicPartition))
-        val updatedLeaderIsrAndControllerEpochs = 
removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition))
+        val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = 
validReplicas.partition { replica =>
+          
controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)
+        }
+        val updatedLeaderIsrAndControllerEpochs = 
removeReplicasFromIsr(replicaId, 
replicasWithLeadershipInfo.map(_.topicPartition))
         updatedLeaderIsrAndControllerEpochs.foreach { case (partition, 
leaderIsrAndControllerEpoch) =>
           if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
             val recipients = 
controllerContext.partitionReplicaAssignment(partition).filterNot(_ == 
replicaId)
@@ -216,6 +218,11 @@ class ReplicaStateMachine(config: KafkaConfig,
           logSuccessfulTransition(replicaId, partition, replicaState(replica), 
OfflineReplica)
           replicaState.put(replica, OfflineReplica)
         }
+
+        replicasWithoutLeadershipInfo.foreach { replica =>
+          logSuccessfulTransition(replicaId, replica.topicPartition, 
replicaState(replica), OfflineReplica)
+          replicaState.put(replica, OfflineReplica)
+        }
       case ReplicaDeletionStarted =>
         validReplicas.foreach { replica =>
           logSuccessfulTransition(replicaId, replica.topicPartition, 
replicaState(replica), ReplicaDeletionStarted)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 9b58fc7..a65128a 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1370,7 +1370,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @return true if path gets deleted successfully, false if root path 
doesn't exist
    * @throws KeeperException if there is an error while deleting the znodes
    */
-  private[zk] def deleteRecursive(path: String): Boolean = {
+  def deleteRecursive(path: String): Boolean = {
     val getChildrenResponse = 
retryRequestUntilConnected(GetChildrenRequest(path))
     getChildrenResponse.resultCode match {
       case Code.OK =>
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ef455d4..4c033c4 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -17,7 +17,7 @@
 package kafka.admin
 
 import kafka.log.Log
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
 import kafka.utils.TestUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
@@ -326,7 +326,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     brokerConfigs.head.setProperty("log.segment.bytes","100")
     brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
 
-    servers = createTestTopicAndCluster(topic,brokerConfigs)
+    servers = createTestTopicAndCluster(topic, brokerConfigs, 
expectedReplicaAssignment)
 
     // for simplicity, we are validating cleaner offsets on a single broker
     val server = servers.head
@@ -363,18 +363,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: 
Boolean = true): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: 
Boolean = true, replicaAssignment: Map[Int, List[Int]] = 
expectedReplicaAssignment): Seq[KafkaServer] = {
     val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, 
enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", 
deleteTopicEnabled.toString))
-    createTestTopicAndCluster(topic, brokerConfigs)
+    createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
 
-  private def createTestTopicAndCluster(topic: String, brokerConfigs: 
Seq[Properties]): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: 
Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = {
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
     val servers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, 
expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, 
replicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => 
servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created")
@@ -408,4 +408,35 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 
0))
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
   }
+
+  @Test
+  def testDeletingPartiallyDeletedTopic() {
+    /**
+      * A previous controller could have deleted some partitions of a topic 
from ZK, but not all partitions, and then crashed.
+      * In that case, the new controller should be able to handle the 
partially deleted topic, and finish the deletion.
+      */
+
+    val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
+    val topic = "test"
+    servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+
+    /**
+      * shutdown all brokers in order to create a partially deleted topic on ZK
+      */
+    servers.foreach(_.shutdown())
+
+    /**
+      * delete the partition znode at /brokers/topics/test/partitions/0
+      * to simulate the case that a previous controller crashed right after 
deleting the partition znode
+      */
+    zkClient.deleteRecursive(TopicPartitionZNode.path(new 
TopicPartition(topic, 0)))
+    adminZkClient.deleteTopic(topic)
+
+    /**
+      * start up all brokers and verify that topic deletion eventually 
finishes.
+      */
+    servers.foreach(_.startup())
+    TestUtils.waitUntilTrue(() => servers.exists(_.kafkaController.isActive), 
"No controller is elected")
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala 
b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 6a961a5..14d2df2 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -119,7 +119,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     EasyMock.replay(mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
     EasyMock.verify(mockControllerBrokerRequestBatch)
-    assertEquals(NewReplica, replicaState(replica))
+    assertEquals(OfflineReplica, replicaState(replica))
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
jun...@apache.org.

Reply via email to