This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 701f924352d KAFKA-15140: Use TestUtils methods and add logs for 
assertion failure at TopicCommandIntegrationTest (#13950)
701f924352d is described below

commit 701f924352da1225a881f0f78f19ddf51485030a
Author: DL1231 <[email protected]>
AuthorDate: Tue Jul 4 22:02:39 2023 +0800

    KAFKA-15140: Use TestUtils methods and add logs for assertion failure at 
TopicCommandIntegrationTest (#13950)
    
    This commit utilizes TestUtils methods to create a topic and adds logs when 
assertions fail.
    
    Reviewers: Divij Vaidya <[email protected]>
    
    ---------
    
    Co-authored-by: d00791190 <[email protected]>
---
 .../kafka/admin/TopicCommandIntegrationTest.scala  | 169 ++++++++-------------
 1 file changed, 65 insertions(+), 104 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index ea4c748da92..9e1beaf6adf 100644
--- 
a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -54,7 +54,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
     */
   override def generateConfigs: Seq[KafkaConfig] = 
TestUtils.createBrokerConfigs(
-    numConfigs = 6,
+    numConfigs = numBrokers,
     zkConnect = zkConnectOrNull,
     rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 
-> "rack3", 5 -> "rack3"),
     numPartitions = numPartitions,
@@ -66,6 +66,8 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
   private val numPartitions = 1
   private val defaultReplicationFactor = 1.toShort
+  private val numBrokers = 6
+  private val lineSeparator = System.lineSeparator()
 
   private var topicService: TopicService = _
   private var adminClient: Admin = _
@@ -253,7 +255,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     val output = TestUtils.grabConsoleOutput(
       topicService.listTopics(new TopicCommandOptions(Array())))
 
-    assertTrue(output.contains(testTopicName))
+    assertTrue(output.contains(testTopicName), s"Unexpected output: $output")
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -262,46 +264,36 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     val topic1 = "kafka.testTopic1"
     val topic2 = "kafka.testTopic2"
     val topic3 = "oooof.testTopic1"
-    adminClient.createTopics(
-      List(new NewTopic(topic1, 2, 2.toShort),
-        new NewTopic(topic2, 2, 2.toShort),
-        new NewTopic(topic3, 2, 2.toShort)).asJavaCollection)
-      .all().get()
-    waitForTopicCreated(topic1)
-    waitForTopicCreated(topic2)
-    waitForTopicCreated(topic3)
+    TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, topic2, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, topic3, brokers, 2, 2)
 
     val output = TestUtils.grabConsoleOutput(
       topicService.listTopics(new TopicCommandOptions(Array("--topic", 
"kafka.*"))))
 
-    assertTrue(output.contains(topic1))
-    assertTrue(output.contains(topic2))
-    assertFalse(output.contains(topic3))
+    assertTrue(output.contains(topic1), s"Unexpected output: $output")
+    assertTrue(output.contains(topic2), s"Unexpected output: $output")
+    assertFalse(output.contains(topic3), s"Unexpected output: $output")
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testListTopicsWithExcludeInternal(quorum: String): Unit = {
     val topic1 = "kafka.testTopic1"
-    adminClient.createTopics(
-      List(new NewTopic(topic1, 2, 2.toShort),
-        new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 2, 
2.toShort)).asJavaCollection)
-      .all().get()
-    waitForTopicCreated(topic1)
+    TestUtils.createTopicWithAdmin(adminClient, topic1, brokers, 2, 2)
+    TestUtils.createTopicWithAdmin(adminClient, 
Topic.GROUP_METADATA_TOPIC_NAME, brokers, 2, 2)
 
     val output = TestUtils.grabConsoleOutput(
       topicService.listTopics(new 
TopicCommandOptions(Array("--exclude-internal"))))
 
-    assertTrue(output.contains(topic1))
-    assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
+    assertTrue(output.contains(topic1), s"Unexpected output: $output")
+    assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected 
output: $output")
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testAlterPartitionCount(quorum: String): Unit = {
-    adminClient.createTopics(
-      List(new NewTopic(testTopicName, 2, 
2.toShort)).asJavaCollection).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
 
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--partitions", "3")))
@@ -316,9 +308,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testAlterAssignment(quorum: String): Unit = {
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic(testTopicName, 2, 
2.toShort))).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
 
     topicService.alterTopic(new TopicCommandOptions(
       Array("--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", 
"--partitions", "3")))
@@ -334,9 +324,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testAlterAssignmentWithMoreAssignmentThanPartitions(quorum: String): 
Unit = {
-    adminClient.createTopics(
-      List(new NewTopic(testTopicName, 2, 
2.toShort)).asJavaCollection).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
 
     assertThrows(classOf[ExecutionException],
       () => topicService.alterTopic(new TopicCommandOptions(
@@ -346,9 +334,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testAlterAssignmentWithMorePartitionsThanAssignment(quorum: String): 
Unit = {
-    adminClient.createTopics(
-      List(new NewTopic(testTopicName, 2, 
2.toShort)).asJavaCollection).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
 
     assertThrows(classOf[ExecutionException],
       () => topicService.alterTopic(new TopicCommandOptions(
@@ -534,15 +520,13 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testDescribe(quorum: String): Unit = {
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic(testTopicName, 2, 
2.toShort))).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 2, 2)
 
     val output = TestUtils.grabConsoleOutput(
       topicService.describeTopic(new TopicCommandOptions(Array("--topic", 
testTopicName))))
-    val rows = output.split("\n")
-    assertEquals(3, rows.size)
-    assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
+    val rows = output.split(lineSeparator)
+    assertEquals(3, rows.size, s"Unexpected output: $output")
+    assertTrue(rows(0).startsWith(s"Topic: $testTopicName"), s"Unexpected 
output: ${rows(0)}")
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -561,9 +545,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testDescribeUnavailablePartitions(quorum: String): Unit = {
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic(testTopicName, 6, 
1.toShort))).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 
numBrokers, 1)
 
     try {
       // check which partition is on broker 0 which we'll kill
@@ -594,9 +576,9 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
       val output = TestUtils.grabConsoleOutput(
           topicService.describeTopic(new TopicCommandOptions(
             Array("--topic", testTopicName, "--unavailable-partitions"))))
-      val rows = output.split("\n")
-      assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
-      assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:"))
+      val rows = output.split(lineSeparator)
+      assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected 
output: ${rows(0)}")
+      assertTrue(rows(0).contains("Leader: none\tReplicas: 0\tIsr:"), 
s"Unexpected output: ${rows(0)}")
     } finally {
       restartDeadBrokers()
     }
@@ -605,9 +587,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testDescribeUnderReplicatedPartitions(quorum: String): Unit = {
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic(testTopicName, 1, 
6.toShort))).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 
numBrokers)
 
     try {
       killBroker(0)
@@ -618,7 +598,7 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
       }
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new 
TopicCommandOptions(Array("--under-replicated-partitions"))))
-      val rows = output.split("\n")
+      val rows = output.split(lineSeparator)
       assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected 
output: ${rows(0)}")
     } finally {
       restartDeadBrokers()
@@ -628,12 +608,11 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testDescribeUnderMinIsrPartitions(quorum: String): Unit = {
-    val configMap = new java.util.HashMap[String, String]()
-    configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6")
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
numBrokers.toString)
 
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic(testTopicName, 1, 
6.toShort).configs(configMap))).all().get()
-    waitForTopicCreated(testTopicName)
+    // create topic
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 
numBrokers, topicConfig = topicProps)
 
     try {
       killBroker(0)
@@ -647,8 +626,8 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
       }
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new 
TopicCommandOptions(Array("--under-min-isr-partitions"))))
-      val rows = output.split("\n")
-      assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
+      val rows = output.split(lineSeparator)
+      assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected 
output: ${rows(0)}")
     } finally {
       restartDeadBrokers()
     }
