This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 569a6f9 KAFKA-9038; Allow creating partitions while a reassignment is
in progress (#7582)
569a6f9 is described below
commit 569a6f9cecb8eff8a9ab0811aea2bb070d11c2db
Author: NIkhil Bhatia <[email protected]>
AuthorDate: Thu Oct 24 23:40:02 2019 -0700
KAFKA-9038; Allow creating partitions while a reassignment is in progress
(#7582)
Prior to this patch, partition creation was rejected for every topic whenever
any partition reassignment was in progress. This patch makes it a topic-level
check: as long as a particular topic is not itself being reassigned, its
partition count can be increased.
Reviewers: Stanislav Kozlovski <[email protected]>, Jason
Gustafson <[email protected]>
---
.../src/main/scala/kafka/server/AdminManager.scala | 14 ++---
.../admin/ReassignPartitionsClusterTest.scala | 72 ++++++++++++++++++++--
2 files changed, 74 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala
b/core/src/main/scala/kafka/server/AdminManager.scala
index 5de1bcd..d9a1732 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -259,20 +259,20 @@ class AdminManager(val config: KafkaConfig,
listenerName: ListenerName,
callback: Map[String, ApiError] => Unit): Unit = {
- val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
val allBrokers = adminZkClient.getBrokerMetadatas()
val allBrokerIds = allBrokers.map(_.id)
// 1. map over topics creating assignment and calling AdminUtils
val metadata = newPartitions.map { case (topic, newPartition) =>
try {
- // We prevent addition partitions while a reassignment is in progress,
since
- // during reassignment there is no meaningful notion of replication
factor
- if (reassignPartitionsInProgress)
- throw new ReassignmentInProgressException("A partition reassignment
is in progress.")
-
val existingAssignment =
zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic)).map {
- case (topicPartition, assignment) => topicPartition.partition ->
assignment
+ case (topicPartition, assignment) =>
+ if (assignment.isBeingReassigned) {
+ // We prevent adding partitions while a reassignment of this topic is in
progress, to protect against a race condition
+ // between the controller thread processing the reassignment update
and this createPartitions request.
+ throw new ReassignmentInProgressException(s"A partition
reassignment is in progress for the topic '$topic'.")
+ }
+ topicPartition.partition -> assignment
}
if (existingAssignment.isEmpty)
throw new UnknownTopicOrPartitionException(s"The topic '$topic' does
not exist.")
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 6ee2f53..5331559 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -12,8 +12,6 @@
*/
package kafka.admin
-import java.util.{Collections, Properties}
-
import kafka.admin.ReassignPartitionsCommand._
import kafka.common.AdminCommandFailedException
import kafka.server.{DynamicConfig, KafkaConfig, KafkaServer}
@@ -23,19 +21,22 @@ import kafka.zk.{ReassignPartitionsZNode, ZkVersion,
ZooKeeperTestHarness}
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.{After, Before, Test}
import kafka.admin.ReplicationQuotaUtils._
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, ConfigEntry, NewPartitionReassignment, PartitionReassignment,
AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
AlterConfigOp, ConfigEntry, NewPartitionReassignment, NewPartitions,
PartitionReassignment, AdminClient => JAdminClient}
import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq}
import scala.util.Random
import java.io.File
+import java.util.{Collections, Properties}
+import java.util.concurrent.ExecutionException
import kafka.controller.ReplicaAssignment
import kafka.log.LogConfig
-import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.NoReassignmentInProgressException
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.errors.{NoReassignmentInProgressException,
ReassignmentInProgressException}
+import org.scalatest.Assertions.intercept
class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
var servers: Seq[KafkaServer] = null
@@ -1121,6 +1122,67 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
}
/**
+ * Verifies that partitions can be created for topics not in reassignment
and that, for topics that are in reassignment,
+ * a ReassignmentInProgressException is thrown. The test creates two
topics, `topicName` and `otherTopicName`;
+ * the `topicName` topic undergoes partition reassignment, and the test
validates that during reassignment a createPartitions
+ * call throws ReassignmentInProgressException for the `topicName` topic, while for
topic `otherTopicName`, which is not being reassigned,
+ * it successfully creates partitions. It further validates that after the
reassignment is complete for topic `topicName`,
+ * createPartitions is successful for that topic.
+ */
+ @Test
+ def shouldCreatePartitionsForTopicNotInReassignment(): Unit = {
+ startBrokers(Seq(100, 101))
+ val otherTopicName = "anyTopic"
+ val otp0 = new TopicPartition(otherTopicName, 0)
+ val otp1 = new TopicPartition(otherTopicName, 1)
+ adminClient = createAdminClient(servers)
+ createTopic(zkClient, topicName,
+ Map(otp0.partition() -> Seq(100),
+ otp1.partition() -> Seq(100)),
+ servers = servers)
+ createTopic(zkClient, otherTopicName,
+ Map(tp0.partition() -> Seq(100),
+ tp1.partition() -> Seq(100)),
+ servers = servers)
+
+ // Throttle to avoid race conditions
+ throttle(Seq(topicName), throttleSettingForSeconds(10), Map(
+ tp0 -> Seq(100, 101),
+ tp1 -> Seq(100, 101)
+ ))
+
+ // Alter `topicName` partition reassignment
+ adminClient.alterPartitionReassignments(
+ Map(reassignmentEntry(tp0, Seq(101)),
+ reassignmentEntry(tp1, Seq(101))).asJava
+ ).all().get()
+ waitUntilTrue(() => {
+ !adminClient.listPartitionReassignments().reassignments().get().isEmpty
+ }, "Controller should have picked up reassignment", 1000)
+
+ def testCreatePartitions(topicName: String, isTopicBeingReassigned:
Boolean): Unit = {
+ if (isTopicBeingReassigned)
+ assertTrue("createPartitions for topic under reassignment should throw
an exception", intercept[ExecutionException](
+ adminClient.createPartitions(Map(topicName ->
NewPartitions.increaseTo(4)).asJava).values.get(topicName).get()).
+ getCause.isInstanceOf[ReassignmentInProgressException])
+ else
+ adminClient.createPartitions(Map(topicName ->
NewPartitions.increaseTo(4)).asJava).values.get(topicName).get()
+ }
+
+ // Test case: createPartitions throws ReassignmentInProgressException
for topics with partitions in reassignment.
+ testCreatePartitions(topicName, true)
+ // Test case: createPartitions is successful for Topics with partitions
NOT in reassignment.
+ testCreatePartitions(otherTopicName, false)
+
+ // complete reassignment
+ resetBrokersThrottle()
+ waitForAllReassignmentsToComplete()
+
+ // Test case: createPartitions is successful for topics whose partitions
are no longer in reassignment (reassignment has completed).
+ testCreatePartitions(topicName, false)
+ }
+
+ /**
* Asserts that a replica is being reassigned from the given replicas to
the target replicas
*/
def assertIsReassigning(from: Seq[Int], to: Seq[Int], reassignment:
PartitionReassignment): Unit = {