This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 4ab6c4a KAFKA-13127; Fix stray topic partition deletion for kraft (#11118)
4ab6c4a is described below
commit 4ab6c4a6fd1b20e014fb2988a19a0449325b6f17
Author: Jason Gustafson <[email protected]>
AuthorDate: Fri Jul 23 15:01:39 2021 -0700
KAFKA-13127; Fix stray topic partition deletion for kraft (#11118)
This patch fixes BrokerMetadataPublisher.findGhostReplicas (renamed to
findStrayPartitions) so that it returns the stray partitions. Previously it
was returning the non-stray partitions, which caused all of those partitions
to be deleted on startup by mistake.
Reviewers: Colin P. McCabe <[email protected]>, José Armando García Sancio
<[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 1 +
.../main/scala/kafka/server/ReplicaManager.scala | 8 +-
.../server/metadata/BrokerMetadataPublisher.scala | 50 +++++-----
.../kafka/server/KRaftClusterTest.scala | 5 +-
.../server/AbstractApiVersionsRequestTest.scala | 5 +-
.../kafka/server/ClientQuotasRequestTest.scala | 2 +
.../kafka/server/DescribeQuorumRequestTest.scala | 2 +
.../metadata/BrokerMetadataPublisherTest.scala | 105 ++++++++++++++++++++-
8 files changed, 148 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 3df4e93..350200b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1890,6 +1890,7 @@ class Log(@volatile private var _dir: File,
override def toString: String = {
val logString = new StringBuilder
logString.append(s"Log(dir=$dir")
+ topicId.foreach(id => logString.append(s", topicId=$id"))
logString.append(s", topic=${topicPartition.topic}")
logString.append(s", partition=${topicPartition.partition}")
logString.append(s", highWatermark=$highWatermark")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index caae10b..6837d81 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2201,15 +2201,15 @@ class ReplicaManager(val config: KafkaConfig,
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
}
- def deleteGhostReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
+ def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
case (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) {
- stateChangeLogger.error(s"Unable to delete ghost replica
${topicPartition} because " +
+ stateChangeLogger.error(s"Unable to delete stray replica
$topicPartition because " +
"the local replica for the partition is in an offline log
directory")
} else {
- stateChangeLogger.error(s"Unable to delete ghost replica
${topicPartition} because " +
- s"we got an unexpected ${e.getClass.getName} exception:
${e.getMessage}")
+ stateChangeLogger.error(s"Unable to delete stray replica
$topicPartition because " +
+ s"we got an unexpected ${e.getClass.getName} exception:
${e.getMessage}", e)
}
}
}
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 0389d67..f1466e1 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -26,12 +26,12 @@ import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta,
TopicsImage}
import scala.collection.mutable
-object BrokerMetadataPublisher {
+object BrokerMetadataPublisher extends Logging {
/**
* Given a topic name, find out if it changed. Note: if a topic named X was
deleted and
* then re-created, this method will return just the re-creation. The
deletion will show
@@ -56,29 +56,35 @@ object BrokerMetadataPublisher {
/**
* Find logs which should not be on the current broker, according to the
metadata image.
*
- * @param brokerId The ID of the current broker.
- * @param newImage The metadata image.
- * @param logs A collection of Log objects.
+ * @param brokerId The ID of the current broker.
+ * @param newTopicsImage The new topics image after broker has been reloaded
+ * @param logs A collection of Log objects.
*
* @return The topic partitions which are no longer needed on this
broker.
*/
- def findGhostReplicas(brokerId: Int,
- newImage: MetadataImage,
- logs: Iterable[Log]): Iterable[TopicPartition] = {
+ def findStrayPartitions(brokerId: Int,
+ newTopicsImage: TopicsImage,
+ logs: Iterable[Log]): Iterable[TopicPartition] = {
logs.flatMap { log =>
- log.topicId match {
- case None => throw new RuntimeException(s"Topic ${log.name} does not
have a topic ID, " +
+ val topicId = log.topicId.getOrElse {
+ throw new RuntimeException(s"The log dir $log does not have a topic
ID, " +
"which is not allowed when running in KRaft mode.")
- case Some(topicId) =>
- val partitionId = log.topicPartition.partition()
- Option(newImage.topics().getPartition(topicId, partitionId)) match {
- case None => None
- case Some(partition) => if (partition.replicas.contains(brokerId))
{
- Some(log.topicPartition)
- } else {
- None
- }
+ }
+
+ val partitionId = log.topicPartition.partition()
+ Option(newTopicsImage.getPartition(topicId, partitionId)) match {
+ case Some(partition) =>
+ if (!partition.replicas.contains(brokerId)) {
+ info(s"Found stray log dir $log: the current replica assignment
${partition.replicas} " +
+ s"does not contain the local brokerId $brokerId.")
+ Some(log.topicPartition)
+ } else {
+ None
}
+
+ case None =>
+ info(s"Found stray log dir $log: the topicId $topicId does not exist
in the metadata image")
+ Some(log.topicPartition)
}
}
}
@@ -239,9 +245,9 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Delete log directories which we're not supposed to have, according to
the
// latest metadata. This is only necessary to do when we're first starting
up. If
// we have to load a snapshot later, these topics will appear in
deletedTopicIds.
- val ghostReplicas = findGhostReplicas(brokerId, newImage,
logManager.allLogs)
- if (ghostReplicas.nonEmpty) {
- replicaManager.deleteGhostReplicas(ghostReplicas)
+ val strayPartitions = findStrayPartitions(brokerId, newImage.topics,
logManager.allLogs)
+ if (strayPartitions.nonEmpty) {
+ replicaManager.deleteStrayReplicas(strayPartitions)
}
// Make sure that the high water mark checkpoint thread is running for the
replica
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 42872ff..180bf0b 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -29,15 +29,16 @@ import
org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.requests.{DescribeClusterRequest,
DescribeClusterResponse}
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{Test, Timeout}
-
+import org.junit.jupiter.api.{Tag, Test, Timeout}
import java.util
import java.util.{Arrays, Collections, Optional}
+
import scala.collection.mutable
import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
@Timeout(120)
+@Tag("integration")
class KRaftClusterTest {
@Test
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index f086443..1e6c05d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -16,6 +16,8 @@
*/
package kafka.server
+import java.util.Properties
+
import kafka.test.ClusterInstance
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
@@ -24,10 +26,11 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest,
ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Tag
-import java.util.Properties
import scala.jdk.CollectionConverters._
+@Tag("integration")
abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName:
ListenerName): ApiVersionsResponse = {
diff --git
a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index fc468e5..573bd95 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -32,12 +32,14 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{AlterClientQuotasRequest,
AlterClientQuotasResponse, DescribeClientQuotasRequest,
DescribeClientQuotasResponse}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@ClusterTestDefaults(clusterType = Type.BOTH)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@Tag("integration")
class ClientQuotasRequestTest(cluster: ClusterInstance) {
private val ConsumerByteRateProp =
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG
private val ProducerByteRateProp =
QuotaConfigs.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG
diff --git
a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index b31ac36..55b9fe9 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest,
DescribeQuorumResponse}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
@@ -33,6 +34,7 @@ import scala.reflect.ClassTag
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@ClusterTestDefaults(clusterType = Type.KRAFT)
+@Tag("integration")
class DescribeQuorumRequestTest(cluster: ClusterInstance) {
@ClusterTest(clusterType = Type.ZK)
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index eac1571..6a316e3 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,9 +17,17 @@
package unit.kafka.server.metadata
+import kafka.log.Log
import kafka.server.metadata.BrokerMetadataPublisher
-import org.apache.kafka.image.MetadataImageTest
+import org.apache.kafka.common.{TopicPartition, Uuid}
+import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
+import org.apache.kafka.metadata.PartitionRegistration
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Assertions.assertEquals
+
+import org.mockito.Mockito
+
+import scala.jdk.CollectionConverters._
class BrokerMetadataPublisherTest {
@Test
@@ -39,4 +47,99 @@ class BrokerMetadataPublisherTest {
MetadataImageTest.IMAGE1,
MetadataImageTest.DELTA1).isDefined, "Expected to see delta for changed
topic")
}
+
+ @Test
+ def testFindStrayReplicas(): Unit = {
+ val brokerId = 0
+
+ // Topic has been deleted
+ val deletedTopic = "a"
+ val deletedTopicId = Uuid.randomUuid()
+ val deletedTopicPartition1 = new TopicPartition(deletedTopic, 0)
+ val deletedTopicLog1 = mockLog(deletedTopicId, deletedTopicPartition1)
+ val deletedTopicPartition2 = new TopicPartition(deletedTopic, 1)
+ val deletedTopicLog2 = mockLog(deletedTopicId, deletedTopicPartition2)
+
+ // Topic was deleted and recreated
+ val recreatedTopic = "b"
+ val recreatedTopicPartition = new TopicPartition(recreatedTopic, 0)
+ val recreatedTopicLog = mockLog(Uuid.randomUuid(), recreatedTopicPartition)
+ val recreatedTopicImage = topicImage(Uuid.randomUuid(), recreatedTopic,
Map(
+ recreatedTopicPartition.partition -> Seq(0, 1, 2)
+ ))
+
+ // Topic exists, but some partitions were reassigned
+ val reassignedTopic = "c"
+ val reassignedTopicId = Uuid.randomUuid()
+ val reassignedTopicPartition = new TopicPartition(reassignedTopic, 0)
+ val reassignedTopicLog = mockLog(reassignedTopicId,
reassignedTopicPartition)
+ val retainedTopicPartition = new TopicPartition(reassignedTopic, 1)
+ val retainedTopicLog = mockLog(reassignedTopicId, retainedTopicPartition)
+
+ val reassignedTopicImage = topicImage(reassignedTopicId, reassignedTopic,
Map(
+ reassignedTopicPartition.partition -> Seq(1, 2, 3),
+ retainedTopicPartition.partition -> Seq(0, 2, 3)
+ ))
+
+ val logs = Seq(
+ deletedTopicLog1,
+ deletedTopicLog2,
+ recreatedTopicLog,
+ reassignedTopicLog,
+ retainedTopicLog
+ )
+
+ val image = topicsImage(Seq(
+ recreatedTopicImage,
+ reassignedTopicImage
+ ))
+
+ val expectedStrayPartitions = Set(
+ deletedTopicPartition1,
+ deletedTopicPartition2,
+ recreatedTopicPartition,
+ reassignedTopicPartition
+ )
+
+ val strayPartitions =
BrokerMetadataPublisher.findStrayPartitions(brokerId, image, logs).toSet
+ assertEquals(expectedStrayPartitions, strayPartitions)
+ }
+
+ private def mockLog(
+ topicId: Uuid,
+ topicPartition: TopicPartition
+ ): Log = {
+ val log = Mockito.mock(classOf[Log])
+ Mockito.when(log.topicId).thenReturn(Some(topicId))
+ Mockito.when(log.topicPartition).thenReturn(topicPartition)
+ log
+ }
+
+ private def topicImage(
+ topicId: Uuid,
+ topic: String,
+ partitions: Map[Int, Seq[Int]]
+ ): TopicImage = {
+ val partitionRegistrations = partitions.map { case (partitionId, replicas)
=>
+ Int.box(partitionId) -> new PartitionRegistration(
+ replicas.toArray,
+ replicas.toArray,
+ Array.empty[Int],
+ Array.empty[Int],
+ replicas.head,
+ 0,
+ 0
+ )
+ }
+ new TopicImage(topic, topicId, partitionRegistrations.asJava)
+ }
+
+ private def topicsImage(
+ topics: Seq[TopicImage]
+ ): TopicsImage = {
+ val idsMap = topics.map(t => t.id -> t).toMap
+ val namesMap = topics.map(t => t.name -> t).toMap
+ new TopicsImage(idsMap.asJava, namesMap.asJava)
+ }
+
}