@@ -657,14 +636,9 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(quorum: 
String): Unit = {
-    val configMap = new java.util.HashMap[String, String]()
-    val replicationFactor: Short = 1
-    val partitions = 1
     val tp = new TopicPartition(testTopicName, 0)
 
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic(testTopicName, partitions, 
replicationFactor).configs(configMap))).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
 
     // Produce multiple batches.
     TestUtils.generateAndProduceMessages(brokers, testTopicName, numMessages = 
10, acks = -1)
@@ -693,9 +667,9 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     // describe the topic and test if it's under-replicated
     val simpleDescribeOutput = TestUtils.grabConsoleOutput(
       topicService.describeTopic(new TopicCommandOptions(Array("--topic", 
testTopicName))))
-    val simpleDescribeOutputRows = simpleDescribeOutput.split("\n")
-    assertTrue(simpleDescribeOutputRows(0).startsWith(s"Topic: 
$testTopicName"))
-    assertEquals(2, simpleDescribeOutputRows.size)
+    val simpleDescribeOutputRows = simpleDescribeOutput.split(lineSeparator)
+    assertTrue(simpleDescribeOutputRows(0).startsWith(s"Topic: 
$testTopicName"), s"Unexpected output: ${simpleDescribeOutputRows(0)}")
+    assertEquals(2, simpleDescribeOutputRows.size, s"Unexpected output: 
$simpleDescribeOutput")
 
     val underReplicatedOutput = TestUtils.grabConsoleOutput(
       topicService.describeTopic(new 
TopicCommandOptions(Array("--under-replicated-partitions"))))
@@ -712,12 +686,11 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testDescribeAtMinIsrPartitions(quorum: String): Unit = {
-    val configMap = new java.util.HashMap[String, String]()
-    configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
 
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic(testTopicName, 1, 
6.toShort).configs(configMap))).all().get()
-    waitForTopicCreated(testTopicName)
+    // create topic
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 
numBrokers, topicConfig = topicProps)
 
     try {
       killBroker(0)
@@ -734,9 +707,9 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
 
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new 
TopicCommandOptions(Array("--at-min-isr-partitions"))))
-      val rows = output.split("\n")
-      assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
-      assertEquals(1, rows.length)
+      val rows = output.split(lineSeparator)
+      assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"), s"Unexpected 
output: ${rows(0)}")
+      assertEquals(1, rows.length, s"Unexpected output: $output")
     } finally {
       restartDeadBrokers()
     }
@@ -758,21 +731,14 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     val notUnderMinIsrTopic = "not-under-min-isr-topic"
     val offlineTopic = "offline-topic"
     val fullyReplicatedTopic = "fully-replicated-topic"
+    val topicProps = new Properties()
+    topicProps.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 
numBrokers.toString)
 
