This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new efb28af KAFKA-10588; Rename kafka-console-consumer CLI command line
arguments for KIP-629 (#11008)
efb28af is described below
commit efb28afc72451a7b9d72de0ced2f23f46668dbcd
Author: Omnia G H Ibrahim <[email protected]>
AuthorDate: Wed Jul 14 07:26:07 2021 +0100
KAFKA-10588; Rename kafka-console-consumer CLI command line arguments for
KIP-629 (#11008)
This patch marks --whitelist as deprecated argument and introduce --include
for kafka-console-consumer as described in KIP-629:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase.
Reviewers: Xavier Léauté <[email protected]>, David Jacot
<[email protected]>
---
.../main/scala/kafka/tools/ConsoleConsumer.scala | 38 ++++++----
.../unit/kafka/tools/ConsoleConsumerTest.scala | 88 ++++++++++++++++++++++
2 files changed, 113 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 5e450c2..627bd10 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -68,7 +68,7 @@ object ConsoleConsumer extends Logging {
if (conf.partitionArg.isDefined)
new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg,
Option(conf.offsetArg), None, consumer, timeoutMs)
else
- new ConsumerWrapper(Option(conf.topicArg), None, None,
Option(conf.whitelistArg), consumer, timeoutMs)
+ new ConsumerWrapper(Option(conf.topicArg), None, None,
Option(conf.includedTopicsArg), consumer, timeoutMs)
addShutdownHook(consumerWrapper, conf)
@@ -191,9 +191,15 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
- val whitelistOpt = parser.accepts("whitelist", "Regular expression
specifying whitelist of topics to include for consumption.")
+ val whitelistOpt = parser.accepts("whitelist",
+ "DEPRECATED, use --include instead; ignored if --include specified.
Regular expression specifying list of topics to include for consumption.")
.withRequiredArg
- .describedAs("whitelist")
+ .describedAs("Java regex (String)")
+ .ofType(classOf[String])
+ val includeOpt = parser.accepts("include",
+ "Regular expression specifying list of topics to include for
consumption.")
+ .withRequiredArg
+ .describedAs("Java regex (String)")
.ofType(classOf[String])
val partitionIdOpt = parser.accepts("partition", "The partition to consume
from. Consumption " +
"starts from the end of the partition unless '--offset' is specified.")
@@ -287,7 +293,7 @@ object ConsoleConsumer extends Logging {
// topic must be specified.
var topicArg: String = null
- var whitelistArg: String = null
+ var includedTopicsArg: String = null
var filterSpec: TopicFilter = null
val extraConsumerProps =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
val consumerProps = if (options.has(consumerConfigOpt))
@@ -315,11 +321,17 @@ object ConsoleConsumer extends Logging {
formatter.configure(formatterArgs.asScala.asJava)
- val topicOrFilterOpt = List(topicOpt, whitelistOpt).filter(options.has)
- if (topicOrFilterOpt.size != 1)
- CommandLineUtils.printUsageAndDie(parser, "Exactly one of
whitelist/topic is required.")
topicArg = options.valueOf(topicOpt)
- whitelistArg = options.valueOf(whitelistOpt)
+ includedTopicsArg = if (options.has(includeOpt))
+ options.valueOf(includeOpt)
+ else
+ options.valueOf(whitelistOpt)
+
+ val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ ==
null)
+ // user need to specify value for either --topic or one of the include
filters options (--include or --whitelist)
+ if (topicOrFilterArgs.size != 1)
+ CommandLineUtils.printUsageAndDie(parser, s"Exactly one of
--include/--topic is required. " +
+ s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use
--include instead; ignored if --include specified."}")
if (partitionArg.isDefined) {
if (!options.has(topicOpt))
@@ -392,13 +404,13 @@ object ConsoleConsumer extends Logging {
}
}
- private[tools] class ConsumerWrapper(topic: Option[String], partitionId:
Option[Int], offset: Option[Long], whitelist: Option[String],
+ private[tools] class ConsumerWrapper(topic: Option[String], partitionId:
Option[Int], offset: Option[Long], includedTopics: Option[String],
consumer: Consumer[Array[Byte],
Array[Byte]], val timeoutMs: Long = Long.MaxValue) {
consumerInit()
var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte],
Array[Byte]]]().iterator()
def consumerInit(): Unit = {
- (topic, partitionId, offset, whitelist) match {
+ (topic, partitionId, offset, includedTopics) match {
case (Some(topic), Some(partitionId), Some(offset), None) =>
seek(topic, partitionId, offset)
case (Some(topic), Some(partitionId), None, None) =>
@@ -406,11 +418,11 @@ object ConsoleConsumer extends Logging {
seek(topic, partitionId, ListOffsetsRequest.LATEST_TIMESTAMP)
case (Some(topic), None, None, None) =>
consumer.subscribe(Collections.singletonList(topic))
- case (None, None, None, Some(whitelist)) =>
- consumer.subscribe(Pattern.compile(whitelist))
+ case (None, None, None, Some(include)) =>
+ consumer.subscribe(Pattern.compile(include))
case _ =>
throw new IllegalArgumentException("An invalid combination of
arguments is provided. " +
- "Exactly one of 'topic' or 'whitelist' must be provided. " +
+ "Exactly one of 'topic' or 'include' must be provided. " +
"If 'topic' is provided, an optional 'partition' may also be
provided. " +
"If 'partition' is provided, an optional 'offset' may also be
provided, otherwise, consumption starts from the end of the partition.")
}
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 91ccbcc..a87800a 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -131,6 +131,58 @@ class ConsoleConsumerTest {
}
@Test
+ def shouldParseIncludeArgument(): Unit = {
+ //Given
+ val args: Array[String] = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--include", "includeTest*",
+ "--from-beginning")
+
+ //When
+ val config = new ConsoleConsumer.ConsumerConfig(args)
+
+ //Then
+ assertEquals("localhost:9092", config.bootstrapServer)
+ assertEquals("includeTest*", config.includedTopicsArg)
+ assertEquals(true, config.fromBeginning)
+ }
+
+ @Test
+ def shouldParseWhitelistArgument(): Unit = {
+ //Given
+ val args: Array[String] = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--whitelist", "whitelistTest*",
+ "--from-beginning")
+
+ //When
+ val config = new ConsoleConsumer.ConsumerConfig(args)
+
+ //Then
+ assertEquals("localhost:9092", config.bootstrapServer)
+ assertEquals("whitelistTest*", config.includedTopicsArg)
+ assertEquals(true, config.fromBeginning)
+ }
+
+ @Test
+ def shouldIgnoreWhitelistArgumentIfIncludeSpecified(): Unit = {
+ //Given
+ val args: Array[String] = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--include", "includeTest*",
+ "--whitelist", "whitelistTest*",
+ "--from-beginning")
+
+ //When
+ val config = new ConsoleConsumer.ConsumerConfig(args)
+
+ //Then
+ assertEquals("localhost:9092", config.bootstrapServer)
+ assertEquals("includeTest*", config.includedTopicsArg)
+ assertEquals(true, config.fromBeginning)
+ }
+
+ @Test
def shouldParseValidSimpleConsumerValidConfigWithNumericOffset(): Unit = {
//Given
val args: Array[String] = Array(
@@ -516,4 +568,40 @@ class ConsoleConsumerTest {
assertEquals("", out.toString)
}
+ @Test
+ def shouldExitIfNoTopicOrFilterSpecified(): Unit = {
+ Exit.setExitProcedure((_, message) => throw new
IllegalArgumentException(message.orNull))
+ //Given
+ val args: Array[String] = Array(
+ "--bootstrap-server", "localhost:9092")
+
+ try assertThrows(classOf[IllegalArgumentException], () => new
ConsoleConsumer.ConsumerConfig(args))
+ finally Exit.resetExitProcedure()
+ }
+
+ @Test
+ def shouldExitIfTopicAndIncludeSpecified(): Unit = {
+ Exit.setExitProcedure((_, message) => throw new
IllegalArgumentException(message.orNull))
+ //Given
+ val args: Array[String] = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--include", "includeTest*")
+
+ try assertThrows(classOf[IllegalArgumentException], () => new
ConsoleConsumer.ConsumerConfig(args))
+ finally Exit.resetExitProcedure()
+ }
+
+ @Test
+ def shouldExitIfTopicAndWhitelistSpecified(): Unit = {
+ Exit.setExitProcedure((_, message) => throw new
IllegalArgumentException(message.orNull))
+ //Given
+ val args: Array[String] = Array(
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--whitelist", "whitelistTest*")
+
+ try assertThrows(classOf[IllegalArgumentException], () => new
ConsoleConsumer.ConsumerConfig(args))
+ finally Exit.resetExitProcedure()
+ }
}