This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 464051929d2 KAFKA-17388 Remove broker-list from VerifiableProducer (#16958)
464051929d2 is described below
commit 464051929d26c2330eeff58c155a2f59be528da1
Author: Logan Zhu <[email protected]>
AuthorDate: Thu Aug 29 20:02:29 2024 +0800
KAFKA-17388 Remove broker-list from VerifiableProducer (#16958)
---
tests/kafkatest/services/verifiable_producer.py | 9 +++++++--
.../java/org/apache/kafka/tools/VerifiableProducer.java | 15 ++-------------
2 files changed, 9 insertions(+), 15 deletions(-)
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index a49c91c24b0..ea6292d5772 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -23,7 +23,7 @@ from kafkatest.directory_layout.kafka_path import
KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.utils import is_int, is_int_with_prefix
-from kafkatest.version import DEV_BRANCH
+from kafkatest.version import get_version, V_2_5_0, DEV_BRANCH
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
@@ -224,7 +224,12 @@ class VerifiableProducer(KafkaPathResolverMixin,
VerifiableClientMixin, Backgrou
cmd += fix_opts_for_new_jvm(node)
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; "
% VerifiableProducer.LOG4J_CONFIG
cmd += self.impl.exec_cmd(node)
- cmd += " --topic %s --broker-list %s" % (self.topic,
self.kafka.bootstrap_servers(self.security_config.security_protocol, True,
self.offline_nodes))
+ version = get_version(node)
+ if version >= V_2_5_0:
+ server_option_flag = "--bootstrap-server"
+ else:
+ server_option_flag = "--broker-list"
+ cmd += " --topic %s %s %s" % (self.topic, server_option_flag,
self.kafka.bootstrap_servers(self.security_config.security_protocol, True,
self.offline_nodes))
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
if self.throughput > 0:
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 32fc94b4b3c..33f0b3142e8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -131,14 +131,6 @@ public class VerifiableProducer implements AutoCloseable {
.dest("bootstrapServer")
.help("REQUIRED: The server(s) to connect to. Comma-separated
list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
- connectionGroup.addArgument("--broker-list")
- .action(store())
- .required(false)
- .type(String.class)
- .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
- .dest("brokerList")
- .help("DEPRECATED, use --bootstrap-server instead; ignored if
--bootstrap-server is specified. Comma-separated list of Kafka brokers in the
form HOST1:PORT1,HOST2:PORT2,...");
-
parser.addArgument("--max-messages")
.action(store())
.required(false)
@@ -234,15 +226,12 @@ public class VerifiableProducer implements AutoCloseable {
Properties producerProps = new Properties();
- if (res.get("bootstrapServer") != null) {
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
res.getString("bootstrapServer"));
- } else if (res.getString("brokerList") != null) {
- producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
res.getString("brokerList"));
- } else {
+ if (res.get("bootstrapServer") == null) {
parser.printHelp();
// Can't use `Exit.exit` here because it didn't exist until
0.11.0.0.
System.exit(0);
}
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
res.getString("bootstrapServer"));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");