This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new efb28af  KAFKA-10588; Rename kafka-console-consumer CLI command line 
arguments for KIP-629 (#11008)
efb28af is described below

commit efb28afc72451a7b9d72de0ced2f23f46668dbcd
Author: Omnia G H Ibrahim <[email protected]>
AuthorDate: Wed Jul 14 07:26:07 2021 +0100

    KAFKA-10588; Rename kafka-console-consumer CLI command line arguments for 
KIP-629 (#11008)
    
    This patch marks --whitelist as deprecated argument and introduce --include 
for kafka-console-consumer as described in KIP-629: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase.
    
    Reviewers: Xavier Léauté <[email protected]>, David Jacot 
<[email protected]>
---
 .../main/scala/kafka/tools/ConsoleConsumer.scala   | 38 ++++++----
 .../unit/kafka/tools/ConsoleConsumerTest.scala     | 88 ++++++++++++++++++++++
 2 files changed, 113 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 5e450c2..627bd10 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -68,7 +68,7 @@ object ConsoleConsumer extends Logging {
       if (conf.partitionArg.isDefined)
         new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, 
Option(conf.offsetArg), None, consumer, timeoutMs)
       else
-        new ConsumerWrapper(Option(conf.topicArg), None, None, 
Option(conf.whitelistArg), consumer, timeoutMs)
+        new ConsumerWrapper(Option(conf.topicArg), None, None, 
Option(conf.includedTopicsArg), consumer, timeoutMs)
 
     addShutdownHook(consumerWrapper, conf)
 
@@ -191,9 +191,15 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("topic")
       .ofType(classOf[String])
-    val whitelistOpt = parser.accepts("whitelist", "Regular expression 
specifying whitelist of topics to include for consumption.")
+    val whitelistOpt = parser.accepts("whitelist",
+      "DEPRECATED, use --include instead; ignored if --include specified. 
Regular expression specifying list of topics to include for consumption.")
       .withRequiredArg
-      .describedAs("whitelist")
+      .describedAs("Java regex (String)")
+      .ofType(classOf[String])
+    val includeOpt = parser.accepts("include",
+      "Regular expression specifying list of topics to include for 
consumption.")
+      .withRequiredArg
+      .describedAs("Java regex (String)")
       .ofType(classOf[String])
     val partitionIdOpt = parser.accepts("partition", "The partition to consume 
from. Consumption " +
       "starts from the end of the partition unless '--offset' is specified.")
@@ -287,7 +293,7 @@ object ConsoleConsumer extends Logging {
 
     // topic must be specified.
     var topicArg: String = null
-    var whitelistArg: String = null
+    var includedTopicsArg: String = null
     var filterSpec: TopicFilter = null
     val extraConsumerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
     val consumerProps = if (options.has(consumerConfigOpt))
@@ -315,11 +321,17 @@ object ConsoleConsumer extends Logging {
 
     formatter.configure(formatterArgs.asScala.asJava)
 
-    val topicOrFilterOpt = List(topicOpt, whitelistOpt).filter(options.has)
-    if (topicOrFilterOpt.size != 1)
-      CommandLineUtils.printUsageAndDie(parser, "Exactly one of 
whitelist/topic is required.")
     topicArg = options.valueOf(topicOpt)
-    whitelistArg = options.valueOf(whitelistOpt)
+    includedTopicsArg = if (options.has(includeOpt))
+      options.valueOf(includeOpt)
+    else
+      options.valueOf(whitelistOpt)
+
+    val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ == 
null)
+    // user need to specify value for either --topic or one of the include 
filters options (--include or --whitelist)
+    if (topicOrFilterArgs.size != 1)
+      CommandLineUtils.printUsageAndDie(parser, s"Exactly one of 
--include/--topic is required. " +
+        s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use 
--include instead; ignored if --include specified."}")
 
     if (partitionArg.isDefined) {
       if (!options.has(topicOpt))
@@ -392,13 +404,13 @@ object ConsoleConsumer extends Logging {
     }
   }
 
-  private[tools] class ConsumerWrapper(topic: Option[String], partitionId: 
Option[Int], offset: Option[Long], whitelist: Option[String],
+  private[tools] class ConsumerWrapper(topic: Option[String], partitionId: 
Option[Int], offset: Option[Long], includedTopics: Option[String],
                                        consumer: Consumer[Array[Byte], 
Array[Byte]], val timeoutMs: Long = Long.MaxValue) {
     consumerInit()
     var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], 
Array[Byte]]]().iterator()
 
     def consumerInit(): Unit = {
-      (topic, partitionId, offset, whitelist) match {
+      (topic, partitionId, offset, includedTopics) match {
         case (Some(topic), Some(partitionId), Some(offset), None) =>
           seek(topic, partitionId, offset)
         case (Some(topic), Some(partitionId), None, None) =>
@@ -406,11 +418,11 @@ object ConsoleConsumer extends Logging {
           seek(topic, partitionId, ListOffsetsRequest.LATEST_TIMESTAMP)
         case (Some(topic), None, None, None) =>
           consumer.subscribe(Collections.singletonList(topic))
-        case (None, None, None, Some(whitelist)) =>
-          consumer.subscribe(Pattern.compile(whitelist))
+        case (None, None, None, Some(include)) =>
+          consumer.subscribe(Pattern.compile(include))
         case _ =>
           throw new IllegalArgumentException("An invalid combination of 
arguments is provided. " +
-            "Exactly one of 'topic' or 'whitelist' must be provided. " +
+            "Exactly one of 'topic' or 'include' must be provided. " +
             "If 'topic' is provided, an optional 'partition' may also be 
provided. " +
             "If 'partition' is provided, an optional 'offset' may also be 
provided, otherwise, consumption starts from the end of the partition.")
       }
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 91ccbcc..a87800a 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -131,6 +131,58 @@ class ConsoleConsumerTest {
   }
 
   @Test
+  def shouldParseIncludeArgument(): Unit = {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--include", "includeTest*",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("includeTest*", config.includedTopicsArg)
+    assertEquals(true, config.fromBeginning)
+  }
+
+  @Test
+  def shouldParseWhitelistArgument(): Unit = {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--whitelist", "whitelistTest*",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("whitelistTest*", config.includedTopicsArg)
+    assertEquals(true, config.fromBeginning)
+  }
+
+  @Test
+  def shouldIgnoreWhitelistArgumentIfIncludeSpecified(): Unit = {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--include", "includeTest*",
+      "--whitelist", "whitelistTest*",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("includeTest*", config.includedTopicsArg)
+    assertEquals(true, config.fromBeginning)
+  }
+
+  @Test
   def shouldParseValidSimpleConsumerValidConfigWithNumericOffset(): Unit = {
     //Given
     val args: Array[String] = Array(
@@ -516,4 +568,40 @@ class ConsoleConsumerTest {
     assertEquals("", out.toString)
   }
 
+  @Test
+  def shouldExitIfNoTopicOrFilterSpecified(): Unit = {
+    Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull))
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092")
+
+    try assertThrows(classOf[IllegalArgumentException], () => new 
ConsoleConsumer.ConsumerConfig(args))
+    finally Exit.resetExitProcedure()
+  }
+
+  @Test
+  def shouldExitIfTopicAndIncludeSpecified(): Unit = {
+    Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull))
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--include", "includeTest*")
+
+    try assertThrows(classOf[IllegalArgumentException], () => new 
ConsoleConsumer.ConsumerConfig(args))
+    finally Exit.resetExitProcedure()
+  }
+
+  @Test
+  def shouldExitIfTopicAndWhitelistSpecified(): Unit = {
+    Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull))
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--whitelist", "whitelistTest*")
+
+    try assertThrows(classOf[IllegalArgumentException], () => new 
ConsoleConsumer.ConsumerConfig(args))
+    finally Exit.resetExitProcedure()
+  }
 }

Reply via email to