This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dd7c036 KAFKA-10033: Throw UnknownTopicOrPartitionException when
modifying a non-existent topic's config
dd7c036 is described below
commit dd7c0369560c815482e6efc0a7ad08e0fcdf640f
Author: Brian Byrne <[email protected]>
AuthorDate: Sat Jun 6 21:04:04 2020 +0530
KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a
non-existent topic's config
Author: Brian Byrne <[email protected]>
Reviewers: Chia-Ping Tsai <[email protected]>, Boyang Chan
<[email protected]>, Manikumar Reddy <[email protected]>
Closes #8717 from bdbyrne/KAFKA-10033
---
.../src/main/scala/kafka/server/AdminManager.scala | 3 ++
.../kafka/server/DynamicConfigChangeTest.scala | 40 ++++++++++++++++++----
2 files changed, 37 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala
b/core/src/main/scala/kafka/server/AdminManager.scala
index 183a5d3..742156a 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -454,6 +454,9 @@ class AdminManager(val config: KafkaConfig,
private def alterTopicConfigs(resource: ConfigResource, validateOnly:
Boolean,
configProps: Properties, configEntriesMap:
Map[String, String]): (ConfigResource, ApiError) = {
val topic = resource.name
+ if (!metadataCache.contains(topic))
+ throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not
exist.")
+
adminZkClient.validateTopicConfig(topic, configProps)
validateConfigPolicy(resource, configEntriesMap)
if (!validateOnly) {
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 30a044d..c745c92 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -18,18 +18,22 @@ package kafka.server
import java.nio.charset.StandardCharsets
import java.util.Properties
+import java.util.concurrent.ExecutionException
-import kafka.log.LogConfig._
-import kafka.server.Constants._
-import org.junit.Assert._
-import org.apache.kafka.common.metrics.Quota
-import org.easymock.EasyMock
-import org.junit.Test
import kafka.integration.KafkaServerTestHarness
+import kafka.log.LogConfig._
import kafka.utils._
+import kafka.server.Constants._
import kafka.zk.ConfigEntityChangeNotificationZNode
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.metrics.Quota
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.Test
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -203,6 +207,23 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
}
@Test
+ def testConfigChangeOnNonExistingTopicWithAdminClient(): Unit = {
+ val topic = TestUtils.tempTopic
+ val admin = createAdminClient()
+ try {
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+ val op = new AlterConfigOp(new ConfigEntry(FlushMessagesProp, "10000"),
AlterConfigOp.OpType.SET)
+ admin.incrementalAlterConfigs(Map(resource ->
List(op).asJavaCollection).asJava).all.get
+ fail("Should fail with UnknownTopicOrPartitionException for topic
doesn't exist")
+ } catch {
+ case e: ExecutionException =>
+ assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
+ } finally {
+ admin.close()
+ }
+ }
+
+ @Test
def testProcessNotification(): Unit = {
val props = new Properties()
props.put("a.b", "10")
@@ -314,4 +335,11 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
configHandler.parseThrottledPartitions(CoreUtils.propsWith(LeaderReplicationThrottledReplicasProp,
value), 102, LeaderReplicationThrottledReplicasProp)
}
+
+ private def createAdminClient(): Admin = {
+ val props = new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ Admin.create(props)
+ }
+
}