This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 10c789416c2 KAFKA-17619: Remove zk type and instance from ClusterTest (#17284)
10c789416c2 is described below
commit 10c789416c254f48104b7f9a44c82375eff8fb00
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 23:38:15 2024 +0800
KAFKA-17619: Remove zk type and instance from ClusterTest (#17284)
Reviewers: Colin P. McCabe <[email protected]>, Chia-Ping Tsai <[email protected]>, David Arthur <[email protected]>
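
[Editor's note] For readers skimming the diff below: the practical effect is that a bare
@ClusterTest annotation now expands to the KRaft defaults only, since
ClusterTestDefaults.types() drops Type.ZK. A minimal sketch of the post-migration
pattern, assuming the defaults shown in this patch (the class and test names here are
illustrative, not part of the patch):

    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterTest;
    import kafka.test.junit.ClusterTestExtensions;

    import org.junit.jupiter.api.extension.ExtendWith;

    import static org.junit.jupiter.api.Assertions.assertNotNull;

    @ExtendWith(ClusterTestExtensions.class)
    public class ExampleKRaftOnlyTest {            // hypothetical class name

        // Previously written as @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT});
        // a bare @ClusterTest now runs against {Type.KRAFT, Type.CO_KRAFT} only.
        @ClusterTest
        public void testBootstrap(ClusterInstance cluster) {
            // Both endpoints are meaningful in KRaft mode; ZK clusters had no
            // bootstrap-controller equivalent (see the deleted ZkClusterInstance below).
            assertNotNull(cluster.bootstrapServers());
            assertNotNull(cluster.bootstrapControllers());
        }
    }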
---
core/src/test/java/kafka/admin/AclCommandTest.java | 124 ++-----
.../kafka/admin/ConfigCommandIntegrationTest.java | 301 +----------------
core/src/test/java/kafka/test/ClusterConfig.java | 4 +-
core/src/test/java/kafka/test/ClusterInstance.java | 16 +-
.../java/kafka/test/ClusterTestExtensionsTest.java | 85 ++---
.../test/annotation/ClusterConfigProperty.java | 3 -
.../kafka/test/annotation/ClusterTestDefaults.java | 2 +-
core/src/test/java/kafka/test/annotation/Type.java | 7 -
core/src/test/java/kafka/test/junit/README.md | 2 +-
.../test/junit/ZkClusterInvocationContext.java | 372 ---------------------
.../api/GroupCoordinatorIntegrationTest.scala | 2 +-
.../transaction/ProducerIdsIntegrationTest.scala | 52 +--
.../server/KafkaServerKRaftRegistrationTest.scala | 131 --------
.../unit/kafka/server/ApiVersionsRequestTest.scala | 75 +----
.../kafka/server/ClientQuotasRequestTest.scala | 6 +-
.../server/ConsumerGroupDescribeRequestTest.scala | 24 --
.../kafka/server/DeleteGroupsRequestTest.scala | 2 +-
.../kafka/server/DescribeGroupsRequestTest.scala | 2 +-
.../kafka/server/DescribeQuorumRequestTest.scala | 18 +-
.../unit/kafka/server/HeartbeatRequestTest.scala | 2 +-
.../unit/kafka/server/LeaveGroupRequestTest.scala | 2 +-
.../unit/kafka/server/ListGroupsRequestTest.scala | 2 +-
.../kafka/server/OffsetCommitRequestTest.scala | 2 +-
.../kafka/server/OffsetDeleteRequestTest.scala | 2 +-
.../unit/kafka/server/OffsetFetchRequestTest.scala | 6 +-
.../kafka/server/SaslApiVersionsRequestTest.scala | 5 +-
.../unit/kafka/server/SyncGroupRequestTest.scala | 2 +-
.../org/apache/kafka/tools/FeatureCommandTest.java | 40 ---
.../org/apache/kafka/tools/GetOffsetShellTest.java | 3 +-
.../kafka/tools/MetadataQuorumCommandTest.java | 31 +-
.../org/apache/kafka/tools/TopicCommandTest.java | 2 +-
.../group/ConsumerGroupCommandTestUtils.java | 32 +-
.../consumer/group/ListConsumerGroupTest.java | 8 +-
.../reassign/ReassignPartitionsCommandTest.java | 11 +-
34 files changed, 109 insertions(+), 1269 deletions(-)
diff --git a/core/src/test/java/kafka/admin/AclCommandTest.java b/core/src/test/java/kafka/admin/AclCommandTest.java
index 97d643f2c25..01df0f83984 100644
--- a/core/src/test/java/kafka/admin/AclCommandTest.java
+++ b/core/src/test/java/kafka/admin/AclCommandTest.java
@@ -17,15 +17,12 @@
package kafka.admin;
import kafka.admin.AclCommand.AclCommandOptions;
-import kafka.security.authorizer.AclAuthorizer;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
-import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
-import kafka.test.junit.ZkClusterInvocationContext;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
@@ -98,14 +95,17 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@ClusterTestDefaults(serverProperties = {
-    @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
-    @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = AclCommandTest.ACL_AUTHORIZER)
-})
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    serverProperties = {
+        @ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
+        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = AclCommandTest.STANDARD_AUTHORIZER)}
+
+)
@ExtendWith(ClusterTestExtensions.class)
public class AclCommandTest {
    public static final String ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer";
-    private static final String STANDARD_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer";
+    public static final String STANDARD_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer";
    private static final String LOCALHOST = "localhost:9092";
    private static final String AUTHORIZER = "--authorizer";
    private static final String AUTHORIZER_PROPERTIES = AUTHORIZER + "-properties";
@@ -221,77 +221,38 @@ public class AclCommandTest {
        }).collect(Collectors.toMap(Entry::getKey, Entry::getValue)));
}};
-    @ClusterTest(types = {Type.ZK})
-    public void testAclCliWithAuthorizer(ClusterInstance cluster) throws InterruptedException {
-        testAclCli(cluster, zkArgs(cluster));
-    }
-
-    @ClusterTests({
-        @ClusterTest(types = {Type.ZK}),
-        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-        })
-    })
+    @ClusterTest
    public void testAclCliWithAdminAPI(ClusterInstance cluster) throws InterruptedException {
        testAclCli(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty()));
    }
-    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-    })
+    @ClusterTest
    public void testAclCliWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException {
        testAclCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty()));
    }
-    @ClusterTests({
-        @ClusterTest(types = {Type.ZK}),
-        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-        })
-    })
+    @ClusterTest
    public void testAclCliWithMisusingBootstrapServerToController(ClusterInstance cluster) {
        assertThrows(RuntimeException.class, () -> testAclCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapServers(), Optional.empty())));
    }
-    @ClusterTests({
-        @ClusterTest(types = {Type.ZK}),
-        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-        })
-    })
+    @ClusterTest
    public void testAclCliWithMisusingBootstrapControllerToServer(ClusterInstance cluster) {
        assertThrows(RuntimeException.class, () -> testAclCli(cluster, adminArgs(cluster.bootstrapControllers(), Optional.empty())));
    }
-    @ClusterTest(types = {Type.ZK})
-    public void testProducerConsumerCliWithAuthorizer(ClusterInstance cluster) throws InterruptedException {
-        testProducerConsumerCli(cluster, zkArgs(cluster));
-    }
-
-    @ClusterTests({
-        @ClusterTest(types = {Type.ZK}),
-        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-        })
-    })
+    @ClusterTest
    public void testProducerConsumerCliWithAdminAPI(ClusterInstance cluster) throws InterruptedException {
        testProducerConsumerCli(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty()));
    }
-    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-    })
+    @ClusterTest
    public void testProducerConsumerCliWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException {
        testProducerConsumerCli(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty()));
    }
-    @ClusterTests({
-        @ClusterTest(types = {Type.ZK}),
-        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-        })
-    })
+    @ClusterTest
    public void testAclCliWithClientId(ClusterInstance cluster) throws IOException, InterruptedException {
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
            appender.setClassLogger(AppInfoParser.class, Level.WARN);
@@ -303,9 +264,7 @@ public class AclCommandTest {
}
}
-    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-    })
+    @ClusterTest
    public void testAclCliWithClientIdAndBootstrapController(ClusterInstance cluster) throws IOException, InterruptedException {
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
            appender.setClassLogger(AppInfoParser.class, Level.WARN);
@@ -317,57 +276,22 @@ public class AclCommandTest {
}
}
-    @ClusterTest(types = {Type.ZK})
-    public void testAclsOnPrefixedResourcesWithAuthorizer(ClusterInstance cluster) throws InterruptedException {
-        testAclsOnPrefixedResources(cluster, zkArgs(cluster));
-    }
-
-    @ClusterTests({
-        @ClusterTest(types = {Type.ZK}),
-        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-        })
-    })
+    @ClusterTest
    public void testAclsOnPrefixedResourcesWithAdminAPI(ClusterInstance cluster) throws InterruptedException {
        testAclsOnPrefixedResources(cluster, adminArgs(cluster.bootstrapServers(), Optional.empty()));
    }
-    @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-        @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-    })
+    @ClusterTest
    public void testAclsOnPrefixedResourcesWithAdminAPIAndBootstrapController(ClusterInstance cluster) throws InterruptedException {
        testAclsOnPrefixedResources(cluster, adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty()));
    }
-    @ClusterTest(types = {Type.ZK})
-    public void testInvalidAuthorizerProperty(ClusterInstance cluster) {
-        AclCommand.AuthorizerService aclCommandService = new AclCommand.AuthorizerService(
-            AclAuthorizer.class.getName(),
-            new AclCommandOptions(new String[]{AUTHORIZER_PROPERTIES, "zookeeper.connect " + zkConnect(cluster)})
-        );
-        assertThrows(IllegalArgumentException.class, aclCommandService::listAcls);
-    }
-
-    @ClusterTest(types = {Type.ZK})
-    public void testPatternTypesWithAuthorizer(ClusterInstance cluster) {
-        testPatternTypes(zkArgs(cluster));
-    }
-
-    @ClusterTests({
-        @ClusterTest(types = {Type.ZK}),
-        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-        })
-    })
+    @ClusterTest
    public void testPatternTypesWithAdminAPI(ClusterInstance cluster) {
        testPatternTypes(adminArgs(cluster.bootstrapServers(), Optional.empty()));
    }
-    @ClusterTests({
-        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
-            @ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = STANDARD_AUTHORIZER)
-        })
-    })
+    @ClusterTest
    public void testPatternTypesWithAdminAPIAndBootstrapController(ClusterInstance cluster) {
        testPatternTypes(adminArgsWithBootstrapController(cluster.bootstrapControllers(), Optional.empty()));
    }
@@ -728,14 +652,6 @@ public class AclCommandTest {
return JavaConverters.setAsJavaSet(scalaSet);
}
-    private String zkConnect(ClusterInstance cluster) {
-        return ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
-    }
-
-    private List<String> zkArgs(ClusterInstance cluster) {
-        return Arrays.asList("--authorizer-properties", "zookeeper.connect=" + zkConnect(cluster));
-    }
-
    private void assertInitializeInvalidOptionsExitCodeAndMsg(List<String> args, String expectedMsg) {
        Exit.setExitProcedure((exitCode, message) -> {
            assertEquals(1, exitCode);
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index c2941847b27..7115ca366c0 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -16,27 +16,15 @@
*/
package kafka.admin;
-import kafka.cluster.Broker;
-import kafka.cluster.EndPoint;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
-import kafka.test.junit.ZkClusterInvocationContext;
-import kafka.zk.AdminZkClient;
-import kafka.zk.BrokerInfo;
-import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfigEntry;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.security.PasswordEncoder;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.server.config.ZooKeeperInternals;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -50,7 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@@ -62,13 +49,8 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
-import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG;
-import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG;
-import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG;
-import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG;
import static org.apache.kafka.security.PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -98,7 +80,7 @@ public class ConfigCommandIntegrationTest {
this.cluster = cluster;
}
- @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
+ @ClusterTest
public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-name", cluster.isKRaftTest() ? "0" : "1",
@@ -108,46 +90,7 @@ public class ConfigCommandIntegrationTest {
errOut -> assertTrue(errOut.contains("Cannot update these configs
dynamically: Set(security.inter.broker.protocol)"), errOut));
}
-    @ClusterTest(types = {Type.ZK})
-    public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-            "--entity-type", "users",
-            "--entity-name", "admin",
-            "--alter", "--add-config", "consumer_byte_rate=20000")),
-            errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut));
-    }
-
-    @ClusterTest(types = {Type.ZK})
-    public void testExitWithNonZeroStatusOnZkCommandAlterGroup() {
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-            "--entity-type", "groups",
-            "--entity-name", "group",
-            "--alter", "--add-config", "consumer.session.timeout.ms=50000")),
-            errOut -> assertTrue(errOut.contains("Invalid entity type groups, the entity type must be one of users, brokers with a --zookeeper argument"), errOut));
-
-        // Test for the --group alias
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-            "--group", "group",
-            "--alter", "--add-config", "consumer.session.timeout.ms=50000")),
-            errOut -> assertTrue(errOut.contains("Invalid entity type groups, the entity type must be one of users, brokers with a --zookeeper argument"), errOut));
-    }
-
-    @ClusterTest(types = {Type.ZK})
-    public void testExitWithNonZeroStatusOnZkCommandAlterClientMetrics() {
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-            "--entity-type", "client-metrics",
-            "--entity-name", "cm",
-            "--alter", "--add-config", "metrics=org.apache")),
-            errOut -> assertTrue(errOut.contains("Invalid entity type client-metrics, the entity type must be one of users, brokers with a --zookeeper argument"), errOut));
-
-        // Test for the --client-metrics alias
-        assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
-            "--client-metrics", "cm",
-            "--alter", "--add-config", "consumer.session.timeout.ms=50000")),
-            errOut -> assertTrue(errOut.contains("Invalid entity type client-metrics, the entity type must be one of users, brokers with a --zookeeper argument"), errOut));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
public void testNullStatusOnKraftCommandAlterUserQuota() {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "users",
@@ -158,7 +101,7 @@ public class ConfigCommandIntegrationTest {
assertTrue(StringUtils.isBlank(message), message);
}
- @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ @ClusterTest
public void testNullStatusOnKraftCommandAlterGroup() {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
@@ -175,7 +118,7 @@ public class ConfigCommandIntegrationTest {
assertTrue(StringUtils.isBlank(message), message);
}
- @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ @ClusterTest
public void testNullStatusOnKraftCommandAlterClientMetrics() {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "client-metrics",
@@ -192,96 +135,7 @@ public class ConfigCommandIntegrationTest {
assertTrue(StringUtils.isBlank(message), message);
}
-    @ClusterTest(types = Type.ZK)
-    public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
-
-        String brokerId = "1";
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter");
-
-        // Add config
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-            singletonMap("message.max.bytes", "110000"));
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
-            singletonMap("message.max.bytes", "120000"));
-
-        // Change config
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-            singletonMap("message.max.bytes", "130000"));
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
-            singletonMap("message.max.bytes", "140000"));
-
-        // Delete config
-        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-            singleton("message.max.bytes"));
-        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
-            singleton("message.max.bytes"));
-
-        // Listener configs: should work only with listener name
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-            singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
-        assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
-                singletonMap("ssl.keystore.location", "/tmp/test.jks")));
-
-        // Per-broker config configured at default cluster-level should fail
-        assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(),
-                singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")));
-        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
-            singleton("listener.name.internal.ssl.keystore.location"));
-
-        // Password config update without encoder secret should fail
-        assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
-                singletonMap("listener.name.external.ssl.keystore.password", "secret")));
-
-        // Password config update with encoder secret should succeed and encoded password must be stored in ZK
-        Map<String, String> configs = new HashMap<>();
-        configs.put("listener.name.external.ssl.keystore.password", "secret");
-        configs.put("log.cleaner.threads", "2");
-        Map<String, String> encoderConfigs = new HashMap<>(configs);
-        encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs);
-        Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId);
-        assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper");
-        assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded
-        String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
-        PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs);
-        assertEquals("secret", passwordEncoder.decode(encodedPassword).value());
-        assertEquals(configs.size(), brokerConfigs.size());
-
-        // Password config update with overrides for encoder parameters
-        Map<String, String> encoderConfigs2 = generateEncodeConfig();
-        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs2);
-        Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId);
-        String encodedPassword2 = brokerConfigs2.getProperty("listener.name.external.ssl.keystore.password");
-        assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs)
-            .decode(encodedPassword2).value());
-        assertEquals("secret2", ConfigCommand.createPasswordEncoder(encoderConfigs2)
-            .decode(encodedPassword2).value());
-
-        // Password config update at default cluster-level should fail
-        assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), encoderConfigs));
-
-        // Dynamic config updates using ZK should fail if broker is running.
-        registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
-        assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient,
-                Optional.of(brokerId), singletonMap("message.max.bytes", "210000")));
-        assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient,
-                Optional.empty(), singletonMap("message.max.bytes", "220000")));
-
-        // Dynamic config updates using ZK should for a different broker that is not running should succeed
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"), singletonMap("message.max.bytes", "230000"));
-    }
-
- @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ @ClusterTest
public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
@@ -322,7 +176,7 @@ public class ConfigCommandIntegrationTest {
}
}
- @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
+ @ClusterTest
public void testGroupConfigUpdateUsingKraft() throws Exception {
alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(),
"--entity-type", "groups", "--alter");
verifyGroupConfigUpdate();
@@ -381,26 +235,7 @@ public class ConfigCommandIntegrationTest {
}
}
-    @ClusterTest(types = {Type.ZK})
-    public void testAlterReadOnlyConfigInZookeeperThenShouldFail() {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = generateDefaultAlterOpts(zkConnect);
-
-        assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
-                singletonMap("auto.create.topics.enable", "false")));
-        assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
-                singletonMap("auto.leader.rebalance.enable", "false")));
-        assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId),
-                singletonMap("broker.id", "1")));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
@@ -417,23 +252,7 @@ public class ConfigCommandIntegrationTest {
}
}
-    @ClusterTest(types = {Type.ZK})
-    public void testUpdateClusterWideConfigInZookeeperThenShouldSuccessful() {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = generateDefaultAlterOpts(zkConnect);
-
-        Map<String, String> configs = new HashMap<>();
-        configs.put("log.flush.interval.messages", "100");
-        configs.put("log.retention.bytes", "20");
-        configs.put("log.retention.ms", "2");
-
-        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs);
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
    public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() throws Exception {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
@@ -447,34 +266,7 @@ public class ConfigCommandIntegrationTest {
}
}
-    @ClusterTest(types = {Type.ZK})
-    public void testUpdatePerBrokerConfigWithListenerNameInZookeeperThenShouldSuccessful() {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = generateDefaultAlterOpts(zkConnect);
-
-        String listenerName = "listener.name.internal.";
-        String sslTruststoreType = listenerName + "ssl.truststore.type";
-        String sslTruststoreLocation = listenerName + "ssl.truststore.location";
-        String sslTruststorePassword = listenerName + "ssl.truststore.password";
-
-        Map<String, String> configs = new HashMap<>();
-        configs.put(sslTruststoreType, "PKCS12");
-        configs.put(sslTruststoreLocation, "/temp/test.jks");
-        configs.put("password.encoder.secret", "encoder-secret");
-        configs.put(sslTruststorePassword, "password");
-
-        alterConfigWithZk(zkClient, adminZkClient, Optional.of(defaultBrokerId), configs);
-
-        Properties properties = zkClient.getEntityConfigs("brokers", defaultBrokerId);
-        assertTrue(properties.containsKey(sslTruststorePassword));
-        assertEquals(configs.get(sslTruststoreType), properties.getProperty(sslTruststoreType));
-        assertEquals(configs.get(sslTruststoreLocation), properties.getProperty(sslTruststoreLocation));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
    public void testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() throws Exception {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
String listenerName = "listener.name.internal.";
@@ -492,26 +284,7 @@ public class ConfigCommandIntegrationTest {
}
}
-    @ClusterTest(types = {Type.ZK})
-    public void testUpdatePerBrokerConfigInZookeeperThenShouldFail() {
-        cluster.shutdownBroker(0);
-        String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect();
-        KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient();
-        AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = generateDefaultAlterOpts(zkConnect);
-
-        assertThrows(ConfigException.class, () ->
-            alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
-                singletonMap("ssl.truststore.type", "PKCS12")));
-        assertThrows(ConfigException.class, () ->
-            alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
-                singletonMap("ssl.truststore.location", "/temp/test.jks")));
-        assertThrows(ConfigException.class, () ->
-            alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(defaultBrokerId),
-                singletonMap("ssl.truststore.password", "password")));
-    }
-
-    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+    @ClusterTest
public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
@@ -543,29 +316,7 @@ public class ConfigCommandIntegrationTest {
}
private Stream<String> quorumArgs() {
-        return cluster.isKRaftTest()
-            ? Stream.of("--bootstrap-server", cluster.bootstrapServers())
-            : Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect());
-    }
-
-    private void verifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String> config) {
-        Properties entityConfigs = zkClient.getEntityConfigs("brokers",
-            brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
-        assertEquals(config, entityConfigs);
-    }
-
-    private void alterAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient,
-                                      Optional<String> brokerId, Map<String, String> configs) {
-        alterConfigWithZk(zkClient, adminZkClient, brokerId, configs);
-        verifyConfig(zkClient, brokerId, configs);
-    }
-
-    private void alterConfigWithZk(KafkaZkClient zkClient, AdminZkClient adminZkClient,
-                                   Optional<String> brokerId, Map<String, String> config) {
-        String configStr = transferConfigMapToString(config);
-        ConfigCommand.ConfigCommandOptions addOpts =
-            new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr)));
-        ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
+        return Stream.of("--bootstrap-server", cluster.bootstrapServers());
    }
private List<String> entityOp(Optional<String> brokerId) {
@@ -573,36 +324,6 @@ public class ConfigCommandIntegrationTest {
.orElse(singletonList("--entity-default"));
}
-    private void deleteAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient adminZkClient,
-                                       Optional<String> brokerId, Set<String> configNames) {
-        ConfigCommand.ConfigCommandOptions deleteOpts =
-            new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
-                asList("--delete-config", String.join(",", configNames))));
-        ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient);
-        verifyConfig(zkClient, brokerId, Collections.emptyMap());
-    }
-
-    private Map<String, String> generateEncodeConfig() {
-        Map<String, String> map = new HashMap<>();
-        map.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
-        map.put(PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding");
-        map.put(PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024");
-        map.put(PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1");
-        map.put(PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64");
-        map.put("listener.name.external.ssl.keystore.password", "secret2");
-        return map;
-    }
-
-    private void registerBrokerInZk(KafkaZkClient zkClient, int id) {
-        zkClient.createTopLevelPaths();
-        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
-        EndPoint endpoint = new EndPoint("localhost", 9092,
-            ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
-        BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, endpoint,
-            scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
-        zkClient.registerBroker(brokerInfo);
-    }
-
private List<String> generateDefaultAlterOpts(String bootstrapServers) {
return asList("--bootstrap-server", bootstrapServers,
"--entity-type", "brokers", "--alter");
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
index 4e9e469705e..f1b120803ef 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -69,7 +69,7 @@ public class ClusterConfig {
                         Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
                         Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties, List<String> tags,
                         Map<Features, Short> features) {
-        // do fail fast. the following values are invalid for both zk and kraft modes.
+        // do fail fast. the following values are invalid for kraft modes.
        if (brokers < 0) throw new IllegalArgumentException("Number of brokers must be greater or equal to zero.");
        if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
        if (disksPerBroker <= 0) throw new IllegalArgumentException("Number of disks must be greater than zero.");
@@ -176,7 +176,7 @@ public class ClusterConfig {
public static Builder defaultBuilder() {
return new Builder()
-            .setTypes(Stream.of(Type.ZK, Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet()))
+            .setTypes(Stream.of(Type.KRAFT, Type.CO_KRAFT).collect(Collectors.toSet()))
.setBrokers(1)
.setControllers(1)
.setDisksPerBroker(1)
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index fe32c76001a..3909da754af 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -74,7 +74,6 @@ public interface ClusterInstance {
/**
     * Return the set of all controller IDs configured for this test. For kraft, this
     * will return only the nodes which have the "controller" role enabled in `process.roles`.
-    * For zookeeper, this will return all broker IDs since they are all eligible controllers.
*/
Set<Integer> controllerIds();
@@ -92,19 +91,12 @@ public interface ClusterInstance {
ListenerName clientListener();
/**
-     * The listener for the kraft cluster controller configured by controller.listener.names. In ZK-based clusters, return Optional.empty
+     * The listener for the kraft cluster controller configured by controller.listener.names.
*/
default Optional<ListenerName> controllerListenerName() {
return Optional.empty();
}
-    /**
-     * The listener for the zk controller configured by control.plane.listener.name. In Raft-based clusters, return Optional.empty
-     */
-    default Optional<ListenerName> controlPlaneListenerName() {
-        return Optional.empty();
-    }
-
/**
* The broker connect string which can be used by clients for bootstrapping
*/
@@ -116,8 +108,7 @@ public interface ClusterInstance {
String bootstrapControllers();
/**
-     * A collection of all brokers in the cluster. In ZK-based clusters this will also include the broker which is
-     * acting as the controller (since ZK controllers serve both broker and controller roles).
+ * A collection of all brokers in the cluster.
*/
default Collection<SocketServer> brokerSocketServers() {
return brokers().values().stream()
@@ -126,8 +117,7 @@ public interface ClusterInstance {
}
/**
-     * A collection of all controllers in the cluster. For ZK-based clusters, this will return the broker which is also
-     * currently the active controller. For Raft-based clusters, this will return all controller servers.
+ * A collection of all controllers in the cluster.
*/
Collection<SocketServer> controllerSocketServers();
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 74fcf3fc0e5..4995774e900 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -53,7 +53,7 @@ import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG;
-@ClusterTestDefaults(types = {Type.ZK}, serverProperties = {
+@ClusterTestDefaults(types = {Type.KRAFT}, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
}) // Set defaults for a few params in @ClusterTest(s)
@@ -71,7 +71,7 @@ public class ClusterTestExtensionsTest {
Map<String, String> serverProperties = new HashMap<>();
serverProperties.put("foo", "bar");
return Collections.singletonList(ClusterConfig.defaultBuilder()
- .setTypes(Collections.singleton(Type.ZK))
+ .setTypes(Collections.singleton(Type.KRAFT))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("Generated Test"))
.build());
@@ -81,29 +81,21 @@ public class ClusterTestExtensionsTest {
@ClusterTest
public void testClusterTest(ClusterInstance clusterInstance) {
Assertions.assertSame(this.clusterInstance, clusterInstance, "Injected
objects should be the same");
- Assertions.assertEquals(Type.ZK, clusterInstance.type()); // From the
class level default
+ Assertions.assertEquals(Type.KRAFT, clusterInstance.type()); // From
the class level default
Assertions.assertEquals("default.value",
clusterInstance.config().serverProperties().get("default.key"));
}
    // generate1 is a template method which generates any number of cluster configs
@ClusterTemplate("generate1")
public void testClusterTemplate() {
- Assertions.assertEquals(Type.ZK, clusterInstance.type(),
- "generate1 provided a Zk cluster, so we should see that here");
+ Assertions.assertEquals(Type.KRAFT, clusterInstance.type(),
+ "generate1 provided a KRAFT cluster, so we should see that here");
Assertions.assertEquals("bar",
clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals(Collections.singletonList("Generated Test"),
clusterInstance.config().tags());
}
// Multiple @ClusterTest can be used with @ClusterTests
@ClusterTests({
-        @ClusterTest(types = {Type.ZK}, serverProperties = {
-            @ClusterConfigProperty(key = "foo", value = "bar"),
-            @ClusterConfigProperty(key = "spam", value = "eggs"),
-            @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400
-            @ClusterConfigProperty(key = "spam", value = "eggs")
-        }, tags = {
-            "default.display.key1", "default.display.key2"
-        }),
        @ClusterTest(types = {Type.KRAFT}, serverProperties = {
            @ClusterConfigProperty(key = "foo", value = "baz"),
            @ClusterConfigProperty(key = "spam", value = "eggz"),
@@ -127,49 +119,34 @@ public class ClusterTestExtensionsTest {
})
})
    public void testClusterTests() throws ExecutionException, InterruptedException {
-        if (!clusterInstance.isKRaftTest()) {
-            Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo"));
-            Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
-            Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
-            Assertions.assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
+        Assertions.assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
+        Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
+        Assertions.assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
+        Assertions.assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
-            // assert broker server 0 contains property queued.max.requests 100 from ClusterTestDefaults
-            try (Admin admin = clusterInstance.createAdminClient()) {
-                ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
-                Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
-                Assertions.assertEquals(1, configs.size());
-                Assertions.assertEquals("100", configs.get(configResource).get("queued.max.requests").value());
-            }
-        } else {
-            Assertions.assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
-            Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
-            Assertions.assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
-            Assertions.assertEquals(Arrays.asList("default.display.key1", "default.display.key2"), clusterInstance.config().tags());
-
-            // assert broker server 0 contains property queued.max.requests 200 from ClusterTest which overrides
-            // the value 100 in server property in ClusterTestDefaults
-            try (Admin admin = clusterInstance.createAdminClient()) {
-                ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
+        // assert broker server 0 contains property queued.max.requests 200 from ClusterTest which overrides
+        // the value 100 in server property in ClusterTestDefaults
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
+            Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+            Assertions.assertEquals(1, configs.size());
+            Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
+        }
+        // In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
+        if (clusterInstance.type() == Type.KRAFT) {
+            try (Admin admin = Admin.create(Collections.singletonMap(
+                AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
+                ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
                Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
                Assertions.assertEquals(1, configs.size());
-                Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
-            }
-            // In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
-            if (clusterInstance.type() == Type.KRAFT) {
-                try (Admin admin = Admin.create(Collections.singletonMap(
-                    AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
-                    ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
-                    Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
-                    Assertions.assertEquals(1, configs.size());
-                    Assertions.assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
-                }
+                Assertions.assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
            }
        }
    }
@ClusterTests({
- @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}),
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}),
+ @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, disksPerBroker = 2),
})
    public void testClusterTestWithDisksPerBroker() throws ExecutionException, InterruptedException {
Admin admin = clusterInstance.createAdminClient();
@@ -201,10 +178,10 @@ public class ClusterTestExtensionsTest {
}
@ClusterTests({
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
            @ClusterConfigProperty(key = GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
        }),
-        @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
+        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
            @ClusterConfigProperty(key = NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
})
})
@@ -214,7 +191,7 @@ public class ClusterTestExtensionsTest {
- @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
    public void testCreateTopic(ClusterInstance clusterInstance) throws Exception {
String topicName = "test";
int numPartition = 3;
@@ -230,7 +207,7 @@ public class ClusterTestExtensionsTest {
}
}
- @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
    public void testShutdownAndSyncMetadata(ClusterInstance clusterInstance) throws Exception {
String topicName = "test";
int numPartition = 3;
@@ -240,7 +217,7 @@ public class ClusterTestExtensionsTest {
clusterInstance.waitForTopic(topicName, numPartition);
}
- @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
    public void testClusterAliveBrokers(ClusterInstance clusterInstance) throws Exception {
clusterInstance.waitForReadyBrokers();
@@ -256,7 +233,7 @@ public class ClusterTestExtensionsTest {
}
- @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
    public void testVerifyTopicDeletion(ClusterInstance clusterInstance) throws Exception {
try (Admin admin = clusterInstance.createAdminClient()) {
String testTopic = "testTopic";
diff --git a/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
index fa98c00a1ff..e1ec38b094c 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java
@@ -32,9 +32,6 @@ public @interface ClusterConfigProperty {
* all controller/broker servers. Note that the "controller" here refers
to the KRaft quorum controller.
* The id can vary depending on the different {@link
kafka.test.annotation.Type}.
* <ul>
- * <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts
from
- * {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} and increases by
1
- * with each additional broker, and there is no controller server under
this mode. </li>
* <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id
starts from
* {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}, the controller
id
* starts from {@link kafka.testkit.TestKitNodes#CONTROLLER_ID_OFFSET
3000}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
index 5e19c6f0e33..b06ef99aacb 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java
@@ -35,7 +35,7 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
@Target({TYPE})
@Retention(RUNTIME)
public @interface ClusterTestDefaults {
- Type[] types() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
+ Type[] types() default {Type.KRAFT, Type.CO_KRAFT};
int brokers() default 1;
int controllers() default 1;
int disksPerBroker() default 1;
diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java
index 7e03047e576..59b0d0efefb 100644
--- a/core/src/test/java/kafka/test/annotation/Type.java
+++ b/core/src/test/java/kafka/test/annotation/Type.java
@@ -19,7 +19,6 @@ package kafka.test.annotation;
import kafka.test.ClusterConfig;
import kafka.test.junit.RaftClusterInvocationContext;
-import kafka.test.junit.ZkClusterInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
@@ -39,12 +38,6 @@ public enum Type {
        public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
            return new RaftClusterInvocationContext(baseDisplayName, config, true);
}
- },
- ZK {
- @Override
-        public TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config) {
- return new ZkClusterInvocationContext(baseDisplayName, config);
- }
};
    public abstract TestTemplateInvocationContext invocationContexts(String baseDisplayName, ClusterConfig config);
diff --git a/core/src/test/java/kafka/test/junit/README.md b/core/src/test/java/kafka/test/junit/README.md
index 1a2432fabd2..9f3c1b6efe3 100644
--- a/core/src/test/java/kafka/test/junit/README.md
+++ b/core/src/test/java/kafka/test/junit/README.md
@@ -14,7 +14,7 @@ This annotation has fields for a set of cluster types and number of brokers, as
Arbitrary server properties can also be provided in the annotation:
```java
-@ClusterTest(types = {Type.Zk}, securityProtocol = "PLAINTEXT", properties = {
+@ClusterTest(types = {Type.KRAFT}, securityProtocol = "PLAINTEXT", properties
= {
@ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
@ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
})
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
deleted file mode 100644
index 67955fd5d18..00000000000
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.test.junit;
-
-import kafka.api.IntegrationTestHarness;
-import kafka.network.SocketServer;
-import kafka.server.ControllerServer;
-import kafka.server.KafkaBroker;
-import kafka.server.KafkaServer;
-import kafka.test.ClusterConfig;
-import kafka.test.ClusterInstance;
-import kafka.test.annotation.Type;
-import kafka.utils.EmptyTestInfo;
-import kafka.utils.TestUtils;
-
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
-import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
-import org.junit.jupiter.api.extension.Extension;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import scala.Option;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
-import scala.compat.java8.OptionConverters;
-
-import static java.util.Objects.requireNonNull;
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
-
-/**
- * Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this
- * class is provided with a configuration for the cluster.
- *
- * This context also provides parameter resolvers for:
- *
- * <ul>
- *     <li>ClusterConfig (the same instance passed to the constructor)</li>
- *     <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
- * </ul>
- */
-public class ZkClusterInvocationContext implements TestTemplateInvocationContext {
-
- private final String baseDisplayName;
- private final ClusterConfig clusterConfig;
-    private final AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference;
-
-    public ZkClusterInvocationContext(String baseDisplayName, ClusterConfig clusterConfig) {
- this.baseDisplayName = baseDisplayName;
- this.clusterConfig = clusterConfig;
- this.clusterReference = new AtomicReference<>();
- }
-
- @Override
- public String getDisplayName(int invocationIndex) {
- return String.format("%s [%d] Type=ZK, %s", baseDisplayName,
invocationIndex, String.join(",", clusterConfig.displayTags()));
- }
-
- @Override
- public List<Extension> getAdditionalExtensions() {
- if (clusterConfig.numControllers() != 1) {
- throw new IllegalArgumentException("For ZK clusters, please
specify exactly 1 controller.");
- }
- ClusterInstance clusterShim = new ZkClusterInstance(clusterConfig,
clusterReference);
- return Arrays.asList(
- (BeforeTestExecutionCallback) context -> {
-                // We have to wait to actually create the underlying cluster until after our @BeforeEach methods
-                // have run. This allows tests to set up external dependencies like ZK, MiniKDC, etc.
-                // However, since we cannot create this instance until we are inside the test invocation, we have
-                // to use a container class (AtomicReference) to provide this cluster object to the test itself
-                clusterReference.set(new ClusterConfigurableIntegrationHarness(clusterConfig));
- if (clusterConfig.isAutoStart()) {
- clusterShim.start();
- }
- },
- (AfterTestExecutionCallback) context -> clusterShim.stop(),
- new ClusterInstanceParameterResolver(clusterShim)
- );
- }
-
- public static class ZkClusterInstance implements ClusterInstance {
-
-        final AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference;
- final ClusterConfig config;
- final AtomicBoolean started = new AtomicBoolean(false);
- final AtomicBoolean stopped = new AtomicBoolean(false);
-
-        ZkClusterInstance(ClusterConfig config, AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference) {
- this.config = config;
- this.clusterReference = clusterReference;
- }
-
- @Override
- public String bootstrapServers() {
-            return TestUtils.bootstrapServers(clusterReference.get().servers(), clusterReference.get().listenerName());
- }
-
- @Override
- public String bootstrapControllers() {
- throw new RuntimeException("Cannot use --bootstrap-controller with
ZK-based clusters.");
- }
-
- @Override
- public ListenerName clientListener() {
- return clusterReference.get().listenerName();
- }
-
-
- @Override
- public Optional<ListenerName> controlPlaneListenerName() {
-            return OptionConverters.toJava(clusterReference.get().servers().head().config().controlPlaneListenerName());
- }
-
- @Override
- public Collection<SocketServer> controllerSocketServers() {
- return brokers().values().stream()
- .filter(s -> ((KafkaServer) s).kafkaController().isActive())
- .map(KafkaBroker::socketServer)
- .collect(Collectors.toList());
- }
-
- @Override
- public String clusterId() {
-            return brokers().values().stream().findFirst().map(KafkaBroker::clusterId).orElseThrow(
-                () -> new RuntimeException("No broker instances found"));
- }
-
- @Override
- public Type type() {
- return Type.ZK;
- }
-
- @Override
- public ClusterConfig config() {
- return config;
- }
-
- @Override
- public Set<Integer> controllerIds() {
- return brokerIds();
- }
-
- @Override
- public IntegrationTestHarness getUnderlying() {
- return clusterReference.get();
- }
-
- @Override
- public Admin createAdminClient(Properties configOverrides) {
-            return clusterReference.get().createAdminClient(clientListener(), configOverrides);
- }
-
- @Override
- public void start() {
- if (started.compareAndSet(false, true)) {
- clusterReference.get().setUp(new EmptyTestInfo());
- }
- }
-
- @Override
- public void stop() {
- if (stopped.compareAndSet(false, true)) {
- clusterReference.get().tearDown();
- }
- }
-
- @Override
- public void shutdownBroker(int brokerId) {
- findBrokerOrThrow(brokerId).shutdown();
- }
-
- @Override
- public void startBroker(int brokerId) {
- findBrokerOrThrow(brokerId).startup();
- }
-
- @Override
-        public void waitTopicDeletion(String topic) throws InterruptedException {
-            org.apache.kafka.test.TestUtils.waitForCondition(
-                () -> !clusterReference.get().zkClient().isTopicMarkedForDeletion(topic),
-                String.format("Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted", topic)
-            );
-
-            org.apache.kafka.test.TestUtils.waitForCondition(
-                () -> !clusterReference.get().zkClient().topicExists(topic),
-                String.format("Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted", topic, topic)
- );
-
- ClusterInstance.super.waitTopicDeletion(topic);
- }
-
-
- /**
- * Restart brokers with given cluster config.
- *
- * @param clusterConfig clusterConfig is optional. If left
Optional.empty(), brokers will restart without
- * reconfiguring configurations. Otherwise, the
restart will reconfigure configurations
- * according to the provided cluster config.
- */
- public void rollingBrokerRestart(Optional<ClusterConfig>
clusterConfig) {
- requireNonNull(clusterConfig);
- if (!started.get()) {
- throw new IllegalStateException("Tried to restart brokers but
the cluster has not been started!");
- }
- for (int i = 0; i < clusterReference.get().brokerCount(); i++) {
- clusterReference.get().killBroker(i);
- }
- clusterConfig.ifPresent(config ->
clusterReference.get().setClusterConfig(config));
- clusterReference.get().restartDeadBrokers(true);
-
clusterReference.get().adminClientConfig().put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
- }
-
- @Override
- public void waitForReadyBrokers() throws InterruptedException {
- org.apache.kafka.test.TestUtils.waitForCondition(() -> {
- int numRegisteredBrokers =
clusterReference.get().zkClient().getAllBrokersInCluster().size();
- return numRegisteredBrokers == config.numBrokers();
- }, "Timed out while waiting for brokers to become ready");
- }
-
- private KafkaServer findBrokerOrThrow(int brokerId) {
- return brokers().values().stream()
- .filter(server -> server.config().brokerId() == brokerId)
- .map(s -> (KafkaServer) s)
- .findFirst()
- .orElseThrow(() -> new IllegalArgumentException("Unknown
brokerId " + brokerId));
- }
-
- @Override
- public Map<Integer, ControllerServer> controllers() {
- return Collections.emptyMap();
- }
-
- @Override
- public Map<Integer, KafkaBroker> brokers() {
- return
JavaConverters.asJavaCollection(clusterReference.get().servers())
- .stream().collect(Collectors.toMap(s ->
s.config().brokerId(), s -> s));
- }
- }
-
- // This is what tests normally extend from to start a cluster, here we
extend it and
- // configure the cluster using values from ClusterConfig.
- private static class ClusterConfigurableIntegrationHarness extends
IntegrationTestHarness {
- private ClusterConfig clusterConfig;
-
- private ClusterConfigurableIntegrationHarness(ClusterConfig
clusterConfig) {
- this.clusterConfig = Objects.requireNonNull(clusterConfig);
- }
-
- public void setClusterConfig(ClusterConfig clusterConfig) {
- this.clusterConfig = Objects.requireNonNull(clusterConfig);
- }
-
- @Override
- public void modifyConfigs(Seq<Properties> props) {
- super.modifyConfigs(props);
- for (int i = 0; i < props.length(); i++) {
-
props.apply(i).putAll(clusterConfig.perServerOverrideProperties().getOrDefault(i,
Collections.emptyMap()));
- }
- }
-
- @Override
- public Properties serverConfig() {
- Properties props = new Properties();
- props.putAll(clusterConfig.serverProperties());
- props.put(INTER_BROKER_PROTOCOL_VERSION_CONFIG,
clusterConfig.metadataVersion().version());
- return props;
- }
-
- @Override
- public Properties adminClientConfig() {
- Properties props = new Properties();
- props.putAll(clusterConfig.adminClientProperties());
- return props;
- }
-
- @Override
- public Properties consumerConfig() {
- Properties props = new Properties();
- props.putAll(clusterConfig.consumerProperties());
- return props;
- }
-
- @Override
- public Properties producerConfig() {
- Properties props = new Properties();
- props.putAll(clusterConfig.producerProperties());
- return props;
- }
-
- @Override
- public SecurityProtocol securityProtocol() {
- return clusterConfig.securityProtocol();
- }
-
- @Override
- public ListenerName listenerName() {
- return clusterConfig.listenerName().map(ListenerName::normalised)
- .orElseGet(() ->
ListenerName.forSecurityProtocol(securityProtocol()));
- }
-
- @Override
- public Option<Properties> serverSaslProperties() {
- if (clusterConfig.saslServerProperties().isEmpty()) {
- return Option.empty();
- } else {
- Properties props = new Properties();
- props.putAll(clusterConfig.saslServerProperties());
- return Option.apply(props);
- }
- }
-
- @Override
- public Option<Properties> clientSaslProperties() {
- if (clusterConfig.saslClientProperties().isEmpty()) {
- return Option.empty();
- } else {
- Properties props = new Properties();
- props.putAll(clusterConfig.saslClientProperties());
- return Option.apply(props);
- }
- }
-
- @Override
- public int brokerCount() {
- // Controllers are also brokers in zk mode, so just use broker
count
- return clusterConfig.numBrokers();
- }
-
- @Override
- public int logDirCount() {
- return clusterConfig.numDisksPerBroker();
- }
-
- @Override
- public Option<File> trustStoreFile() {
- return OptionConverters.toScala(clusterConfig.trustStoreFile());
- }
- }
-}
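
With the ZK invocation context removed, every cluster the framework starts now comes from the KRaft code paths. For reference, a minimal KRaft-only test under the retained annotations looks roughly like this (a sketch; the class, method, and assertion are illustrative, not part of this commit):

    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterTest;
    import kafka.test.annotation.Type;
    import kafka.test.junit.ClusterTestExtensions;
    import org.junit.jupiter.api.extension.ExtendWith;

    import static org.junit.jupiter.api.Assertions.assertFalse;

    @ExtendWith(ClusterTestExtensions.class)
    public class ExampleKraftOnlyTest {

        // KRAFT runs isolated controller and broker processes; CO_KRAFT runs
        // combined-mode nodes. Type.ZK no longer exists after this commit.
        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1)
        public void testClusterComesUp(ClusterInstance cluster) {
            // The extension starts the cluster and injects it as a parameter.
            assertFalse(cluster.bootstrapServers().isEmpty());
        }
    }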
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index a666a740a14..84f6f620c8e 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit
 class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
   @ClusterTest(
-    types = Array(Type.KRAFT, Type.ZK),
+    types = Array(Type.KRAFT),
     serverProperties = Array(
       new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index 639c095c9c9..5debad75664 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -19,41 +19,22 @@ package kafka.coordinator.transaction
 import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils
-import kafka.test.{ClusterConfig, ClusterInstance}
-import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.common.message.InitProducerIdRequestData
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse}
-import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{Disabled, Timeout}

 import java.util.stream.{Collectors, IntStream}
 import scala.concurrent.duration.DurationInt
 import scala.jdk.CollectionConverters._

-object ProducerIdsIntegrationTest {
-  def uniqueProducerIdsBumpIBP(): java.util.List[ClusterConfig] = {
-    val serverProperties = java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
-    val perBrokerProperties: java.util.Map[Integer, java.util.Map[String, String]] =
-      java.util.Collections.singletonMap(0,
-        java.util.Collections.singletonMap(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0-IV0"))
-
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(Set(Type.ZK).asJava)
-      .setBrokers(3)
-      .setAutoStart(false)
-      .setServerProperties(serverProperties)
-      .setPerServerProperties(perBrokerProperties)
-      .build()).asJava
-  }
-}
-
 @ClusterTestDefaults(serverProperties = Array(
   new ClusterConfigProperty(key = "transaction.state.log.num.partitions", value = "1")
 ))
@@ -61,41 +42,12 @@ object ProducerIdsIntegrationTest {
 class ProducerIdsIntegrationTest {

   @ClusterTests(Array(
-    new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
-    new ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
     new ClusterTest(types = Array(Type.KRAFT), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
   ))
   def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
     verifyUniqueIds(clusterInstance)
   }

-  @ClusterTemplate("uniqueProducerIdsBumpIBP")
-  def testUniqueProducerIdsBumpIBP(clusterInstance: ClusterInstance): Unit = {
-    clusterInstance.start()
-    verifyUniqueIds(clusterInstance)
-    clusterInstance.stop()
-  }
-
-  @ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
-    new ClusterConfigProperty(key = "num.io.threads", value = "1")
-  ))
-  @Timeout(20)
-  def testHandleAllocateProducerIdsSingleRequestHandlerThread(clusterInstance: ClusterInstance): Unit = {
-    clusterInstance.start()
-    verifyUniqueIds(clusterInstance)
-    clusterInstance.stop()
-  }
-
-  @Disabled // TODO: Enable once producer id block size is configurable (KAFKA-15029)
-  @ClusterTest(types = Array(Type.ZK), brokers = 1, autoStart = AutoStart.NO, serverProperties = Array(
-    new ClusterConfigProperty(key = "num.io.threads", value = "2")
-  ))
-  def testMultipleAllocateProducerIdsRequest(clusterInstance: ClusterInstance): Unit = {
-    clusterInstance.start()
-    verifyUniqueIds(clusterInstance)
-    clusterInstance.stop()
-  }
-
   private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
     // Request enough PIDs from each broker to ensure each broker generates two blocks
     val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
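
The removed uniqueProducerIdsBumpIBP template pinned per-broker IBPs, which only made sense on ZK clusters. The surviving pattern fans a single test method out over several pinned configurations via @ClusterTests; here is a sketch in the Java flavor of the same annotations (class and method names are illustrative, not from this commit):

    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterTest;
    import kafka.test.annotation.ClusterTests;
    import kafka.test.annotation.Type;
    import kafka.test.junit.ClusterTestExtensions;
    import org.apache.kafka.server.common.MetadataVersion;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ClusterTestExtensions.class)
    public class ExampleMultiConfigTest {

        // One annotated method produces one invocation per ClusterTest entry,
        // each against its own cluster; with ZK gone, only KRaft entries remain.
        @ClusterTests({
            @ClusterTest(types = {Type.KRAFT}, brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
        })
        public void testAcrossConfigurations(ClusterInstance cluster) {
            // exercise the cluster here
        }
    }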
diff --git a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala b/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
deleted file mode 100644
index 68dbcc03862..00000000000
--- a/core/src/test/scala/integration/kafka/server/KafkaServerKRaftRegistrationTest.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.test.{ClusterConfig, ClusterInstance}
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
-import kafka.test.junit.ClusterTestExtensions
-import kafka.test.junit.ZkClusterInvocationContext.ZkClusterInstance
-import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
-import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.config.{KRaftConfigs, ZkConfigs}
-import org.junit.jupiter.api.Assertions.{assertThrows, fail}
-import org.junit.jupiter.api.extension.ExtendWith
-
-import java.util.Optional
-import java.util.concurrent.{TimeUnit, TimeoutException}
-import scala.jdk.CollectionConverters._
-
-
-/**
- * This test creates a full ZK cluster and a controller-only KRaft cluster and configures the ZK brokers to register
- * themselves with the KRaft controller. This is mainly a happy-path test since the only way to reliably test the
- * failure paths is to use timeouts. See {@link unit.kafka.server.BrokerRegistrationRequestTest} for integration test
- * of just the broker registration path.
- */
-@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-class KafkaServerKRaftRegistrationTest {
-
-  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
-    new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
-    new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
-    new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
-    new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-  ))
-  def testRegisterZkBrokerInKraft(zkCluster: ClusterInstance): Unit = {
-    val clusterId = zkCluster.clusterId()
-
-    // Bootstrap the ZK cluster ID into KRaft
-    val kraftCluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
-        setClusterId(clusterId).
-        setNumBrokerNodes(0).
-        setNumControllerNodes(1).build())
-      .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      .setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
-      .build()
-    try {
-      kraftCluster.format()
-      kraftCluster.startup()
-      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
-
-      // Enable migration configs and restart brokers
-      val serverProperties = new java.util.HashMap[String, String](zkCluster.config().serverProperties())
-      serverProperties.put(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      serverProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
-      serverProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
-      serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      val clusterConfig = ClusterConfig.builder(zkCluster.config())
-        .setServerProperties(serverProperties)
-        .build()
-      zkCluster.asInstanceOf[ZkClusterInstance].rollingBrokerRestart(Optional.of(clusterConfig))
-      zkCluster.waitForReadyBrokers()
-
-      try {
-        // Wait until all three ZK brokers are registered with KRaft controller
-        readyFuture.get(30, TimeUnit.SECONDS)
-      } catch {
-        case _: TimeoutException => fail("Did not see 3 brokers within 30 seconds")
-        case t: Throwable => fail("Had some other error waiting for brokers", t)
-      }
-    } finally {
-      shutdownInSequence(zkCluster, kraftCluster)
-    }
-  }
-
-  @ClusterTest(types = Array(Type.ZK), brokers = 3, metadataVersion = MetadataVersion.IBP_3_3_IV0)
-  def testRestartOldIbpZkBrokerInMigrationMode(zkCluster: ClusterInstance): Unit = {
-    // Bootstrap the ZK cluster ID into KRaft
-    val clusterId = zkCluster.clusterId()
-    val kraftCluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadataVersion(MetadataVersion.IBP_3_4_IV0).
-        setClusterId(clusterId).
-        setNumBrokerNodes(0).
-        setNumControllerNodes(1).build())
-      .setConfigProp(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      .setConfigProp(ZkConfigs.ZK_CONNECT_CONFIG, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
-      .build()
-    try {
-      kraftCluster.format()
-      kraftCluster.startup()
-
-      // Enable migration configs and restart brokers
-      val serverProperties = new java.util.HashMap[String, String](zkCluster.config().serverProperties())
-      serverProperties.put(KRaftConfigs.MIGRATION_ENABLED_CONFIG, "true")
-      serverProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, kraftCluster.quorumVotersConfig())
-      serverProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
-      serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      val clusterConfig = ClusterConfig.builder(zkCluster.config())
-        .setServerProperties(serverProperties)
-        .build()
-      assertThrows(classOf[IllegalArgumentException], () => zkCluster.asInstanceOf[ZkClusterInstance].rollingBrokerRestart(Optional.of(clusterConfig)))
-    } finally {
-      shutdownInSequence(zkCluster, kraftCluster)
-    }
-  }
-
-  def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: KafkaClusterTestKit): Unit = {
-    zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
-    kraftCluster.close()
-    zkCluster.stop()
-  }
-}
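
The deleted migration test was also an in-tree example of driving kafka.testkit directly. For reference, its controller-only setup boils down to the following sketch, using only the builder calls that appear in the deleted code (the standalone main() wrapper is illustrative):

    import kafka.testkit.KafkaClusterTestKit;
    import kafka.testkit.TestKitNodes;

    public class ControllerOnlyClusterExample {
        public static void main(String[] args) throws Exception {
            KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
                new TestKitNodes.Builder()
                    .setNumBrokerNodes(0)       // no brokers, controllers only
                    .setNumControllerNodes(1)
                    .build())
                .build();
            try {
                cluster.format();   // write the bootstrap metadata
                cluster.startup();  // start the controller quorum
            } finally {
                cluster.close();
            }
        }
    }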
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 4734f053547..cc58931778f 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -17,72 +17,19 @@
 package kafka.server

-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.ClusterInstance
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterConfigProperty, ClusterTemplate, ClusterTest, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith

-import scala.jdk.CollectionConverters._
-
-object ApiVersionsRequestTest {
-
-  def controlPlaneListenerProperties(): java.util.HashMap[String, String] = {
-    // Configure control plane listener to make sure we have separate listeners for testing.
-    val serverProperties = new java.util.HashMap[String, String]()
-    serverProperties.put("control.plane.listener.name", "CONTROL_PLANE")
-    serverProperties.put("listener.security.protocol.map", "CONTROL_PLANE:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-    serverProperties.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
-    serverProperties.put("advertised.listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0")
-    serverProperties
-  }
-
-  def testApiVersionsRequestTemplate(): java.util.List[ClusterConfig] = {
-    val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
-    serverProperties.put("unstable.api.versions.enable", "false")
-    serverProperties.put("unstable.feature.versions.enable", "true")
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setServerProperties(serverProperties)
-      .setMetadataVersion(MetadataVersion.latestTesting())
-      .build()).asJava
-  }
-
-  def testApiVersionsRequestIncludesUnreleasedApisTemplate(): java.util.List[ClusterConfig] = {
-    val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
-    serverProperties.put("unstable.api.versions.enable", "true")
-    serverProperties.put("unstable.feature.versions.enable", "true")
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setServerProperties(serverProperties)
-      .build()).asJava
-  }
-
-  def testApiVersionsRequestValidationV0Template(): java.util.List[ClusterConfig] = {
-    val serverProperties: java.util.HashMap[String, String] = controlPlaneListenerProperties()
-    serverProperties.put("unstable.api.versions.enable", "false")
-    serverProperties.put("unstable.feature.versions.enable", "false")
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setMetadataVersion(MetadataVersion.latestProduction())
-      .build()).asJava
-  }
-
-  def zkApiVersionsRequest(): java.util.List[ClusterConfig] = {
-    List(ClusterConfig.defaultBuilder()
-      .setTypes(java.util.Collections.singleton(Type.ZK))
-      .setServerProperties(controlPlaneListenerProperties())
-      .build()).asJava
-  }
-}
-
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {

-  @ClusterTemplate("testApiVersionsRequestTemplate")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
     new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true")
@@ -93,7 +40,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse)
   }

-  @ClusterTemplate("testApiVersionsRequestIncludesUnreleasedApisTemplate")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"),
     new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true"),
@@ -104,13 +50,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true)
   }

-  @ClusterTemplate("zkApiVersionsRequest")
-  def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
-    val request = new ApiVersionsRequest.Builder().build()
-    val apiVersionsResponse = sendApiVersionsRequest(request, cluster.controlPlaneListenerName().get())
-    validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get(), true)
-  }
-
   @ClusterTest(types = Array(Type.KRAFT))
   def testApiVersionsRequestThroughControllerListener(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
@@ -118,7 +57,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), enableUnstableLastVersion = true)
   }

-  @ClusterTemplate("zkApiVersionsRequest")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
   def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build()
@@ -132,7 +70,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
   }

   // Use the latest production MV for this test
-  @ClusterTemplate("testApiVersionsRequestValidationV0Template")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
     new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "false"),
@@ -145,13 +82,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
     cluster.config().serverProperties().get("unstable.api.versions.enable")))
   }

-  @ClusterTemplate("zkApiVersionsRequest")
-  def testApiVersionsRequestValidationV0ThroughControlPlaneListener(): Unit = {
-    val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
-    val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controlPlaneListenerName().get())
-    validateApiVersionsResponse(apiVersionsResponse, cluster.controlPlaneListenerName().get(), true)
-  }
-
   @ClusterTest(types = Array(Type.KRAFT))
   def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
@@ -159,7 +89,6 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), apiVersion = 0, enableUnstableLastVersion = true)
   }

-  @ClusterTemplate("zkApiVersionsRequest")
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT))
   def testApiVersionsRequestValidationV3(): Unit = {
     // Invalid request because Name and Version are empty by default
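
The conversion pattern in this file is mechanical: each ZK-only @ClusterTemplate generator becomes inline server properties on the @ClusterTest annotation itself. A sketch of the resulting shape, in the Java flavor of the same annotations (class and method names illustrative):

    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterConfigProperty;
    import kafka.test.annotation.ClusterTest;
    import kafka.test.annotation.Type;
    import kafka.test.junit.ClusterTestExtensions;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ClusterTestExtensions.class)
    public class ExampleInlinePropertiesTest {

        // Server overrides ride along on the annotation, so the standalone
        // generator methods are no longer needed.
        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
            @ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
            @ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true")
        })
        public void testWithInlineProperties(ClusterInstance cluster) {
            // exercise the cluster here
        }
    }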
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index 3ac2d7c6814..618961828a8 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -21,7 +21,7 @@ import java.net.InetAddress
 import java.util
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import kafka.test.ClusterInstance
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.test.annotation.ClusterTest
 import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism, UserScramCredentialUpsertion}
@@ -31,6 +31,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
 import org.apache.kafka.server.config.{QuotaConfigs, ZooKeeperInternals}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.extension.ExtendWith

 import scala.jdk.CollectionConverters._
@@ -165,7 +166,8 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
     ))
   }

-  @ClusterTest(types = Array(Type.ZK)) // No SCRAM for Raft yet
+  @Disabled("TODO: KAFKA-17630 - Convert ClientQuotasRequestTest#testClientQuotasForScramUsers to kraft")
+  @ClusterTest
   def testClientQuotasForScramUsers(): Unit = {
     val userName = "user"
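
Tests whose behavior has no KRaft equivalent yet are not deleted but parked: @Disabled with a tracking JIRA, plus a plain @ClusterTest so they fan out over the default KRaft types once re-enabled. Sketched below (the test name is illustrative; the JIRA id is the one cited in this hunk):

    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterTest;
    import kafka.test.junit.ClusterTestExtensions;
    import org.junit.jupiter.api.Disabled;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ClusterTestExtensions.class)
    public class ExampleParkedConversionTest {

        // Skipped everywhere until the tracked JIRA converts the body to KRaft APIs.
        @Disabled("TODO: KAFKA-17630 - Convert this test to kraft")
        @ClusterTest
        public void testStillDependsOnZkOnlyBehavior(ClusterInstance cluster) {
        }
    }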
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 17a237aef21..69a3bb4d3f9 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -40,30 +40,6 @@ import scala.jdk.CollectionConverters._
 @ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
 class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {

-  @ClusterTest(types = Array(Type.ZK), serverProperties = Array(
-    new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-  ))
-  def testConsumerGroupDescribeWithZookeeperCluster(): Unit = {
-    val consumerGroupDescribeRequest = new ConsumerGroupDescribeRequest.Builder(
-      new ConsumerGroupDescribeRequestData().setGroupIds(List("grp-1", "grp-2").asJava)
-    ).build(ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
-
-    val consumerGroupDescribeResponse = connectAndReceive[ConsumerGroupDescribeResponse](consumerGroupDescribeRequest)
-    val expectedResponse = new ConsumerGroupDescribeResponseData()
-    expectedResponse.groups().add(
-      new ConsumerGroupDescribeResponseData.DescribedGroup()
-        .setGroupId("grp-1")
-        .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-    )
-    expectedResponse.groups.add(
-      new ConsumerGroupDescribeResponseData.DescribedGroup()
-        .setGroupId("grp-2")
-        .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-    )
-
-    assertEquals(expectedResponse, consumerGroupDescribeResponse.data)
-  }
-
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
diff --git a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
index 8a6a213a258..397c53f8cea 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala
@@ -50,7 +50,7 @@ class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinator
   }

   @ClusterTest(
-    types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT),
+    types = Array(Type.KRAFT, Type.CO_KRAFT),
     serverProperties = Array(
       new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
       new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
diff --git a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
index 248683ab0e2..3c68a2d299a 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala
@@ -39,7 +39,7 @@ class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinat
     testDescribeGroups()
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index c5e153348e2..137c01da09d 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -16,13 +16,12 @@
  */
 package kafka.server

-import java.io.IOException
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, DescribeQuorumRequest, DescribeQuorumResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, DescribeQuorumRequest, DescribeQuorumResponse}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith

@@ -33,21 +32,6 @@ import scala.reflect.ClassTag
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {

-  @ClusterTest(types = Array(Type.ZK))
-  def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {
-    val apiRequest = new ApiVersionsRequest.Builder().build()
-    val apiResponse = connectAndReceive[ApiVersionsResponse](apiRequest)
-    assertNull(apiResponse.apiVersion(ApiKeys.DESCRIBE_QUORUM.id))
-
-    val describeQuorumRequest = new DescribeQuorumRequest.Builder(
-      singletonRequest(KafkaRaftServer.MetadataPartition)
-    ).build()
-
-    assertThrows(classOf[IOException], () => {
-      connectAndReceive[DescribeQuorumResponse](describeQuorumRequest)
-    })
-  }
-
   @ClusterTest
   def testDescribeQuorum(): Unit = {
     for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {
diff --git a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
index 5a3aef31780..d4a38fc962a 100644
--- a/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HeartbeatRequestTest.scala
@@ -43,7 +43,7 @@ class HeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
     testHeartbeat()
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index cd1bd9b7105..9517619ac44 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -37,7 +37,7 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
     testLeaveGroup()
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index e2af044a002..72c834a747c 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -50,7 +50,7 @@ class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
     testListGroups(false)
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
index 5232480172e..ca8e177b18b 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitRequestTest.scala
@@ -48,7 +48,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
     testOffsetCommit(false)
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
index 46ff47dcc30..b0c2ef8c998 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala
@@ -45,7 +45,7 @@ class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinator
     testOffsetDelete(false)
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 917fcb1460e..1c9b02cf39b 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -53,7 +53,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
     testSingleGroupOffsetFetch(useNewProtocol = false, requireStable = false)
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@@ -81,7 +81,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
     testSingleGroupAllOffsetFetch(useNewProtocol = false, requireStable = false)
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@@ -109,7 +109,7 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB
     testMultipleGroupsOffsetFetch(useNewProtocol = false, requireStable = false)
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 5f1e1465059..e36dc6b9d0a 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.network.SocketServerConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}

 import java.net.Socket
 import java.util.Collections
@@ -60,7 +60,7 @@ object SaslApiVersionsRequestTest {
     List(ClusterConfig.defaultBuilder
       .setSecurityProtocol(securityProtocol)
-      .setTypes(Set(Type.ZK).asJava)
+      .setTypes(Set(Type.KRAFT).asJava)
       .setSaslServerProperties(saslServerProperties)
       .setSaslClientProperties(saslClientProperties)
       .setServerProperties(serverProperties)
@@ -68,6 +68,7 @@ object SaslApiVersionsRequestTest {
   }
 }

+@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
   private var sasl: SaslSetup = _
diff --git a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
index 6f50ad612bc..878bfc68ed3 100644
--- a/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SyncGroupRequestTest.scala
@@ -45,7 +45,7 @@ class SyncGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBas
     testSyncGroup()
   }

-  @ClusterTest(types = Array(Type.ZK, Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
+  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = Array(
     new ClusterConfigProperty(key = GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic"),
     new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index bdff4de775e..8b66cf5e596 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -49,13 +49,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @ExtendWith(value = ClusterTestExtensions.class)
 public class FeatureCommandTest {

-    @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
-    public void testDescribeWithZK(ClusterInstance cluster) {
-        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe"))
-        );
-        assertEquals("", commandOutput);
-    }

     @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testDescribeWithKRaft(ClusterInstance cluster) {
@@ -96,16 +89,6 @@ public class FeatureCommandTest {
                 "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(3)));
     }

-    @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
-    public void testUpgradeMetadataVersionWithZk(ClusterInstance cluster) {
-        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
-                        "upgrade", "--metadata", "3.3-IV2"))
-        );
-        assertEquals("Could not upgrade metadata.version to 6. Could not apply finalized feature " +
-                "update because the provided feature is not supported.", commandOutput);
-    }
-
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testUpgradeMetadataVersionWithKraft(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
@@ -121,29 +104,6 @@ public class FeatureCommandTest {
         assertEquals("metadata.version was upgraded to 6.", commandOutput);
     }

-    @ClusterTest(types = {Type.ZK}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
-    public void testDowngradeMetadataVersionWithZk(ClusterInstance cluster) {
-        String commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
-                        "disable", "--feature", "metadata.version"))
-        );
-        assertEquals("Could not disable metadata.version. Can not delete non-existing finalized feature.", commandOutput);
-
-        commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
-                        "downgrade", "--metadata", "3.3-IV0"))
-        );
-        assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
-                "update because the provided feature is not supported.", commandOutput);
-
-        commandOutput = ToolsTestUtils.captureStandardOut(() ->
-                assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
-                        "downgrade", "--unsafe", "--metadata", "3.3-IV0"))
-        );
-        assertEquals("Could not downgrade metadata.version to 4. Could not apply finalized feature " +
-                "update because the provided feature is not supported.", commandOutput);
-    }
-
     @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV1)
     public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) {
         String commandOutput = ToolsTestUtils.captureStandardOut(() ->
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index 95007d7bf85..05e4b8db5ce 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -62,7 +62,6 @@ import java.util.stream.Stream;

 import static kafka.test.annotation.Type.CO_KRAFT;
 import static kafka.test.annotation.Type.KRAFT;
-import static kafka.test.annotation.Type.ZK;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -149,7 +148,7 @@ public class GetOffsetShellTest {
                 // align listener name here as KafkaClusterTestKit (KRAFT/CO_KRAFT) the default
                 // broker listener name is EXTERNAL while in ZK it is PLAINTEXT
                 ClusterConfig.defaultBuilder()
-                        .setTypes(Stream.of(ZK, KRAFT, CO_KRAFT).collect(Collectors.toSet()))
+                        .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet()))
                        .setServerProperties(serverProperties)
                        .setListenerName("EXTERNAL")
                        .build());
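
As the comment in this hunk notes, KafkaClusterTestKit clusters default to an EXTERNAL broker listener where the ZK harness used PLAINTEXT, so generators that mix cluster types pin the listener name explicitly. A sketch of a KRaft-only generator doing the same (class, method, and property values are illustrative):

    import kafka.test.ClusterConfig;
    import kafka.test.annotation.Type;

    import java.util.Collections;
    import java.util.List;

    public class ExampleListenerGenerator {
        // Referenced from a test via @ClusterTemplate("externalListener")
        static List<ClusterConfig> externalListener() {
            return Collections.singletonList(ClusterConfig.defaultBuilder()
                .setTypes(Collections.singleton(Type.KRAFT))
                .setListenerName("EXTERNAL")  // make the client listener name explicit
                .build());
        }
    }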
diff --git a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
index 37c7650766c..af472c56644 100644
--- a/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java
@@ -23,7 +23,6 @@ import kafka.test.annotation.Type;
 import kafka.test.junit.ClusterTestExtensions;

 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.test.TestUtils;

 import org.junit.jupiter.api.Test;
@@ -33,14 +32,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;

 @ExtendWith(value = ClusterTestExtensions.class)
@@ -52,9 +48,9 @@ class MetadataQuorumCommandTest {
      * 3. Fewer brokers than controllers
      */
     @ClusterTests({
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 2),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 1),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 2),
+        @ClusterTest(brokers = 2, controllers = 2),
+        @ClusterTest(brokers = 2, controllers = 1),
+        @ClusterTest(brokers = 1, controllers = 2),
     })
     public void testDescribeQuorumReplicationSuccessful(ClusterInstance cluster) throws InterruptedException {
         cluster.waitForReadyBrokers();
@@ -94,9 +90,9 @@ class MetadataQuorumCommandTest {
      * 3. Fewer brokers than controllers
      */
     @ClusterTests({
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 2),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 2, controllers = 1),
-        @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, brokers = 1, controllers = 2),
+        @ClusterTest(brokers = 2, controllers = 2),
+        @ClusterTest(brokers = 2, controllers = 1),
+        @ClusterTest(brokers = 1, controllers = 2),
     })
     public void testDescribeQuorumStatusSuccessful(ClusterInstance cluster) throws InterruptedException {
         cluster.waitForReadyBrokers();
@@ -135,7 +131,7 @@ class MetadataQuorumCommandTest {
         }
     }

-    @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
+    @ClusterTest
     public void testOnlyOneBrokerAndOneController(ClusterInstance cluster) {
         String statusOutput = ToolsTestUtils.captureStandardOut(() ->
             MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
@@ -157,19 +153,6 @@ class MetadataQuorumCommandTest {
                 "--command-config", tmpfile.getAbsolutePath(), "describe", "--status"));
     }

-    @ClusterTest(types = {Type.ZK})
-    public void testDescribeQuorumInZkMode(ClusterInstance cluster) {
-        assertInstanceOf(UnsupportedVersionException.class, assertThrows(
-            ExecutionException.class,
-            () -> MetadataQuorumCommand.execute("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")
-        ).getCause());
-
-        assertInstanceOf(UnsupportedVersionException.class, assertThrows(
-            ExecutionException.class,
-            () -> MetadataQuorumCommand.execute("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")
-        ).getCause());
-    }
-
     @ClusterTest(types = {Type.CO_KRAFT})
     public void testHumanReadableOutput(ClusterInstance cluster) {
         assertEquals(1, MetadataQuorumCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), "describe", "--human-readable"));
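
The bare @ClusterTest form used above relies on the framework defaults, which this commit's replacements suggest now cover exactly the two remaining types. Spelling that out (a sketch; the class and method names are illustrative):

    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterTest;
    import kafka.test.junit.ClusterTestExtensions;
    import org.junit.jupiter.api.extension.ExtendWith;

    @ExtendWith(ClusterTestExtensions.class)
    public class ExampleDefaultTypesTest {

        // Equivalent in coverage to types = {Type.KRAFT, Type.CO_KRAFT} after this commit.
        @ClusterTest
        public void testWithDefaultTypes(ClusterInstance cluster) throws InterruptedException {
            cluster.waitForReadyBrokers();
        }
    }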
diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
index 4a2b93d2dbc..e105926ccbb 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -367,7 +367,7 @@ public class TopicCommandTest {
                 .setBrokers(6)
                 .setServerProperties(serverProp)
                 .setPerServerProperties(rackInfo)
-                .setTypes(Stream.of(Type.ZK, Type.KRAFT).collect(Collectors.toSet()))
+                .setTypes(Stream.of(Type.KRAFT).collect(Collectors.toSet()))
                 .build()
         );
     }
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index e45d50b6823..e6f8f21c35d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -37,11 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;

 import static kafka.test.annotation.Type.CO_KRAFT;
-import static kafka.test.annotation.Type.ZK;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
@@ -51,26 +48,24 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_
 /**
  * The old test framework {@link kafka.api.BaseConsumerTest#getTestQuorumAndGroupProtocolParametersAll} test for the following cases:
  * <ul>
- *   <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
+ *   <li>(KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 1 case</li>
 *   <li>(KRAFT server) with (group.coordinator.new.enable=true) with (classic group protocol) = 1 case</li>
 *   <li>(KRAFT server) with (group.coordinator.new.enable=true) with (consumer group protocol) = 1 case</li>
 * </ul>
 * <p>
 * The new test framework run seven cases for the following cases:
 * <ul>
- *   <li>(ZK / KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 3 cases</li>
+ *   <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
 *   <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic group protocol) = 2 cases</li>
 *   <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true) with (consumer group protocol) = 2 cases</li>
 * </ul>
 * <p>
 * We can reduce the number of cases as same as the old test framework by using the following methods:
 * <ul>
- *   <li>{@link #forKRaftGroupCoordinator} for the case of (consumer group protocol)</li>
 *   <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with (classic / consumer group protocols) = 2 cases</li>
 * </ul>
 * <ul>
- *   <li>{@link #forZkGroupCoordinator} for the case of (classic group protocol)</li>
- *   <li>(ZK / KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 2 cases</li>
+ *   <li>(KRAFT servers) with (group.coordinator.new.enable=false) with (classic group protocol) = 1 case</li>
 * </ul>
 */
class ConsumerGroupCommandTestUtils {
@@ -79,12 +74,6 @@ class ConsumerGroupCommandTestUtils {
     }

     static List<ClusterConfig> generator() {
-        return Stream
-            .concat(forKRaftGroupCoordinator().stream(), forZkGroupCoordinator().stream())
-            .collect(Collectors.toList());
-    }
-
-    static List<ClusterConfig> forKRaftGroupCoordinator() {
         Map<String, String> serverProperties = new HashMap<>();
         serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
         serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
@@ -99,21 +88,6 @@ class ConsumerGroupCommandTestUtils {
             .build());
     }

-    static List<ClusterConfig> forZkGroupCoordinator() {
-        Map<String, String> serverProperties = new HashMap<>();
-        serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
-        serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
-        serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000");
-        serverProperties.put(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "500");
-        serverProperties.put(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "500");
-
-        return Collections.singletonList(ClusterConfig.defaultBuilder()
-            .setTypes(Collections.singleton(ZK))
-            .setServerProperties(serverProperties)
-            .setTags(Collections.singletonList("zkGroupCoordinator"))
-            .build());
-    }
-
     static <T> AutoCloseable buildConsumers(int numberOfConsumers,
                                             Set<TopicPartition> partitions,
                                             Supplier<KafkaConsumer<T, T>> consumerSupplier) {
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
index 0ff622bf143..eb71efd900b 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@@ -82,10 +82,6 @@ public class ListConsumerGroupTest {
         return ConsumerGroupCommandTestUtils.generator();
     }

-    private static List<ClusterConfig> consumerProtocolOnlyGenerator() {
-        return ConsumerGroupCommandTestUtils.forKRaftGroupCoordinator();
-    }
-
     private List<GroupProtocol> supportedGroupProtocols() {
         return new ArrayList<>(clusterInstance.supportedGroupProtocols());
     }
@@ -243,7 +239,7 @@ public class ListConsumerGroupTest {
         }
     }

-    @ClusterTemplate("consumerProtocolOnlyGenerator")
+    @ClusterTemplate("defaultGenerator")
     public void testListConsumerGroupsWithTypesConsumerProtocol() throws Exception {
         GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
         String topic = TOPIC_PREFIX + groupProtocol.name;
@@ -415,7 +411,7 @@ public class ListConsumerGroupTest {
         }
     }

-    @ClusterTemplate("consumerProtocolOnlyGenerator")
+    @ClusterTemplate("defaultGenerator")
     public void testListGroupCommandConsumerProtocol() throws Exception {
         GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
         String topic = TOPIC_PREFIX + groupProtocol.name;
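
For reference, the wiring between @ClusterTemplate and its generator is by name: the string resolves to a same-named static method returning the list of cluster configurations to run against. A sketch mirroring the defaultGenerator usage above (assumes the same package as ConsumerGroupCommandTestUtils; class and test names are illustrative):

    import kafka.test.ClusterConfig;
    import kafka.test.ClusterInstance;
    import kafka.test.annotation.ClusterTemplate;
    import kafka.test.junit.ClusterTestExtensions;
    import org.junit.jupiter.api.extension.ExtendWith;

    import java.util.List;

    @ExtendWith(ClusterTestExtensions.class)
    public class ExampleTemplateTest {

        // The template name below resolves to this method.
        private static List<ClusterConfig> defaultGenerator() {
            return ConsumerGroupCommandTestUtils.generator();
        }

        @ClusterTemplate("defaultGenerator")
        public void testAgainstEveryGeneratedConfig(ClusterInstance cluster) {
            // runs once per ClusterConfig returned by defaultGenerator()
        }
    }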
diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index 15658b8fb37..c5e303c697f 100644
--- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -81,7 +81,6 @@ import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
 import static org.apache.kafka.server.config.QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
 import static org.apache.kafka.server.config.QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
@@ -133,7 +132,6 @@ public class ReassignPartitionsCommandTest {
     }

     @ClusterTests({
-            @ClusterTest(types = {Type.ZK}, metadataVersion = IBP_2_7_IV1),
             @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0)
     })
     public void testReassignmentWithAlterPartitionDisabled() throws Exception {
@@ -146,11 +144,6 @@ public class ReassignPartitionsCommandTest {
     }

     @ClusterTests({
-            @ClusterTest(types = {Type.ZK}, serverProperties = {
-                    @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
-                    @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
-                    @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
-            }),
             @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
                     @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
                    @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
@@ -416,7 +409,7 @@ public class ReassignPartitionsCommandTest {
     /**
      * Test moving partitions between directories.
      */
-    @ClusterTest(types = {Type.ZK, Type.KRAFT})
+    @ClusterTest(types = {Type.KRAFT})
     public void testLogDirReassignment() throws Exception {
         createTopics();
         TopicPartition topicPartition = new TopicPartition("foo", 0);
@@ -464,7 +457,7 @@ public class ReassignPartitionsCommandTest {
         }
     }

-    @ClusterTest(types = {Type.ZK, Type.KRAFT})
+    @ClusterTest(types = {Type.KRAFT})
     public void testAlterLogDirReassignmentThrottle() throws Exception {
         createTopics();
         TopicPartition topicPartition = new TopicPartition("foo", 0);