This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 4ab6c4a  KAFKA-13127; Fix stray topic partition deletion for kraft (#11118)
4ab6c4a is described below

commit 4ab6c4a6fd1b20e014fb2988a19a0449325b6f17
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Jul 23 15:01:39 2021 -0700

    KAFKA-13127; Fix stray topic partition deletion for kraft (#11118)
    
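    This patch fixes BrokerMetadataPublisher.findGhostReplicas (renamed to findStrayPartitions)
    so that it returns the stray partitions. Previously it returned the non-stray partitions
    (those whose replica assignment still includes the local broker), which caused all of
    them to be deleted on startup by mistake. It also skipped partitions whose topic ID no
    longer exists in the metadata image, so logs of deleted topics were never cleaned up;
    those are now reported as stray as well.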
    
    Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala            |   1 +
 .../main/scala/kafka/server/ReplicaManager.scala   |   8 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |  50 +++++-----
 .../kafka/server/KRaftClusterTest.scala            |   5 +-
 .../server/AbstractApiVersionsRequestTest.scala    |   5 +-
 .../kafka/server/ClientQuotasRequestTest.scala     |   2 +
 .../kafka/server/DescribeQuorumRequestTest.scala   |   2 +
 .../metadata/BrokerMetadataPublisherTest.scala     | 105 ++++++++++++++++++++-
 8 files changed, 148 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 3df4e93..350200b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1890,6 +1890,7 @@ class Log(@volatile private var _dir: File,
   override def toString: String = {
     val logString = new StringBuilder
     logString.append(s"Log(dir=$dir")
+    topicId.foreach(id => logString.append(s", topicId=$id"))
     logString.append(s", topic=${topicPartition.topic}")
     logString.append(s", partition=${topicPartition.partition}")
     logString.append(s", highWatermark=$highWatermark")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index caae10b..6837d81 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2201,15 +2201,15 @@ class ReplicaManager(val config: KafkaConfig,
     replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
   }
 
-  def deleteGhostReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
+  def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
     stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
       case (topicPartition, e) =>
         if (e.isInstanceOf[KafkaStorageException]) {
-          stateChangeLogger.error(s"Unable to delete ghost replica 
${topicPartition} because " +
+          stateChangeLogger.error(s"Unable to delete stray replica 
$topicPartition because " +
             "the local replica for the partition is in an offline log 
directory")
         } else {
-          stateChangeLogger.error(s"Unable to delete ghost replica 
${topicPartition} because " +
-            s"we got an unexpected ${e.getClass.getName} exception: 
${e.getMessage}")
+          stateChangeLogger.error(s"Unable to delete stray replica 
$topicPartition because " +
+            s"we got an unexpected ${e.getClass.getName} exception: 
${e.getMessage}", e)
         }
     }
   }
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 0389d67..f1466e1 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -26,12 +26,12 @@ import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, 
TopicsImage}
 
 import scala.collection.mutable
 
 
-object BrokerMetadataPublisher {
+object BrokerMetadataPublisher extends Logging {
   /**
    * Given a topic name, find out if it changed. Note: if a topic named X was 
deleted and
    * then re-created, this method will return just the re-creation. The 
deletion will show
@@ -56,29 +56,35 @@ object BrokerMetadataPublisher {
   /**
    * Find logs which should not be on the current broker, according to the 
metadata image.
    *
-   * @param brokerId  The ID of the current broker.
-   * @param newImage  The metadata image.
-   * @param logs      A collection of Log objects.
+   * @param brokerId        The ID of the current broker.
+   * @param newTopicsImage  The new topics image after broker has been reloaded
+   * @param logs            A collection of Log objects.
    *
    * @return          The topic partitions which are no longer needed on this 
broker.
    */
-  def findGhostReplicas(brokerId: Int,
-                        newImage: MetadataImage,
-                        logs: Iterable[Log]): Iterable[TopicPartition] = {
+  def findStrayPartitions(brokerId: Int,
+                          newTopicsImage: TopicsImage,
+                          logs: Iterable[Log]): Iterable[TopicPartition] = {
     logs.flatMap { log =>
-      log.topicId match {
-        case None => throw new RuntimeException(s"Topic ${log.name} does not 
have a topic ID, " +
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic 
ID, " +
           "which is not allowed when running in KRaft mode.")
-        case Some(topicId) =>
-          val partitionId = log.topicPartition.partition()
-          Option(newImage.topics().getPartition(topicId, partitionId)) match {
-            case None => None
-            case Some(partition) => if (partition.replicas.contains(brokerId)) 
{
-              Some(log.topicPartition)
-            } else {
-              None
-            }
+      }
+
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId)) match {
+        case Some(partition) =>
+          if (!partition.replicas.contains(brokerId)) {
+            info(s"Found stray log dir $log: the current replica assignment 
${partition.replicas} " +
+              s"does not contain the local brokerId $brokerId.")
+            Some(log.topicPartition)
+          } else {
+            None
           }
+
+        case None =>
+          info(s"Found stray log dir $log: the topicId $topicId does not exist 
in the metadata image")
+          Some(log.topicPartition)
       }
     }
   }
@@ -239,9 +245,9 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
     // Delete log directories which we're not supposed to have, according to 
the
     // latest metadata. This is only necessary to do when we're first starting 
up. If
     // we have to load a snapshot later, these topics will appear in 
deletedTopicIds.
-    val ghostReplicas = findGhostReplicas(brokerId, newImage, 
logManager.allLogs)
-    if (ghostReplicas.nonEmpty) {
-      replicaManager.deleteGhostReplicas(ghostReplicas)
+    val strayPartitions = findStrayPartitions(brokerId, newImage.topics, 
logManager.allLogs)
+    if (strayPartitions.nonEmpty) {
+      replicaManager.deleteStrayReplicas(strayPartitions)
     }
 
     // Make sure that the high water mark checkpoint thread is running for the 
replica
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 42872ff..180bf0b 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -29,15 +29,16 @@ import 
org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
 import org.apache.kafka.common.requests.{DescribeClusterRequest, 
DescribeClusterResponse}
 import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Test, Timeout}
-
+import org.junit.jupiter.api.{Tag, Test, Timeout}
 import java.util
 import java.util.{Arrays, Collections, Optional}
+
 import scala.collection.mutable
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
 
 @Timeout(120)
+@Tag("integration")
 class KRaftClusterTest {
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index f086443..1e6c05d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.server
 
+import java.util.Properties
+
 import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
@@ -24,10 +26,11 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Tag
 
-import java.util.Properties
 import scala.jdk.CollectionConverters._
 
+@Tag("integration")
 abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
 
   def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: 
ListenerName): ApiVersionsResponse = {
diff --git 
a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index fc468e5..573bd95 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -32,12 +32,14 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, 
AlterClientQuotasResponse, DescribeClientQuotasRequest, 
DescribeClientQuotasResponse}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 
 @ClusterTestDefaults(clusterType = Type.BOTH)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@Tag("integration")
 class ClientQuotasRequestTest(cluster: ClusterInstance) {
   private val ConsumerByteRateProp = 
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG
   private val ProducerByteRateProp = 
QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index b31ac36..55b9fe9 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, 
DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Tag
 import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
@@ -33,6 +34,7 @@ import scala.reflect.ClassTag
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
   @ClusterTest(clusterType = Type.ZK)
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index eac1571..6a316e3 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,9 +17,17 @@
 
 package unit.kafka.server.metadata
 
+import kafka.log.Log
 import kafka.server.metadata.BrokerMetadataPublisher
-import org.apache.kafka.image.MetadataImageTest
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
+import org.apache.kafka.metadata.PartitionRegistration
 import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Assertions.assertEquals
+
+import org.mockito.Mockito
+
+import scala.jdk.CollectionConverters._
 
 class BrokerMetadataPublisherTest {
   @Test
@@ -39,4 +47,99 @@ class BrokerMetadataPublisherTest {
       MetadataImageTest.IMAGE1,
       MetadataImageTest.DELTA1).isDefined, "Expected to see delta for changed 
topic")
   }
+
+  @Test
+  def testFindStrayReplicas(): Unit = {
+    val brokerId = 0
+
+    // Topic has been deleted
+    val deletedTopic = "a"
+    val deletedTopicId = Uuid.randomUuid()
+    val deletedTopicPartition1 = new TopicPartition(deletedTopic, 0)
+    val deletedTopicLog1 = mockLog(deletedTopicId, deletedTopicPartition1)
+    val deletedTopicPartition2 = new TopicPartition(deletedTopic, 1)
+    val deletedTopicLog2 = mockLog(deletedTopicId, deletedTopicPartition2)
+
+    // Topic was deleted and recreated
+    val recreatedTopic = "b"
+    val recreatedTopicPartition = new TopicPartition(recreatedTopic, 0)
+    val recreatedTopicLog = mockLog(Uuid.randomUuid(), recreatedTopicPartition)
+    val recreatedTopicImage = topicImage(Uuid.randomUuid(), recreatedTopic, 
Map(
+      recreatedTopicPartition.partition -> Seq(0, 1, 2)
+    ))
+
+    // Topic exists, but some partitions were reassigned
+    val reassignedTopic = "c"
+    val reassignedTopicId = Uuid.randomUuid()
+    val reassignedTopicPartition = new TopicPartition(reassignedTopic, 0)
+    val reassignedTopicLog = mockLog(reassignedTopicId, 
reassignedTopicPartition)
+    val retainedTopicPartition = new TopicPartition(reassignedTopic, 1)
+    val retainedTopicLog = mockLog(reassignedTopicId, retainedTopicPartition)
+
+    val reassignedTopicImage = topicImage(reassignedTopicId, reassignedTopic, 
Map(
+      reassignedTopicPartition.partition -> Seq(1, 2, 3),
+      retainedTopicPartition.partition -> Seq(0, 2, 3)
+    ))
+
+    val logs = Seq(
+      deletedTopicLog1,
+      deletedTopicLog2,
+      recreatedTopicLog,
+      reassignedTopicLog,
+      retainedTopicLog
+    )
+
+    val image = topicsImage(Seq(
+      recreatedTopicImage,
+      reassignedTopicImage
+    ))
+
+    val expectedStrayPartitions = Set(
+      deletedTopicPartition1,
+      deletedTopicPartition2,
+      recreatedTopicPartition,
+      reassignedTopicPartition
+    )
+
+    val strayPartitions = 
BrokerMetadataPublisher.findStrayPartitions(brokerId, image, logs).toSet
+    assertEquals(expectedStrayPartitions, strayPartitions)
+  }
+
+  private def mockLog(
+    topicId: Uuid,
+    topicPartition: TopicPartition
+  ): Log = {
+    val log = Mockito.mock(classOf[Log])
+    Mockito.when(log.topicId).thenReturn(Some(topicId))
+    Mockito.when(log.topicPartition).thenReturn(topicPartition)
+    log
+  }
+
+  private def topicImage(
+    topicId: Uuid,
+    topic: String,
+    partitions: Map[Int, Seq[Int]]
+  ): TopicImage = {
+    val partitionRegistrations = partitions.map { case (partitionId, replicas) 
=>
+      Int.box(partitionId) -> new PartitionRegistration(
+        replicas.toArray,
+        replicas.toArray,
+        Array.empty[Int],
+        Array.empty[Int],
+        replicas.head,
+        0,
+        0
+      )
+    }
+    new TopicImage(topic, topicId, partitionRegistrations.asJava)
+  }
+
+  private def topicsImage(
+    topics: Seq[TopicImage]
+  ): TopicsImage = {
+    val idsMap = topics.map(t => t.id -> t).toMap
+    val namesMap = topics.map(t => t.name -> t).toMap
+    new TopicsImage(idsMap.asJava, namesMap.asJava)
+  }
+
 }

Reply via email to