This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff1875f KAFKA-6778; AdminClient.describeConfigs() should return error
for non-existent topics (#4866)
ff1875f is described below
commit ff1875fce0a82737069e195060b6a93881954a23
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Fri Apr 27 02:31:04 2018 +0530
KAFKA-6778; AdminClient.describeConfigs() should return error for
non-existent topics (#4866)
Reviewers: Ismael Juma <[email protected]>, Jason Gustafson
<[email protected]>
---
.../src/main/scala/kafka/server/AdminManager.scala | 12 ++++++++----
.../kafka/api/AdminClientIntegrationTest.scala | 22 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala
b/core/src/main/scala/kafka/server/AdminManager.scala
index b54defc..01457a1 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -304,10 +304,14 @@ class AdminManager(val config: KafkaConfig,
case ResourceType.TOPIC =>
val topic = resource.name
Topic.validate(topic)
- // Consider optimizing this by caching the configs or retrieving
them from the `Log` when possible
- val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic,
topic)
- val logConfig =
LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
- createResponseConfig(allConfigs(logConfig),
createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
+ if (metadataCache.contains(topic)) {
+ // Consider optimizing this by caching the configs or retrieving
them from the `Log` when possible
+ val topicProps =
adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+ val logConfig =
LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
+ createResponseConfig(allConfigs(logConfig),
createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
+ } else {
+ new DescribeConfigsResponse.Config(new
ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null),
Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+ }
case ResourceType.BROKER =>
if (resource.name == null || resource.name.isEmpty)
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 33c14c6..b31c09d 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -49,6 +49,7 @@ import scala.collection.JavaConverters._
import java.lang.{Long => JLong}
import kafka.zk.KafkaZkClient
+import org.scalatest.Assertions.intercept
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
@@ -821,6 +822,27 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
client.close()
}
+ @Test
+ def testDescribeConfigsForTopic(): Unit = {
+ createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+ client = AdminClient.create(createConfig)
+
+ val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+
client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get()
+
+ val nonExistentTopic = new ConfigResource(ConfigResource.Type.TOPIC,
"unknown")
+ val describeResult1 =
client.describeConfigs(Collections.singletonList(nonExistentTopic))
+
+
assertTrue(intercept[ExecutionException](describeResult1.values.get(nonExistentTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException])
+
+ val invalidTopic = new ConfigResource(ConfigResource.Type.TOPIC, "(invalid
topic)")
+ val describeResult2 =
client.describeConfigs(Collections.singletonList(invalidTopic))
+
+
assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException])
+
+ client.close()
+ }
+
private def subscribeAndWaitForAssignment(topic: String, consumer:
KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
consumer.subscribe(Collections.singletonList(topic))
TestUtils.waitUntilTrue(() => {
--
To stop receiving notification emails like this one, please contact
[email protected].