This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 4064f4a7e43 KAFKA-13972; Ensure replica state deleted after reassignment cancellation (#13107)
4064f4a7e43 is described below
commit 4064f4a7e434b7096ee506d17413fc37258384d0
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Jan 18 10:26:48 2023 -0800
KAFKA-13972; Ensure replica state deleted after reassignment cancellation (#13107)
When a reassignment is cancelled, we need to delete the partition state of
the adding replicas. Failing to do so leaves "stray" replicas, which take up
disk space and can cause topicId conflicts if the topic is later recreated.
Currently, this cleanup does not work because the leader epoch is not always
bumped after cancellation. Without a leader epoch bump, the replica ignores
the StopReplica request sent by the controller and may remain online.
In this patch, we fix the problem by loosening the epoch check on the broker
side when a StopReplica request is received: instead of ignoring requests
whose epoch matches the current epoch, the broker now accepts them.
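To make the intent concrete, here is a minimal, self-contained Scala sketch of
the check before and after the change. This is an illustration only, not the
Kafka code: the object name is hypothetical and the sentinel constants are
placeholders standing in for LeaderAndIsr.EpochDuringDelete and
LeaderAndIsr.NoEpoch.

    object StopReplicaEpochCheckSketch {
      // Placeholder sentinels for illustration (see LeaderAndIsr in the real code).
      val EpochDuringDelete: Int = -2
      val NoEpoch: Int = -1

      // Before the patch: only a strictly newer epoch (or a sentinel) was accepted,
      // so a cancellation that did not bump the epoch left the replica untouched.
      def acceptedBefore(requestEpoch: Int, currentEpoch: Int): Boolean =
        requestEpoch == EpochDuringDelete || requestEpoch == NoEpoch ||
          requestEpoch > currentEpoch

      // After the patch: an epoch equal to the broker's current epoch is also accepted.
      def acceptedAfter(requestEpoch: Int, currentEpoch: Int): Boolean =
        requestEpoch == EpochDuringDelete || requestEpoch == NoEpoch ||
          requestEpoch >= currentEpoch

      def main(args: Array[String]): Unit = {
        // Reassignment cancelled without an epoch bump: request epoch == current epoch.
        println(acceptedBefore(5, 5)) // false -> StopReplica ignored, stray replica remains
        println(acceptedAfter(5, 5))  // true  -> replica is stopped and its state deleted
      }
    }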
Note that this problem only affects the ZK controller. The integration tests
added here nevertheless cover both metadata modes.
Reviewers: David Jacot <[email protected]>, Justine Olshan <[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../admin/ReassignPartitionsIntegrationTest.scala | 95 ++++++++++++++++++----
.../unit/kafka/server/ReplicaManagerTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 21 +++--
4 files changed, 94 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b2a37479bae..069ef27d67b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -387,7 +387,7 @@ class ReplicaManager(val config: KafkaConfig,
// epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
- requestLeaderEpoch > currentLeaderEpoch) {
+ requestLeaderEpoch >= currentLeaderEpoch) {
stoppedPartitions += topicPartition -> deletePartition
// Assume that everything will go right. It is overwritten in case of an error.
responseMap.put(topicPartition, Errors.NONE)
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 9b3b935f23e..d3a04da4d13 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -108,12 +108,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
"""{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}"""
+
"""]}"""
+ val foo0 = new TopicPartition("foo", 0)
+ val bar0 = new TopicPartition("bar", 0)
+
// Check that the assignment has not yet been started yet.
val initialAssignment = Map(
- new TopicPartition("foo", 0) ->
- PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 3), true),
- new TopicPartition("bar", 0) ->
- PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 0), true)
+ foo0 -> PartitionReassignmentState(Seq(0, 1, 2), Seq(0, 1, 3), true),
+ bar0 -> PartitionReassignmentState(Seq(3, 2, 1), Seq(3, 2, 0), true)
)
waitForVerifyAssignment(cluster.adminClient, assignment, false,
VerifyAssignmentResult(initialAssignment))
@@ -122,10 +123,8 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)
assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
val finalAssignment = Map(
- new TopicPartition("foo", 0) ->
- PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
- new TopicPartition("bar", 0) ->
- PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
+ foo0 -> PartitionReassignmentState(Seq(0, 1, 3), Seq(0, 1, 3), true),
+ bar0 -> PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
)
val verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false)
@@ -137,6 +136,10 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet.toSeq))
+
+ // Verify that partitions are removed from brokers no longer assigned
+ verifyReplicaDeleted(topicPartition = foo0, replicaId = 2)
+ verifyReplicaDeleted(topicPartition = bar0, replicaId = 1)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -296,10 +299,13 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testCancellation(quorum: String): Unit = {
+ val foo0 = new TopicPartition("foo", 0)
+ val baz1 = new TopicPartition("baz", 1)
+
cluster = new ReassignPartitionsTestCluster()
cluster.setup()
- cluster.produceMessages("foo", 0, 200)
- cluster.produceMessages("baz", 1, 200)
+ cluster.produceMessages(foo0.topic, foo0.partition, 200)
+ cluster.produceMessages(baz1.topic, baz1.partition, 200)
val assignment = """{"version":1,"partitions":""" +
"""[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},"""
+
"""{"topic":"baz","partition":1,"replicas":[0,2,3],"log_dirs":["any","any","any"]}"""
+
@@ -314,14 +320,11 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
// from completing before this runs.
waitForVerifyAssignment(cluster.adminClient, assignment, true,
VerifyAssignmentResult(Map(
- new TopicPartition("foo", 0) -> PartitionReassignmentState(Seq(0, 1,
3, 2), Seq(0, 1, 3), false),
- new TopicPartition("baz", 1) -> PartitionReassignmentState(Seq(0, 2,
3, 1), Seq(0, 2, 3), false)),
+ foo0 -> PartitionReassignmentState(Seq(0, 1, 3, 2), Seq(0, 1, 3), false),
+ baz1 -> PartitionReassignmentState(Seq(0, 2, 3, 1), Seq(0, 2, 3), false)),
true, Map(), false))
// Cancel the reassignment.
- assertEquals((Set(
- new TopicPartition("foo", 0),
- new TopicPartition("baz", 1)
- ), Set()), runCancelAssignment(cluster.adminClient, assignment, true))
+ assertEquals((Set(foo0, baz1), Set()), runCancelAssignment(cluster.adminClient, assignment, true))
// Broker throttles are still active because we passed --preserve-throttles
waitForInterBrokerThrottle(Set(0, 1, 2, 3), interBrokerThrottle)
// Cancelling the reassignment again should reveal nothing to cancel.
@@ -330,6 +333,62 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
waitForBrokerLevelThrottles(unthrottledBrokerConfigs)
// Verify that there are no ongoing reassignments.
assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing)
+ // Verify that the partition is removed from cancelled replicas
+ verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
+ verifyReplicaDeleted(topicPartition = baz1, replicaId = 3)
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCancellationWithAddingReplicaInIsr(quorum: String): Unit = {
+ val foo0 = new TopicPartition("foo", 0)
+
+ cluster = new ReassignPartitionsTestCluster()
+ cluster.setup()
+ cluster.produceMessages(foo0.topic, foo0.partition, 200)
+
+ // The reassignment will bring replicas 3 and 4 into the replica set and remove 1 and 2.
+ val assignment = """{"version":1,"partitions":""" +
+ """[{"topic":"foo","partition":0,"replicas":[0,3,4],"log_dirs":["any","any","any"]}""" +
+ """]}"""
+
+ // We will throttle replica 4 so that only replica 3 joins the ISR
+ TestUtils.setReplicationThrottleForPartitions(
+ cluster.adminClient,
+ brokerIds = Seq(4),
+ partitions = Set(foo0),
+ throttleBytes = 1
+ )
+
+ // Execute the assignment and wait for replica 3 (only) to join the ISR
+ runExecuteAssignment(
+ cluster.adminClient,
+ additional = false,
+ reassignmentJson = assignment
+ )
+ TestUtils.waitUntilTrue(
+ () => TestUtils.currentIsr(cluster.adminClient, foo0) == Set(0, 1, 2, 3),
+ msg = "Timed out while waiting for replica 3 to join the ISR"
+ )
+
+ // Now cancel the assignment and verify that the partition is removed from cancelled replicas
+ assertEquals((Set(foo0), Set()), runCancelAssignment(cluster.adminClient, assignment, preserveThrottles = true))
+ verifyReplicaDeleted(topicPartition = foo0, replicaId = 3)
+ verifyReplicaDeleted(topicPartition = foo0, replicaId = 4)
+ }
+
+ private def verifyReplicaDeleted(
+ topicPartition: TopicPartition,
+ replicaId: Int
+ ): Unit = {
+ def isReplicaStoppedAndDeleted(): Boolean = {
+ val server = cluster.servers(replicaId)
+ val partition = server.replicaManager.getPartition(topicPartition)
+ val log = server.logManager.getLog(topicPartition)
+ partition == HostedPartition.None && log.isEmpty
+ }
+ TestUtils.waitUntilTrue(isReplicaStoppedAndDeleted,
+ msg = s"Timed out waiting for replica $replicaId of $topicPartition to
be deleted")
}
private def waitForLogDirThrottle(throttledBrokers: Set[Int], logDirThrottle: Long): Unit = {
@@ -541,8 +600,8 @@ class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
private def runExecuteAssignment(adminClient: Admin,
additional: Boolean,
reassignmentJson: String,
- interBrokerThrottle: Long,
- replicaAlterLogDirsThrottle: Long) = {
+ interBrokerThrottle: Long = -1,
+ replicaAlterLogDirsThrottle: Long = -1) = {
println(s"==> executeAssignment(adminClient, additional=${additional}, " +
s"reassignmentJson=${reassignmentJson}, " +
s"interBrokerThrottle=${interBrokerThrottle}, " +
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e623816c397..4cdc92881ca 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2707,7 +2707,7 @@ class ReplicaManagerTest {
@Test
def testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(): Unit = {
- testStopReplicaWithExistingPartition(1, false, false, Errors.FENCED_LEADER_EPOCH)
+ testStopReplicaWithExistingPartition(1, false, false, Errors.NONE)
}
@Test
@@ -2737,7 +2737,7 @@ class ReplicaManagerTest {
@Test
def testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(): Unit = {
- testStopReplicaWithExistingPartition(1, true, false, Errors.FENCED_LEADER_EPOCH)
+ testStopReplicaWithExistingPartition(1, true, false, Errors.NONE)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7110237ce4c..e21214b6f7e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1964,16 +1964,23 @@ object TestUtils extends Logging {
)
}
+ def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
+ val description = admin.describeTopics(Set(partition.topic).asJava)
+ .allTopicNames
+ .get
+ .asScala
+
+ description
+ .values
+ .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
+ .map(_.id)
+ .toSet
+ }
+
def waitForBrokersInIsr(client: Admin, partition: TopicPartition, brokerIds: Set[Int]): Unit = {
waitUntilTrue(
() => {
- val description = client.describeTopics(Set(partition.topic).asJava).allTopicNames.get.asScala
- val isr = description
- .values
- .flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
- .map(_.id)
- .toSet
-
+ val isr = currentIsr(client, partition)
brokerIds.subsetOf(isr)
},
s"Expected brokers $brokerIds to be in the ISR for $partition"