-    val configMap = new java.util.HashMap[String, String]()
-    configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6")
-
-    adminClient.createTopics(
-      java.util.Arrays.asList(
-        new NewTopic(underMinIsrTopic, 1, 6.toShort).configs(configMap),
-        new NewTopic(notUnderMinIsrTopic, 1, 6.toShort),
-        new NewTopic(offlineTopic, Collections.singletonMap(0, 
Collections.singletonList(0))),
-        new NewTopic(fullyReplicatedTopic, Collections.singletonMap(0, 
java.util.Arrays.asList(1, 2, 3))))).all().get()
-
-    waitForTopicCreated(underMinIsrTopic)
-    waitForTopicCreated(notUnderMinIsrTopic)
-    waitForTopicCreated(offlineTopic)
-    waitForTopicCreated(fullyReplicatedTopic)
+    // create topic
+    TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, brokers, 1, 
numBrokers, topicConfig = topicProps)
+    TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, brokers, 
1, numBrokers)
+    TestUtils.createTopicWithAdmin(adminClient, offlineTopic, brokers, 1, 
replicaAssignment = Map(0 -> Seq(0)))
+    TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, brokers, 
1, replicaAssignment = Map(0 -> Seq(1, 2, 3)))
 
     try {
       killBroker(0)
@@ -782,17 +748,17 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
         TestUtils.waitUntilTrue(
           () => aliveBrokers.forall(
             broker =>
-              broker.metadataCache.getPartitionInfo(underMinIsrTopic, 
0).get.isr().size() < 6 &&
+              broker.metadataCache.getPartitionInfo(underMinIsrTopic, 
0).get.isr().size() < numBrokers &&
                 broker.metadataCache.getPartitionInfo(offlineTopic, 
0).get.leader() == MetadataResponse.NO_LEADER_ID),
           "Timeout waiting for partition metadata propagating to brokers for 
underMinIsrTopic topic"
         )
       }
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new 
TopicCommandOptions(Array("--under-min-isr-partitions"))))
-      val rows = output.split("\n")
-      assertTrue(rows(0).startsWith(s"\tTopic: $underMinIsrTopic"))
-      assertTrue(rows(1).startsWith(s"\tTopic: $offlineTopic"))
-      assertEquals(2, rows.length)
+      val rows = output.split(lineSeparator)
+      assertTrue(rows(0).startsWith(s"\tTopic: $underMinIsrTopic"), 
s"Unexpected output: ${rows(0)}")
+      assertTrue(rows(1).startsWith(s"\tTopic: $offlineTopic"), s"Unexpected 
output: ${rows(1)}")
+      assertEquals(2, rows.length, s"Unexpected output: $output")
     } finally {
       restartDeadBrokers()
     }
