This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a3ab89f95a KAFKA-17386 Remove broker-list, threads and
num-fetch-threads in ConsumerPerformance (#16983)
4a3ab89f95a is described below
commit 4a3ab89f95aba294bb536af55548522d946d1ee3
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Aug 30 22:09:37 2024 +0800
KAFKA-17386 Remove broker-list, threads and num-fetch-threads in
ConsumerPerformance (#16983)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../services/performance/consumer_performance.py | 18 ++---------
.../apache/kafka/tools/ConsumerPerformance.java | 24 ++------------
.../kafka/tools/ConsumerPerformanceTest.java | 37 ++--------------------
3 files changed, 8 insertions(+), 71 deletions(-)
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
index ed7cc99f86a..3325fe5298a 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -28,8 +28,6 @@ class ConsumerPerformanceService(PerformanceService):
"zookeeper" "The connection string for the zookeeper connection in the
form host:port. Multiple URLS can
be given to allow fail-over. This option is only used
with the old consumer."
- "broker-list", "A broker list to use for connecting if using the new
consumer."
-
"topic", "REQUIRED: The topic to consume from."
"group", "The group id to consume on."
@@ -41,10 +39,6 @@ class ConsumerPerformanceService(PerformanceService):
"socket-buffer-size", "The size of the tcp RECV size."
- "threads", "Number of processing threads."
-
- "num-fetch-threads", "Number of fetcher threads. Defaults to 1"
-
"new-consumer", "Use the new consumer implementation."
"consumer.config", "Consumer config properties file."
"""
@@ -92,8 +86,6 @@ class ConsumerPerformanceService(PerformanceService):
# These less-frequently used settings can be updated manually after
instantiation
self.fetch_size = None
self.socket_buffer_size = None
- self.threads = None
- self.num_fetch_threads = None
self.group = None
self.from_latest = None
@@ -110,7 +102,9 @@ class ConsumerPerformanceService(PerformanceService):
if self.new_consumer:
if version <= LATEST_0_10_0:
args['new-consumer'] = ""
- args['broker-list'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
+ args['broker-list'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
+ else:
+ args['bootstrap-server'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
else:
args['zookeeper'] = self.kafka.zk_connect_setting()
@@ -120,12 +114,6 @@ class ConsumerPerformanceService(PerformanceService):
if self.socket_buffer_size is not None:
args['socket-buffer-size'] = self.socket_buffer_size
- if self.threads is not None:
- args['threads'] = self.threads
-
- if self.num_fetch_threads is not None:
- args['num-fetch-threads'] = self.num_fetch_threads
-
if self.group is not None:
args['group'] = self.group
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index a98760c0cb4..0a5106d0e9c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -245,15 +245,12 @@ public class ConsumerPerformance {
}
protected static class ConsumerPerfOptions extends CommandDefaultOptions {
- private final OptionSpec<String> brokerListOpt;
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> topicOpt;
private final OptionSpec<String> groupIdOpt;
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<Void> resetBeginningOffsetOpt;
private final OptionSpec<Integer> socketBufferSizeOpt;
- private final OptionSpec<Integer> numThreadsOpt;
- private final OptionSpec<Integer> numFetchersOpt;
private final OptionSpec<String> consumerConfigOpt;
private final OptionSpec<Void> printMetricsOpt;
private final OptionSpec<Void> showDetailedStatsOpt;
@@ -265,12 +262,7 @@ public class ConsumerPerformance {
public ConsumerPerfOptions(String[] args) {
super(args);
- brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use
--bootstrap-server instead; ignored if --bootstrap-server is specified. The
broker list string in the form HOST1:PORT1,HOST2:PORT2.")
- .withRequiredArg()
- .describedAs("broker-list")
- .ofType(String.class);
- bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED
unless --broker-list(deprecated) is specified. The server(s) to connect to.")
- .requiredUnless("broker-list")
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED:
The server(s) to connect to.")
.withRequiredArg()
.describedAs("server to connect to")
.ofType(String.class);
@@ -295,16 +287,6 @@ public class ConsumerPerformance {
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(2 * 1024 * 1024);
- numThreadsOpt = parser.accepts("threads", "DEPRECATED AND IGNORED:
Number of processing threads.")
- .withRequiredArg()
- .describedAs("count")
- .ofType(Integer.class)
- .defaultsTo(10);
- numFetchersOpt = parser.accepts("num-fetch-threads", "DEPRECATED
AND IGNORED: Number of fetcher threads.")
- .withRequiredArg()
- .describedAs("count")
- .ofType(Integer.class)
- .defaultsTo(1);
consumerConfigOpt = parser.accepts("consumer.config", "Consumer
config properties file.")
.withRequiredArg()
.describedAs("config file")
@@ -341,8 +323,6 @@ public class ConsumerPerformance {
return;
}
if (options != null) {
- if (options.has(numThreadsOpt) || options.has(numFetchersOpt))
- System.out.println("WARNING: option [threads] and
[num-fetch-threads] have been deprecated and will be ignored by the test");
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is
used to verify the consumer performance.");
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt,
numMessagesOpt);
}
@@ -353,7 +333,7 @@ public class ConsumerPerformance {
}
public String brokerHostsAndPorts() {
- return options.valueOf(options.has(bootstrapServerOpt) ?
bootstrapServerOpt : brokerListOpt);
+ return options.valueOf(bootstrapServerOpt);
}
public Properties props() throws IOException {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
index 61e30a135cd..270fab2cf80 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -63,21 +63,6 @@ public class ConsumerPerformanceTest {
() -> ConsumerPerformance.printConsumerProgress(1, 1024 * 1024, 0,
1, 0, 0, 1, dateFormat, 1L));
}
- @Test
- public void testConfigBrokerList() {
- String[] args = new String[]{
- "--broker-list", "localhost:9092",
- "--topic", "test",
- "--messages", "10"
- };
-
- ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
-
- assertEquals("localhost:9092", config.brokerHostsAndPorts());
- assertTrue(config.topic().contains("test"));
- assertEquals(10, config.numMessages());
- }
-
@Test
public void testConfigBootStrapServer() {
String[] args = new String[]{
@@ -94,26 +79,10 @@ public class ConsumerPerformanceTest {
assertEquals(10, config.numMessages());
}
- @Test
- public void testBrokerListOverride() {
- String[] args = new String[]{
- "--broker-list", "localhost:9094",
- "--bootstrap-server", "localhost:9092",
- "--topic", "test",
- "--messages", "10"
- };
-
- ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
-
- assertEquals("localhost:9092", config.brokerHostsAndPorts());
- assertTrue(config.topic().contains("test"));
- assertEquals(10, config.numMessages());
- }
-
@Test
public void testConfigWithUnrecognizedOption() {
String[] args = new String[]{
- "--broker-list", "localhost:9092",
+ "--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--new-consumer"
@@ -133,7 +102,7 @@ public class ConsumerPerformanceTest {
}
String[] args = new String[]{
- "--broker-list", "localhost:9092",
+ "--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--consumer.config", tempFile.getAbsolutePath()
@@ -147,7 +116,7 @@ public class ConsumerPerformanceTest {
@Test
public void testDefaultClientId() throws IOException {
String[] args = new String[]{
- "--broker-list", "localhost:9092",
+ "--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10"
};