This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 129d531 KAFKA-8875; CreateTopic API should check topic existence
before replication factor (#7298)
129d531 is described below
commit 129d5317f5abe895cb44955ee4e843e3b09fb864
Author: huxi <[email protected]>
AuthorDate: Thu Sep 12 05:24:27 2019 +0800
KAFKA-8875; CreateTopic API should check topic existence before replication
factor (#7298)
If the topic already exists, `handleCreateTopicsRequest` should return
TopicExistsException even given an invalid config (replication factor for
instance).
Reviewers: Rajini Sivaram <[email protected]>, Jason Gustafson
<[email protected]>
---
core/src/main/scala/kafka/server/AdminManager.scala | 5 ++++-
.../kafka/api/AdminClientIntegrationTest.scala | 17 +++++++++++++++++
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala
b/core/src/main/scala/kafka/server/AdminManager.scala
index d424700..7115981 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException,
InvalidConfigurationException, InvalidPartitionsException,
InvalidReplicaAssignmentException, InvalidRequestException,
ReassignmentInProgressException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException,
InvalidConfigurationException, InvalidPartitionsException,
InvalidReplicaAssignmentException, InvalidRequestException,
ReassignmentInProgressException, TopicExistsException,
UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.metrics.Metrics
@@ -84,6 +84,9 @@ class AdminManager(val config: KafkaConfig,
val brokers = metadataCache.getAliveBrokers.map { b =>
kafka.admin.BrokerMetadata(b.id, b.rack) }
val metadata = toCreate.values.map(topic =>
try {
+ if (metadataCache.contains(topic.name))
+ throw new TopicExistsException(s"Topic '${topic.name}' already
exists.")
+
val configs = new Properties()
topic.configs().asScala.foreach { case entry =>
configs.setProperty(entry.name(), entry.value())
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 40d2820..8c92ec6 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -212,6 +212,23 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
}
@Test
+ def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
+ client = AdminClient.create(createConfig())
+ val topic = "mytopic"
+ val topics = Seq(topic)
+ val newTopics = Seq(new NewTopic(topic, 1, 1.toShort))
+
+ client.createTopics(newTopics.asJava).all.get()
+ waitForTopics(client, topics, List())
+
+ val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size +
1).toShort))
+ val e = intercept[ExecutionException] {
+ client.createTopics(newTopicsWithInvalidRF.asJava, new
CreateTopicsOptions().validateOnly(true)).all.get()
+ }
+ assertTrue(e.getCause.isInstanceOf[TopicExistsException])
+ }
+
+ @Test
def testMetadataRefresh(): Unit = {
client = AdminClient.create(createConfig())
val topics = Seq("mytopic")