@@ -822,12 +788,12 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
     var output = TestUtils.grabConsoleOutput(topicService.describeTopic(new 
TopicCommandOptions(
       Array("--describe", "--exclude-internal"))))
     assertTrue(output.contains(testTopicName), s"Output should have contained 
$testTopicName")
-    assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
+    assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected 
output: $output")
 
     // test list
     output = TestUtils.grabConsoleOutput(topicService.listTopics(new 
TopicCommandOptions(Array("--list", "--exclude-internal"))))
-    assertTrue(output.contains(testTopicName))
-    assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
+    assertTrue(output.contains(testTopicName), s"Unexpected output: $output")
+    assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), s"Unexpected 
output: $output")
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -845,24 +811,19 @@ class TopicCommandIntegrationTest extends 
KafkaServerTestHarness with Logging wi
       Set(new TopicPartition(testTopicName, 0)).asJava
     )
 
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic(testTopicName, 1, 1.toShort))
-    ).all().get()
-    waitForTopicCreated(testTopicName)
+    TestUtils.createTopicWithAdmin(adminClient, testTopicName, brokers, 1, 1)
 
     val output = TestUtils.grabConsoleOutput(
       topicService.describeTopic(new TopicCommandOptions(Array("--topic", 
testTopicName))))
-    val rows = output.split("\n")
-    assertEquals(2, rows.size)
-    assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))
+    val rows = output.split(lineSeparator)
+    assertEquals(2, rows.size, s"Unexpected output: $output")
+    assertTrue(rows(0).startsWith(s"Topic: $testTopicName"), s"Unexpected 
output: ${rows(0)}")
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testCreateWithTopicNameCollision(quorum: String): Unit = {
-    adminClient.createTopics(
-      Collections.singletonList(new NewTopic("foo_bar", 1, 
6.toShort))).all().get()
-    waitForTopicCreated("foo_bar")
+    TestUtils.createTopicWithAdmin(adminClient, "foo_bar", brokers, 1, 
numBrokers)
 
     assertThrows(classOf[InvalidTopicException],
       () => topicService.createTopic(new TopicCommandOptions(Array("--topic", 
"foo.bar"))))

Reply via email to