This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3a0c83e4903 KAFKA-16551 add integration test for ClusterTool (#16408)
3a0c83e4903 is described below
commit 3a0c83e49030074271dd34c20fa99baad6270c93
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Jun 24 14:04:45 2024 +0800
KAFKA-16551 add integration test for ClusterTool (#16408)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/tools/ClusterToolTest.java | 37 ++++++++++++++++++++++
1 file changed, 37 insertions(+)
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
index 16a8ef8b2c3..232d4466d88 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
@@ -16,20 +16,57 @@
*/
package org.apache.kafka.tools;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.junit.ClusterTestExtensions;
+
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.util.Set;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 60)
+@ExtendWith(value = ClusterTestExtensions.class)
public class ClusterToolTest {
+ @ClusterTest
+ public void testClusterId(ClusterInstance clusterInstance) { // Runs the cluster-id subcommand against the provided cluster and checks the captured output.
+ String output = ToolsTestUtils.captureStandardOut(() -> // output holds whatever captureStandardOut returns for this lambda
+ assertDoesNotThrow(() -> ClusterTool.execute("cluster-id",
"--bootstrap-server", clusterInstance.bootstrapServers())));
+ assertTrue(output.contains("Cluster ID: " +
clusterInstance.clusterId())); // the output must mention the ID reported by the cluster instance
+ }
+
+ @ClusterTest(brokers = 3)
+ public void testUnregister(ClusterInstance clusterInstance) { // Shuts one broker down, runs the unregister subcommand for its ID, and checks the message for the cluster type.
+ int brokerId;
+ if (!clusterInstance.isKRaftTest()) { // non-KRaft test: take the first broker ID the stream yields
+ brokerId = assertDoesNotThrow(() ->
clusterInstance.brokerIds().stream().findFirst().get());
+ } else { // KRaft test: drop controller IDs first, then take the first remaining broker ID
+ Set<Integer> brokerIds = clusterInstance.brokerIds();
+ brokerIds.removeAll(clusterInstance.controllerIds()); // NOTE(review): mutates the set returned by brokerIds() - confirm it is a private copy
+ brokerId = assertDoesNotThrow(() ->
brokerIds.stream().findFirst().get());
+ }
+ clusterInstance.shutdownBroker(brokerId); // the chosen broker is shut down before the unregister call is issued
+ String output = ToolsTestUtils.captureStandardOut(() -> // output holds whatever captureStandardOut returns for this lambda
+ assertDoesNotThrow(() -> ClusterTool.execute("unregister",
"--bootstrap-server", clusterInstance.bootstrapServers(), "--id",
String.valueOf(brokerId))));
+
+ if (clusterInstance.isKRaftTest()) { // KRaft test: expect the no-longer-registered message for brokerId
+ assertTrue(output.contains("Broker " + brokerId + " is no longer
registered."));
+ } else { // non-KRaft test: expect the unsupported-API message instead
+ assertTrue(output.contains("The target cluster does not support
the broker unregistration API."));
+ }
+ }
+
@Test
public void testPrintClusterId() throws Exception {
Admin adminClient = new MockAdminClient.Builder().