This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87aa8259dd KAFKA-13743: Prevent topics with conflicting metrics names
from being created in KRaft mode #11910
87aa8259dd is described below
commit 87aa8259ddf5d4e5ed75aa41c4ba2ad65e2624a6
Author: dengziming <[email protected]>
AuthorDate: Thu Mar 17 16:25:45 2022 +0800
KAFKA-13743: Prevent topics with conflicting metrics names from being
created in KRaft mode #11910
In ZK mode, the topic "foo_bar" conflicts with "foo.bar" because of
limitations in metric names: a period ('.') and an underscore ('_') in
the same position collide. KRaft mode should enforce the same
restriction when topics are created. This PR also changes
TopicCommandIntegrationTest to support KRaft mode.
Reviewers: Colin P. McCabe <[email protected]>
---
.../org/apache/kafka/common/internals/Topic.java | 13 +-
.../apache/kafka/common/internals/TopicTest.java | 9 +
.../kafka/admin/TopicCommandIntegrationTest.scala | 306 +++++++++++++--------
.../controller/ReplicationControlManager.java | 51 +++-
.../controller/ReplicationControlManagerTest.java | 21 +-
5 files changed, 278 insertions(+), 122 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
index 7a5fefb3d9..3c93ef87b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
@@ -67,6 +67,17 @@ public class Topic {
return topic.contains("_") || topic.contains(".");
}
+ /**
+ * Unify topic name with a period ('.') or underscore ('_'), this is only
used to check collision and will not
+ * be used to really change topic name.
+ *
+ * @param topic A topic to unify
+ * @return A unified topic name
+ */
+ public static String unifyCollisionChars(String topic) {
+ return topic.replace('.', '_');
+ }
+
/**
* Returns true if the topicNames collide due to a period ('.') or
underscore ('_') in the same position.
*
@@ -75,7 +86,7 @@ public class Topic {
* @return true if the topics collide
*/
public static boolean hasCollision(String topicA, String topicB) {
- return topicA.replace('.', '_').equals(topicB.replace('.', '_'));
+ return unifyCollisionChars(topicA).equals(unifyCollisionChars(topicB));
}
/**
diff --git
a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
index 9bf237fb1b..03c0811fa4 100644
--- a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -81,6 +82,14 @@ public class TopicTest {
assertTrue(Topic.hasCollisionChars(topic));
}
+ @Test
+ public void testUnifyCollisionChars() {
+ assertEquals("topic", Topic.unifyCollisionChars("topic"));
+ assertEquals("_topic", Topic.unifyCollisionChars(".topic"));
+ assertEquals("_topic", Topic.unifyCollisionChars("_topic"));
+ assertEquals("__topic", Topic.unifyCollisionChars("_.topic"));
+ }
+
@Test
public void testTopicHasCollision() {
List<String> periodFirstMiddleLastNone = Arrays.asList(".topic",
"to.pic", "topic.", "topic");
diff --git
a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
index 9a1fe378f6..ee7e64957e 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandIntegrationTest.scala
@@ -17,23 +17,24 @@
package kafka.admin
import java.util.{Collection, Collections, Optional, Properties}
-
import kafka.admin.TopicCommand.{TopicCommandOptions, TopicService}
import kafka.integration.KafkaServerTestHarness
-import kafka.server.{ConfigType, KafkaConfig}
-import kafka.utils.{Logging, TestUtils}
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import kafka.zk.{ConfigEntityChangeNotificationZNode, DeleteTopicsTopicZNode}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.config.{ConfigException, ConfigResource,
TopicConfig}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException,
ThrottlingQuotaExceededException, TopicExistsException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException,
InvalidTopicException, ThrottlingQuotaExceededException, TopicExistsException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatcher
import org.mockito.ArgumentMatchers.{eq => eqThat, _}
import org.mockito.Mockito._
@@ -54,7 +55,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
*/
override def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(
numConfigs = 6,
- zkConnect = zkConnect,
+ zkConnect = zkConnectOrNull,
rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4
-> "rack3", 5 -> "rack3"),
numPartitions = numPartitions,
defaultReplicationFactor = defaultReplicationFactor,
@@ -76,7 +77,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
}
private[this] def waitForTopicCreated(topicName: String, timeout: Int =
10000): Unit = {
- TestUtils.waitForPartitionMetadata(servers, topicName, partition = 0,
timeout)
+ TestUtils.waitForPartitionMetadata(brokers, topicName, partition = 0,
timeout)
}
@BeforeEach
@@ -98,16 +99,18 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
topicService.close()
}
- @Test
- def testCreate(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreate(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "1", "--topic",
testTopicName)))
adminClient.listTopics().names().get().contains(testTopicName)
}
- @Test
- def testCreateWithDefaults(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithDefaults(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(Array("--topic",
testTopicName)))
val partitions = adminClient
@@ -120,8 +123,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor)
}
- @Test
- def testCreateWithDefaultReplication(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithDefaultReplication(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "2")))
@@ -135,8 +139,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertEquals(partitions.get(0).replicas().size(), defaultReplicationFactor)
}
- @Test
- def testCreateWithDefaultPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithDefaultPartitions(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--replication-factor", "2")))
@@ -151,8 +156,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertEquals(partitions.get(0).replicas().size(), 2)
}
- @Test
- def testCreateWithConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithConfigs(quorum: String): Unit = {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC,
testTopicName)
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "2", "--topic",
testTopicName, "--config", "delete.retention.ms=1000")))
@@ -163,8 +169,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertEquals(1000,
Integer.valueOf(configs.get("delete.retention.ms").value()))
}
- @Test
- def testCreateWhenAlreadyExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWhenAlreadyExists(quorum: String): Unit = {
val numPartitions = 1
// create the topic
@@ -176,15 +183,17 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertThrows(classOf[TopicExistsException], () =>
topicService.createTopic(createOpts))
}
- @Test
- def testCreateWhenAlreadyExistsWithIfNotExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWhenAlreadyExistsWithIfNotExists(quorum: String): Unit = {
val createOpts = new TopicCommandOptions(Array("--topic", testTopicName,
"--if-not-exists"))
createAndWaitTopic(createOpts)
topicService.createTopic(createOpts)
}
- @Test
- def testCreateWithReplicaAssignment(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithReplicaAssignment(quorum: String): Unit = {
// create the topic
val createOpts = new TopicCommandOptions(
Array("--replica-assignment", "5:4,3:2,1:0", "--topic", testTopicName))
@@ -202,37 +211,42 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertEquals(List(1, 0), partitions.get(2).replicas().asScala.map(_.id()))
}
- @Test
- def testCreateWithInvalidReplicationFactor(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithInvalidReplicationFactor(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor",
(Short.MaxValue+1).toString, "--topic", testTopicName))))
}
- @Test
- def testCreateWithNegativeReplicationFactor(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithNegativeReplicationFactor(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "-1", "--topic",
testTopicName))))
}
- @Test
- def testCreateWithNegativePartitionCount(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithNegativePartitionCount(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "-1", "--replication-factor", "1", "--topic",
testTopicName))))
}
- @Test
- def testInvalidTopicLevelConfig(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidTopicLevelConfig(quorum: String): Unit = {
val createOpts = new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic",
testTopicName,
"--config", "message.timestamp.type=boom"))
assertThrows(classOf[ConfigException], () =>
topicService.createTopic(createOpts))
}
- @Test
- def testListTopics(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTopics(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic",
testTopicName)))
@@ -242,8 +256,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertTrue(output.contains(testTopicName))
}
- @Test
- def testListTopicsWithIncludeList(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTopicsWithIncludeList(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
val topic2 = "kafka.testTopic2"
val topic3 = "oooof.testTopic1"
@@ -264,8 +279,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertFalse(output.contains(topic3))
}
- @Test
- def testListTopicsWithExcludeInternal(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testListTopicsWithExcludeInternal(quorum: String): Unit = {
val topic1 = "kafka.testTopic1"
adminClient.createTopics(
List(new NewTopic(topic1, 2, 2.toShort),
@@ -280,8 +296,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
}
- @Test
- def testAlterPartitionCount(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterPartitionCount(quorum: String): Unit = {
adminClient.createTopics(
List(new NewTopic(testTopicName, 2,
2.toShort)).asJavaCollection).all().get()
waitForTopicCreated(testTopicName)
@@ -289,12 +306,16 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "3")))
+ TestUtils.waitUntilTrue(
+ () =>
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size == 3),
+ "Timeout waiting new assignment propagate to broker")
val topicDescription =
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get()
assertTrue(topicDescription.partitions().size() == 3)
}
- @Test
- def testAlterAssignment(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterAssignment(quorum: String): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 2,
2.toShort))).all().get()
waitForTopicCreated(testTopicName)
@@ -307,8 +328,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertEquals(List(4,2),
topicDescription.partitions().get(2).replicas().asScala.map(_.id()))
}
- @Test
- def testAlterAssignmentWithMoreAssignmentThanPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String):
Unit = {
adminClient.createTopics(
List(new NewTopic(testTopicName, 2,
2.toShort)).asJavaCollection).all().get()
waitForTopicCreated(testTopicName)
@@ -318,8 +340,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
Array("--topic", testTopicName, "--replica-assignment",
"5:3,3:1,4:2,3:2", "--partitions", "3"))))
}
- @Test
- def testAlterAssignmentWithMorePartitionsThanAssignment(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String):
Unit = {
adminClient.createTopics(
List(new NewTopic(testTopicName, 2,
2.toShort)).asJavaCollection).all().get()
waitForTopicCreated(testTopicName)
@@ -329,8 +352,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2",
"--partitions", "6"))))
}
- @Test
- def testAlterWithInvalidPartitionCount(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterWithInvalidPartitionCount(quorum: String): Unit = {
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic",
testTopicName)))
@@ -339,22 +363,25 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
Array("--partitions", "-1", "--topic", testTopicName))))
}
- @Test
- def testAlterWhenTopicDoesntExist(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterWhenTopicDoesntExist(quorum: String): Unit = {
// alter a topic that does not exist without --if-exists
val alterOpts = new TopicCommandOptions(Array("--topic", testTopicName,
"--partitions", "1"))
val topicService = TopicService(adminClient)
assertThrows(classOf[IllegalArgumentException], () =>
topicService.alterTopic(alterOpts))
}
- @Test
- def testAlterWhenTopicDoesntExistWithIfExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testAlterWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.alterTopic(new TopicCommandOptions(
Array("--topic", testTopicName, "--partitions", "1", "--if-exists")))
}
- @Test
- def testCreateAlterTopicWithRackAware(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateAlterTopicWithRackAware(quorum: String): Unit = {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1",
4 -> "rack3", 5 -> "rack3")
val numPartitions = 18
@@ -365,9 +392,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
"--topic", testTopicName))
createAndWaitTopic(createOpts)
- var assignment =
zkClient.getReplicaAssignmentForTopics(Set(testTopicName)).map { case (tp,
replicas) =>
- tp.partition -> replicas
- }
+ var assignment =
adminClient.describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions()
+ .asScala.map(info => info.partition() ->
info.replicas().asScala.map(_.id())).toMap
checkReplicaDistribution(assignment, rackInfo, rackInfo.size,
numPartitions, replicationFactor)
val alteredNumPartitions = 36
@@ -376,14 +403,19 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
"--partitions", alteredNumPartitions.toString,
"--topic", testTopicName))
topicService.alterTopic(alterOpts)
- assignment =
zkClient.getReplicaAssignmentForTopics(Set(testTopicName)).map { case (tp,
replicas) =>
- tp.partition -> replicas
- }
+
+ TestUtils.waitUntilTrue(
+ () =>
brokers.forall(_.metadataCache.getTopicPartitions(testTopicName).size ==
alteredNumPartitions),
+ "Timeout waiting new assignment propagate to broker")
+ assignment =
adminClient.describeTopics(Collections.singletonList(testTopicName))
+ .allTopicNames().get().get(testTopicName).partitions()
+ .asScala.map(info => info.partition() ->
info.replicas().asScala.map(_.id())).toMap
checkReplicaDistribution(assignment, rackInfo, rackInfo.size,
alteredNumPartitions, replicationFactor)
}
- @Test
- def testConfigPreservationAcrossPartitionAlteration(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConfigPreservationAcrossPartitionAlteration(quorum: String): Unit = {
val numPartitionsOriginal = 1
val cleanupKey = "cleanup.policy"
val cleanupVal = "compact"
@@ -395,25 +427,30 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
"--config", cleanupKey + "=" + cleanupVal,
"--topic", testTopicName))
createAndWaitTopic(createOpts)
- val props = adminZkClient.fetchEntityConfig(ConfigType.Topic,
testTopicName)
- assertTrue(props.containsKey(cleanupKey), "Properties after creation don't
contain " + cleanupKey)
- assertTrue(props.getProperty(cleanupKey).equals(cleanupVal), "Properties
after creation have incorrect value")
+ val configResource = new ConfigResource(ConfigResource.Type.TOPIC,
testTopicName)
+ val props =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource)
+ // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic,
testTopicName)
+ assertNotNull(props.get(cleanupKey), "Properties after creation don't
contain " + cleanupKey)
+ assertEquals(cleanupVal, props.get(cleanupKey).value(), "Properties after
creation have incorrect value")
// pre-create the topic config changes path to avoid a NoNodeException
-
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+ if (!isKRaftTest()) {
+
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+ }
// modify the topic to add new partitions
val numPartitionsModified = 3
val alterOpts = new TopicCommandOptions(
Array("--partitions", numPartitionsModified.toString, "--topic",
testTopicName))
topicService.alterTopic(alterOpts)
- val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic,
testTopicName)
- assertTrue(newProps.containsKey(cleanupKey), "Updated properties do not
contain " + cleanupKey)
- assertTrue(newProps.getProperty(cleanupKey).equals(cleanupVal), "Updated
properties have incorrect value")
+ val newProps =
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource)
+ assertNotNull(newProps.get(cleanupKey), "Updated properties do not contain
" + cleanupKey)
+ assertEquals(cleanupVal, newProps.get(cleanupKey).value(), "Updated
properties have incorrect value")
}
- @Test
- def testTopicDeletion(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTopicDeletion(quorum: String): Unit = {
// create the NormalTopic
val createOpts = new TopicCommandOptions(Array("--partitions", "1",
"--replication-factor", "1",
@@ -423,14 +460,17 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
- val deletePath = DeleteTopicsTopicZNode.path(testTopicName)
- assertFalse(zkClient.pathExists(deletePath), "Delete path for topic
shouldn't exist before deletion.")
+ if (!isKRaftTest()) {
+ val deletePath = DeleteTopicsTopicZNode.path(testTopicName)
+ assertFalse(zkClient.pathExists(deletePath), "Delete path for topic
shouldn't exist before deletion.")
+ }
topicService.deleteTopic(deleteOpts)
- TestUtils.verifyTopicDeletion(zkClient, testTopicName, 1, servers)
+ TestUtils.verifyTopicDeletion(zkClientOrNull, testTopicName, 1, brokers)
}
- @Test
- def testDeleteInternalTopic(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteInternalTopic(quorum: String): Unit = {
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions",
"1",
"--replication-factor", "1",
@@ -443,25 +483,30 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
val deleteOffsetTopicOpts = new TopicCommandOptions(
Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
val deleteOffsetTopicPath =
DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME)
- assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for
topic shouldn't exist before deletion.")
+ if (!isKRaftTest()) {
+ assertFalse(zkClient.pathExists(deleteOffsetTopicPath), "Delete path for
topic shouldn't exist before deletion.")
+ }
topicService.deleteTopic(deleteOffsetTopicOpts)
- TestUtils.verifyTopicDeletion(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
1, servers)
+ TestUtils.verifyTopicDeletion(zkClientOrNull,
Topic.GROUP_METADATA_TOPIC_NAME, 1, brokers)
}
- @Test
- def testDeleteWhenTopicDoesntExist(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteWhenTopicDoesntExist(quorum: String): Unit = {
// delete a topic that does not exist
val deleteOpts = new TopicCommandOptions(Array("--topic", testTopicName))
assertThrows(classOf[IllegalArgumentException], () =>
topicService.deleteTopic(deleteOpts))
}
- @Test
- def testDeleteWhenTopicDoesntExistWithIfExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.deleteTopic(new TopicCommandOptions(Array("--topic",
testTopicName, "--if-exists")))
}
- @Test
- def testDescribe(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribe(quorum: String): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 2,
2.toShort))).all().get()
waitForTopicCreated(testTopicName)
@@ -473,19 +518,22 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
}
- @Test
- def testDescribeWhenTopicDoesntExist(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWhenTopicDoesntExist(quorum: String): Unit = {
assertThrows(classOf[IllegalArgumentException],
() => topicService.describeTopic(new
TopicCommandOptions(Array("--topic", testTopicName))))
}
- @Test
- def testDescribeWhenTopicDoesntExistWithIfExists(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeWhenTopicDoesntExistWithIfExists(quorum: String): Unit = {
topicService.describeTopic(new TopicCommandOptions(Array("--topic",
testTopicName, "--if-exists")))
}
- @Test
- def testDescribeUnavailablePartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnavailablePartitions(quorum: String): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 6,
1.toShort))).all().get()
waitForTopicCreated(testTopicName)
@@ -500,7 +548,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
// wait until the topic metadata for the test topic is propagated to
each alive broker
TestUtils.waitUntilTrue(() => {
- servers
+ brokers
.filterNot(_.config.brokerId == 0)
.foldLeft(true) {
(result, server) => {
@@ -527,15 +575,16 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
}
}
- @Test
- def testDescribeUnderReplicatedPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 1,
6.toShort))).all().get()
waitForTopicCreated(testTopicName)
try {
killBroker(0)
- val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ val aliveServers = brokers.filterNot(_.config.brokerId == 0)
TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new
TopicCommandOptions(Array("--under-replicated-partitions"))))
@@ -546,8 +595,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
}
}
- @Test
- def testDescribeUnderMinIsrPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnderMinIsrPartitions(quorum: String): Unit = {
val configMap = new java.util.HashMap[String, String]()
configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6")
@@ -557,7 +607,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
try {
killBroker(0)
- val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ val aliveServers = brokers.filterNot(_.config.brokerId == 0)
TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new
TopicCommandOptions(Array("--under-min-isr-partitions"))))
@@ -568,8 +618,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
}
}
- @Test
- def testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress():
Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum:
String): Unit = {
val configMap = new java.util.HashMap[String, String]()
val replicationFactor: Short = 1
val partitions = 1
@@ -580,12 +631,12 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
waitForTopicCreated(testTopicName)
// Produce multiple batches.
- TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages =
10, acks = -1)
- TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages =
10, acks = -1)
+ TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages =
10, acks = -1)
+ TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages =
10, acks = -1)
// Enable throttling. Note the broker config sets the replica max fetch
bytes to `1` upon to minimize replication
// throughput so the reassignment doesn't complete quickly.
- val brokerIds = servers.map(_.config.brokerId)
+ val brokerIds = brokers.map(_.config.brokerId)
TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds,
Set(tp), throttleBytes = 1)
val testTopicDesc =
adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName)
@@ -622,8 +673,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
TestUtils.waitForAllReassignmentsToComplete(adminClient)
}
- @Test
- def testDescribeAtMinIsrPartitions(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAtMinIsrPartitions(quorum: String): Unit = {
val configMap = new java.util.HashMap[String, String]()
configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
@@ -653,8 +705,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
*
* Output should only display the (1) topic with partition under min ISR
count and (3) topic with offline partition
*/
- @Test
- def testDescribeUnderMinIsrPartitionsMixed(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeUnderMinIsrPartitionsMixed(quorum: String): Unit = {
val underMinIsrTopic = "under-min-isr-topic"
val notUnderMinIsrTopic = "not-under-min-isr-topic"
val offlineTopic = "offline-topic"
@@ -677,7 +730,7 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
try {
killBroker(0)
- val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ val aliveServers = brokers.filterNot(_.config.brokerId == 0)
TestUtils.waitForPartitionMetadata(aliveServers, underMinIsrTopic, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new
TopicCommandOptions(Array("--under-min-isr-partitions"))))
@@ -690,8 +743,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
}
}
- @Test
- def testDescribeReportOverriddenConfigs(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeReportOverriddenConfigs(quorum: String): Unit = {
val config = "file.delete.delay.ms=1000"
createAndWaitTopic(new TopicCommandOptions(
Array("--partitions", "2", "--replication-factor", "2", "--topic",
testTopicName, "--config", config)))
@@ -700,8 +754,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertTrue(output.contains(config), s"Describe output should have
contained $config")
}
- @Test
- def testDescribeAndListTopicsWithoutInternalTopics(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeAndListTopicsWithoutInternalTopics(quorum: String): Unit = {
createAndWaitTopic(
new TopicCommandOptions(Array("--partitions", "1",
"--replication-factor", "1", "--topic", testTopicName)))
// create a internal topic
@@ -720,8 +775,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
}
- @Test
- def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(quorum:
String): Unit = {
adminClient = spy(adminClient)
topicService = TopicService(adminClient)
@@ -746,8 +802,20 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
}
- @Test
- def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateWithTopicNameCollision(quorum: String): Unit = {
+ adminClient.createTopics(
+ Collections.singletonList(new NewTopic("foo_bar", 1,
6.toShort))).all().get()
+ waitForTopicCreated("foo_bar")
+
+ assertThrows(classOf[InvalidTopicException],
+ () => topicService.createTopic(new TopicCommandOptions(Array("--topic",
"foo.bar"))))
+ }
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(quorum:
String): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
@@ -766,8 +834,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
)
}
- @Test
- def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDeleteTopicDoesNotRetryThrottlingQuotaExceededException(quorum:
String): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
@@ -787,8 +856,9 @@ class TopicCommandIntegrationTest extends
KafkaServerTestHarness with Logging wi
)
}
- @Test
- def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(): Unit
= {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreatePartitionsDoesNotRetryThrottlingQuotaExceededException(quorum:
String): Unit = {
val adminClient = mock(classOf[Admin])
val topicService = TopicService(adminClient)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d8005e60ce..f104364d1a 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -216,6 +216,23 @@ public class ReplicationControlManager {
*/
private final TimelineHashMap<String, Uuid> topicsByName;
+ /**
+ * We try to prevent topics from being created if their names would
collide with
+ * existing topics when periods in the topic name are replaced with
underscores.
+ * The reason for this is that some per-topic metrics do replace periods
with
+ * underscores, and would therefore be ambiguous otherwise.
+ *
+ * This map is from normalized topic name to a set of topic names. So if
we had two
+ * topics named foo.bar and foo_bar this map would contain
+ * a mapping from foo_bar to a set containing foo.bar and foo_bar.
+ *
+ * Since we reject topic creations that would collide, under normal
conditions the
+ * sets in this map should only have a size of 1. However, if the cluster
was
+ * upgraded from a version prior to KAFKA-13743, it may be possible to
have more
+ * values here, since colliding topic names will be "grandfathered in."
+ */
+ private final TimelineHashMap<String, TimelineHashSet<String>>
topicsWithCollisionChars;
+
/**
* Maps topic UUIDs to structures containing topic information, including
partitions.
*/
@@ -258,6 +275,7 @@ public class ReplicationControlManager {
this.clusterControl = clusterControl;
this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.topicsWithCollisionChars = new
TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
this.reassigningTopics = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -266,6 +284,15 @@ public class ReplicationControlManager {
public void replay(TopicRecord record) {
topicsByName.put(record.name(), record.topicId());
+ if (Topic.hasCollisionChars(record.name())) {
+ String normalizedName = Topic.unifyCollisionChars(record.name());
+ TimelineHashSet<String> topicNames =
topicsWithCollisionChars.get(normalizedName);
+ if (topicNames == null) {
+ topicNames = new TimelineHashSet<>(snapshotRegistry, 1);
+ topicsWithCollisionChars.put(normalizedName, topicNames);
+ }
+ topicNames.add(record.name());
+ }
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry,
record.topicId()));
controllerMetrics.setGlobalTopicsCount(topics.size());
@@ -374,6 +401,16 @@ public class ReplicationControlManager {
" to remove.");
}
topicsByName.remove(topic.name);
+ if (Topic.hasCollisionChars(topic.name)) {
+ String normalizedName = Topic.unifyCollisionChars(topic.name);
+ TimelineHashSet<String> colliding =
topicsWithCollisionChars.get(normalizedName);
+ if (colliding != null) {
+ colliding.remove(topic.name);
+ if (colliding.isEmpty()) {
+ topicsWithCollisionChars.remove(normalizedName); // map is keyed by the normalized name, not the raw topic name
+ }
+ }
+ }
reassigningTopics.remove(record.topicId());
// Delete the configurations associated with this topic.
@@ -407,7 +444,7 @@ public class ReplicationControlManager {
List<ApiMessageAndVersion> records = new ArrayList<>();
// Check the topic names.
- validateNewTopicNames(topicErrors, request.topics());
+ validateNewTopicNames(topicErrors, request.topics(),
topicsWithCollisionChars);
// Identify topics that already exist and mark them with the
appropriate error
request.topics().stream().filter(creatableTopic ->
topicsByName.containsKey(creatableTopic.name()))
@@ -598,7 +635,8 @@ public class ReplicationControlManager {
}
static void validateNewTopicNames(Map<String, ApiError> topicErrors,
- CreatableTopicCollection topics) {
+ CreatableTopicCollection topics,
+ Map<String, ? extends Set<String>>
topicsWithCollisionChars) {
for (CreatableTopic topic : topics) {
if (topicErrors.containsKey(topic.name())) continue;
try {
@@ -607,6 +645,15 @@ public class ReplicationControlManager {
topicErrors.put(topic.name(),
new ApiError(Errors.INVALID_TOPIC_EXCEPTION,
e.getMessage()));
}
+ if (Topic.hasCollisionChars(topic.name())) {
+ String normalizedName =
Topic.unifyCollisionChars(topic.name());
+ Set<String> colliding =
topicsWithCollisionChars.get(normalizedName);
+ if (colliding != null) {
+ topicErrors.put(topic.name(), new
ApiError(Errors.INVALID_TOPIC_EXCEPTION,
+ "Topic '" + topic.name() + "' collides with existing
topic: " +
+ colliding.iterator().next()));
+ }
+ }
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 108f2eca66..d095a9fe37 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -82,6 +82,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.ArrayList;
import java.util.Collections;
@@ -641,7 +642,7 @@ public class ReplicationControlManagerTest {
topics.add(new CreatableTopic().setName(""));
topics.add(new CreatableTopic().setName("woo"));
topics.add(new CreatableTopic().setName("."));
- ReplicationControlManager.validateNewTopicNames(topicErrors, topics);
+ ReplicationControlManager.validateNewTopicNames(topicErrors, topics,
Collections.emptyMap());
Map<String, ApiError> expectedTopicErrors = new HashMap<>();
expectedTopicErrors.put("", new ApiError(INVALID_TOPIC_EXCEPTION,
"Topic name is illegal, it can't be empty"));
@@ -650,6 +651,24 @@ public class ReplicationControlManagerTest {
assertEquals(expectedTopicErrors, topicErrors);
}
+ @Test
+ public void testTopicNameCollision() {
+ Map<String, ApiError> topicErrors = new HashMap<>();
+ CreatableTopicCollection topics = new CreatableTopicCollection();
+ topics.add(new CreatableTopic().setName("foo.bar"));
+ topics.add(new CreatableTopic().setName("woo.bar_foo"));
+ Map<String, Set<String>> collisionMap = new HashMap<>();
+ collisionMap.put("foo_bar", new TreeSet<>(Arrays.asList("foo_bar")));
+ collisionMap.put("woo_bar_foo", new
TreeSet<>(Arrays.asList("woo.bar.foo", "woo_bar.foo")));
+ ReplicationControlManager.validateNewTopicNames(topicErrors, topics,
collisionMap);
+ Map<String, ApiError> expectedTopicErrors = new HashMap<>();
+ expectedTopicErrors.put("foo.bar", new
ApiError(INVALID_TOPIC_EXCEPTION,
+ "Topic 'foo.bar' collides with existing topic: foo_bar"));
+ expectedTopicErrors.put("woo.bar_foo", new
ApiError(INVALID_TOPIC_EXCEPTION,
+ "Topic 'woo.bar_foo' collides with existing topic: woo.bar.foo"));
+ assertEquals(expectedTopicErrors, topicErrors);
+ }
+
@Test
public void testRemoveLeaderships() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();