This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff1875f  KAFKA-6778; AdminClient.describeConfigs() should return error 
for non-existent topics (#4866)
ff1875f is described below

commit ff1875fce0a82737069e195060b6a93881954a23
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Fri Apr 27 02:31:04 2018 +0530

    KAFKA-6778; AdminClient.describeConfigs() should return error for 
non-existent topics (#4866)
    
    Reviewers: Ismael Juma <[email protected]>, Jason Gustafson 
<[email protected]>
---
 .../src/main/scala/kafka/server/AdminManager.scala | 12 ++++++++----
 .../kafka/api/AdminClientIntegrationTest.scala     | 22 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AdminManager.scala 
b/core/src/main/scala/kafka/server/AdminManager.scala
index b54defc..01457a1 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -304,10 +304,14 @@ class AdminManager(val config: KafkaConfig,
           case ResourceType.TOPIC =>
             val topic = resource.name
             Topic.validate(topic)
-            // Consider optimizing this by caching the configs or retrieving 
them from the `Log` when possible
-            val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, 
topic)
-            val logConfig = 
LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
-            createResponseConfig(allConfigs(logConfig), 
createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
+            if (metadataCache.contains(topic)) {
+              // Consider optimizing this by caching the configs or retrieving 
them from the `Log` when possible
+              val topicProps = 
adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+              val logConfig = 
LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
+              createResponseConfig(allConfigs(logConfig), 
createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
+            } else {
+              new DescribeConfigsResponse.Config(new 
ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null), 
Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+            }
 
           case ResourceType.BROKER =>
             if (resource.name == null || resource.name.isEmpty)
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 33c14c6..b31c09d 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -49,6 +49,7 @@ import scala.collection.JavaConverters._
 import java.lang.{Long => JLong}
 
 import kafka.zk.KafkaZkClient
+import org.scalatest.Assertions.intercept
 
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
@@ -821,6 +822,27 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
     client.close()
   }
 
+  @Test
+  def testDescribeConfigsForTopic(): Unit = {
+    createTopic(topic, numPartitions = 2, replicationFactor = serverCount)
+    client = AdminClient.create(createConfig)
+
+    val existingTopic = new ConfigResource(ConfigResource.Type.TOPIC, topic)
+    
client.describeConfigs(Collections.singletonList(existingTopic)).values.get(existingTopic).get()
+
+    val nonExistentTopic = new ConfigResource(ConfigResource.Type.TOPIC, 
"unknown")
+    val describeResult1 = 
client.describeConfigs(Collections.singletonList(nonExistentTopic))
+
+    
assertTrue(intercept[ExecutionException](describeResult1.values.get(nonExistentTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException])
+
+    val invalidTopic = new ConfigResource(ConfigResource.Type.TOPIC, "(invalid 
topic)")
+    val describeResult2 = 
client.describeConfigs(Collections.singletonList(invalidTopic))
+
+    
assertTrue(intercept[ExecutionException](describeResult2.values.get(invalidTopic).get).getCause.isInstanceOf[InvalidTopicException])
+
+    client.close()
+  }
+
   private def subscribeAndWaitForAssignment(topic: String, consumer: 
KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
     consumer.subscribe(Collections.singletonList(topic))
     TestUtils.waitUntilTrue(() => {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to