This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 464051929d2 KAFKA-17388 Remove broker-list from VerifiableProducer 
(#16958)
464051929d2 is described below

commit 464051929d26c2330eeff58c155a2f59be528da1
Author: Logan Zhu <[email protected]>
AuthorDate: Thu Aug 29 20:02:29 2024 +0800

    KAFKA-17388 Remove broker-list from VerifiableProducer (#16958)
---
 tests/kafkatest/services/verifiable_producer.py           |  9 +++++++--
 .../java/org/apache/kafka/tools/VerifiableProducer.java   | 15 ++-------------
 2 files changed, 9 insertions(+), 15 deletions(-)

diff --git a/tests/kafkatest/services/verifiable_producer.py 
b/tests/kafkatest/services/verifiable_producer.py
index a49c91c24b0..ea6292d5772 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -23,7 +23,7 @@ from kafkatest.directory_layout.kafka_path import 
KafkaPathResolverMixin
 from kafkatest.services.kafka import TopicPartition
 from kafkatest.services.verifiable_client import VerifiableClientMixin
 from kafkatest.utils import is_int, is_int_with_prefix
-from kafkatest.version import DEV_BRANCH
+from kafkatest.version import get_version, V_2_5_0, DEV_BRANCH
 from kafkatest.services.kafka.util import fix_opts_for_new_jvm
 
 
@@ -224,7 +224,12 @@ class VerifiableProducer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         cmd += fix_opts_for_new_jvm(node)
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " 
% VerifiableProducer.LOG4J_CONFIG
         cmd += self.impl.exec_cmd(node)
-        cmd += " --topic %s --broker-list %s" % (self.topic, 
self.kafka.bootstrap_servers(self.security_config.security_protocol, True, 
self.offline_nodes))
+        version = get_version(node)
+        if version >= V_2_5_0:
+            server_option_flag = "--bootstrap-server"
+        else:
+            server_option_flag = "--broker-list"
+        cmd += " --topic %s %s %s" % (self.topic, server_option_flag, 
self.kafka.bootstrap_servers(self.security_config.security_protocol, True, 
self.offline_nodes))
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
         if self.throughput > 0:
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 32fc94b4b3c..33f0b3142e8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -131,14 +131,6 @@ public class VerifiableProducer implements AutoCloseable {
                 .dest("bootstrapServer")
                 .help("REQUIRED: The server(s) to connect to. Comma-separated 
list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
 
-        connectionGroup.addArgument("--broker-list")
-                .action(store())
-                .required(false)
-                .type(String.class)
-                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
-                .dest("brokerList")
-                .help("DEPRECATED, use --bootstrap-server instead; ignored if 
--bootstrap-server is specified.  Comma-separated list of Kafka brokers in the 
form HOST1:PORT1,HOST2:PORT2,...");
-
         parser.addArgument("--max-messages")
                 .action(store())
                 .required(false)
@@ -234,15 +226,12 @@ public class VerifiableProducer implements AutoCloseable {
 
         Properties producerProps = new Properties();
 
-        if (res.get("bootstrapServer") != null) {
-            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
res.getString("bootstrapServer"));
-        } else if (res.getString("brokerList") != null) {
-            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
res.getString("brokerList"));
-        } else {
+        if (res.get("bootstrapServer") == null) {
             parser.printHelp();
             // Can't use `Exit.exit` here because it didn't exist until 
0.11.0.0.
             System.exit(0);
         }
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
res.getString("bootstrapServer"));
 
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringSerializer");

Reply via email to