This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e4b8644400e KAFKA-17992 Remove getUnderlying and isKRaftTest from ClusterInstance (#17802)
e4b8644400e is described below
commit e4b8644400e4442303947aecb34f4f8d416bd766
Author: Yung <[email protected]>
AuthorDate: Thu Nov 14 17:11:19 2024 +0800
KAFKA-17992 Remove getUnderlying and isKRaftTest from ClusterInstance (#17802)
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/admin/ConfigCommandIntegrationTest.java | 2 +-
.../kafka/server/LogManagerIntegrationTest.java | 6 +--
.../server/AbstractApiVersionsRequestTest.scala | 13 ++----
.../server/BrokerRegistrationRequestTest.scala | 2 +-
.../unit/kafka/server/ReplicationQuotasTest.scala | 35 +++++----------
.../kafka/common/test/api/ClusterInstance.java | 15 +------
.../test/api/RaftClusterInvocationContext.java | 1 -
.../kafka/tools/BrokerApiVersionsCommandTest.java | 3 +-
.../org/apache/kafka/tools/ClusterToolTest.java | 16 ++-----
.../org/apache/kafka/tools/GetOffsetShellTest.java | 12 +----
.../org/apache/kafka/tools/TopicCommandTest.java | 7 +--
.../kafka/tools/consumer/ConsoleConsumerTest.java | 52 +++++++---------------
12 files changed, 45 insertions(+), 119 deletions(-)
diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index 6b285c448c2..5ecd0337a96 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -87,7 +87,7 @@ public class ConfigCommandIntegrationTest {
@ClusterTest
public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
- "--entity-name", cluster.isKRaftTest() ? "0" : "1",
+ "--entity-name", "0",
"--entity-type", "brokers",
"--alter",
"--add-config", "security.inter.broker.protocol=PLAINTEXT")),
diff --git a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
index 5a08dada8d2..390794b8f25 100644
--- a/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
+++ b/core/src/test/java/kafka/server/LogManagerIntegrationTest.java
@@ -76,12 +76,12 @@ public class LogManagerIntegrationTest {
cluster.waitForTopic("foo", 1);
Optional<PartitionMetadataFile> partitionMetadataFile =
Optional.ofNullable(
- raftInstance.getUnderlying().brokers().get(0).logManager()
+ raftInstance.brokers().get(0).logManager()
.getLog(new TopicPartition("foo", 0), false).get()
.partitionMetadataFile().getOrElse(null));
assertTrue(partitionMetadataFile.isPresent());
- raftInstance.getUnderlying().brokers().get(0).shutdown();
+ raftInstance.brokers().get(0).shutdown();
try (Admin admin = cluster.admin()) {
TestUtils.waitForCondition(() -> {
List<TopicPartitionInfo> partitionInfos =
admin.describeTopics(Collections.singletonList("foo"))
@@ -93,7 +93,7 @@ public class LogManagerIntegrationTest {
// delete partition.metadata file here to simulate the scenario that
partition.metadata not flush to disk yet
partitionMetadataFile.get().delete();
assertFalse(partitionMetadataFile.get().exists());
- raftInstance.getUnderlying().brokers().get(0).startup();
+ raftInstance.brokers().get(0).startup();
// make sure there is no error during load logs
assertDoesNotThrow(() ->
raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException());
try (Admin admin = cluster.admin()) {
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 05c3bc2eade..4f442d3194e 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -64,7 +64,7 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
clientTelemetryEnabled: Boolean = false,
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
- if (cluster.isKRaftTest && apiVersion >= 3) {
+ if (apiVersion >= 3) {
assertEquals(3, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(MetadataVersion.latestTesting().featureLevel(),
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(),
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
@@ -84,12 +84,7 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion())
assertEquals(GroupVersion.GV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion())
}
- val expectedApis = if (!cluster.isKRaftTest) {
- ApiVersionsResponse.collectApis(
- ApiKeys.apisForListener(ApiMessageType.ListenerType.ZK_BROKER),
- enableUnstableLastVersion
- )
- } else if
(cluster.controllerListenerName().toScala.contains(listenerName)) {
+ val expectedApis = if
(cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(
ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
enableUnstableLastVersion
@@ -107,9 +102,7 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
assertEquals(expectedApis.size, apiVersionsResponse.data.apiKeys.size,
"API keys in ApiVersionsResponse must match API keys supported by
broker.")
- val defaultApiVersionsResponse = if (!cluster.isKRaftTest) {
- TestUtils.defaultApiVersionsResponse(0, ListenerType.ZK_BROKER,
enableUnstableLastVersion)
- } else if
(cluster.controllerListenerName().toScala.contains(listenerName)) {
+ val defaultApiVersionsResponse = if
(cluster.controllerListenerName().toScala.contains(listenerName)) {
TestUtils.defaultApiVersionsResponse(0, ListenerType.CONTROLLER,
enableUnstableLastVersion)
} else {
TestUtils.createApiVersionsResponse(0, expectedApis)
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index e697eede581..3fec68da273 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -57,7 +57,7 @@ class BrokerRegistrationRequestTest {
val saslMechanism: String = ""
- def isZkController: Boolean = !clusterInstance.isKRaftTest
+ def isZkController: Boolean = false
override def getControllerInfo(): ControllerInformation =
ControllerInformation(node, listenerName, securityProtocol,
saslMechanism, isZkController)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index b6122575703..64ef5641f23 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -22,7 +22,6 @@ import java.util.{Collections, Properties}
import java.util.Map.Entry
import kafka.server.KafkaConfig.fromProps
import kafka.utils.TestUtils._
-import kafka.utils.CoreUtils._
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, NewTopic}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
@@ -113,30 +112,20 @@ class ReplicationQuotasTest extends QuorumTestHarness {
if (!leaderThrottle) throttle = throttle * 3
Using(createAdminClient(brokers, listenerName)) { admin =>
- if (isKRaftTest()) {
- (106 to 107).foreach(registerBroker)
- }
+ (106 to 107).foreach(registerBroker)
admin.createTopics(List(new NewTopic(topic, assignment.map(a =>
a._1.asInstanceOf[Integer] ->
a._2.map(_.asInstanceOf[Integer]).toList.asJava).asJava)).asJava).all().get()
//Set the throttle limit on all 8 brokers, but only assign throttled
replicas to the six leaders, or two followers
(100 to 107).foreach { brokerId =>
- if (isKRaftTest()) {
- val entry = new SimpleImmutableEntry[AlterConfigOp.OpType,
String](SET, throttle.toString)
- .asInstanceOf[Entry[AlterConfigOp.OpType, String]]
- controllerServer.controller.incrementalAlterConfigs(
- ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
- Map(new ConfigResource(BROKER, String.valueOf(brokerId)) -> Map(
- QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG -> entry,
- QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG ->
entry).asJava).asJava,
- false
- ).get()
- } else {
- adminZkClient.changeBrokerConfig(Seq(brokerId),
- propsWith(
- (QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG,
throttle.toString),
- (QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG,
throttle.toString)
- ))
- }
+ val entry = new SimpleImmutableEntry[AlterConfigOp.OpType,
String](SET, throttle.toString)
+ .asInstanceOf[Entry[AlterConfigOp.OpType, String]]
+ controllerServer.controller.incrementalAlterConfigs(
+ ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
+ Map(new ConfigResource(BROKER, String.valueOf(brokerId)) -> Map(
+ QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG -> entry,
+ QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG ->
entry).asJava).asJava,
+ false
+ ).get()
}
//Either throttle the six leaders or the two followers
val configEntry = if (leaderThrottle)
@@ -224,9 +213,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
val throttle: Long = msg.length * msgCount / expectedDuration
Using(createAdminClient(brokers, listenerName)) { admin =>
- if (isKRaftTest()) {
- registerBroker(101)
- }
+ registerBroker(101)
admin.createTopics(
List(new NewTopic(topic, Collections.singletonMap(0, List(100,
101).map(_.asInstanceOf[Integer]).asJava))).asJava
).all().get()
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index cd0f91b18d0..150e3c5e651 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -70,10 +70,6 @@ public interface ClusterInstance {
Type type();
- default boolean isKRaftTest() {
- return type() == Type.KRAFT || type() == Type.CO_KRAFT;
- }
-
Map<Integer, KafkaBroker> brokers();
default Map<Integer, KafkaBroker> aliveBrokers() {
@@ -158,15 +154,6 @@ public interface ClusterInstance {
String clusterId();
- /**
- * The underlying object which is responsible for setting up and tearing
down the cluster.
- */
- Object getUnderlying();
-
- default <T> T getUnderlying(Class<T> asClass) {
- return asClass.cast(getUnderlying());
- }
-
//---------------------------[producer/consumer/admin]---------------------------//
default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
@@ -216,7 +203,7 @@ public interface ClusterInstance {
}
default Set<GroupProtocol> supportedGroupProtocols() {
- if (isKRaftTest() && brokers().values().stream().allMatch(b ->
b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
+ if (brokers().values().stream().allMatch(b ->
b.dataPlaneRequestProcessor().isConsumerGroupProtocolEnabled())) {
return Set.of(CLASSIC, CONSUMER);
} else {
return Collections.singleton(CLASSIC);
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
index 927e8bad9fa..19c1f1b9fa3 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
@@ -165,7 +165,6 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
return controllers().keySet();
}
- @Override
public KafkaClusterTestKit getUnderlying() {
return clusterTestKit;
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
index 0c2c7d0881d..f83024837ba 100644
---
a/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/BrokerApiVersionsCommandTest.java
@@ -56,8 +56,7 @@ public class BrokerApiVersionsCommandTest {
assertTrue(lineIter.hasNext());
assertEquals(clusterInstance.bootstrapServers() + " (id: 0 rack: null)
-> (", lineIter.next());
- ApiMessageType.ListenerType listenerType =
clusterInstance.isKRaftTest() ?
- ApiMessageType.ListenerType.BROKER :
ApiMessageType.ListenerType.ZK_BROKER;
+ ApiMessageType.ListenerType listenerType =
ApiMessageType.ListenerType.BROKER;
NodeApiVersions nodeApiVersions = new NodeApiVersions(
ApiVersionsResponse.collectApis(ApiKeys.clientApis(), true),
diff --git a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
index ea9ad446795..d27839269f4 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
@@ -51,22 +51,14 @@ public class ClusterToolTest {
@ClusterTest(brokers = 3)
public void testUnregister(ClusterInstance clusterInstance) {
int brokerId;
- if (!clusterInstance.isKRaftTest()) {
- brokerId = assertDoesNotThrow(() ->
clusterInstance.brokerIds().stream().findFirst().get());
- } else {
- Set<Integer> brokerIds = clusterInstance.brokerIds();
- brokerIds.removeAll(clusterInstance.controllerIds());
- brokerId = assertDoesNotThrow(() ->
brokerIds.stream().findFirst().get());
- }
+ Set<Integer> brokerIds = clusterInstance.brokerIds();
+ brokerIds.removeAll(clusterInstance.controllerIds());
+ brokerId = assertDoesNotThrow(() ->
brokerIds.stream().findFirst().get());
clusterInstance.shutdownBroker(brokerId);
String output = ToolsTestUtils.captureStandardOut(() ->
assertDoesNotThrow(() -> ClusterTool.execute("unregister",
"--bootstrap-server", clusterInstance.bootstrapServers(), "--id",
String.valueOf(brokerId))));
- if (clusterInstance.isKRaftTest()) {
- assertTrue(output.contains("Broker " + brokerId + " is no longer
registered."));
- } else {
- assertTrue(output.contains("The target cluster does not support
the broker unregistration API."));
- }
+ assertTrue(output.contains("Broker " + brokerId + " is no longer
registered."));
}
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index dcac2f11e50..a641758a827 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -205,11 +205,7 @@ public class GetOffsetShellTest {
setUp();
List<Row> output = executeAndParse();
- if (!cluster.isKRaftTest()) {
- assertEquals(expectedOffsetsWithInternal(), output);
- } else {
- assertEquals(expectedTestTopicOffsets(), output);
- }
+ assertEquals(expectedTestTopicOffsets(), output);
}
@ClusterTest
@@ -247,11 +243,7 @@ public class GetOffsetShellTest {
setUp();
List<Row> offsets = executeAndParse("--partitions", "0,1");
- if (!cluster.isKRaftTest()) {
- assertEquals(expectedOffsetsWithInternal().stream().filter(r ->
r.partition <= 1).collect(Collectors.toList()), offsets);
- } else {
- assertEquals(expectedTestTopicOffsets().stream().filter(r ->
r.partition <= 1).collect(Collectors.toList()), offsets);
- }
+ assertEquals(expectedTestTopicOffsets().stream().filter(r ->
r.partition <= 1).collect(Collectors.toList()), offsets);
}
@ClusterTest
diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
index 0675dadbeb5..aec130ad915 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -908,11 +908,8 @@ public class TopicCommandTest {
TopicCommand.TopicService topicService = new
TopicCommand.TopicService(adminClient)) {
// create the offset topic
- // In ZK mode, Topic.GROUP_METADATA_TOPIC_NAME exist when cluster
is created.
- if (clusterInstance.isKRaftTest()) {
- adminClient.createTopics(Collections.singletonList(new
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions,
defaultReplicationFactor)));
- clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME,
defaultNumPartitions);
- }
+ adminClient.createTopics(Collections.singletonList(new
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions,
defaultReplicationFactor)));
+ clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME,
defaultNumPartitions);
// Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is
allowed by default.
// This is a difference between the new and the old command as the
old one didn't allow internal topic deletion.
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
index 3a679abc121..03fffa2f236 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerTest.java
@@ -396,42 +396,22 @@ public class ConsoleConsumerTest {
JsonNode jsonNode =
objectMapper.reader().readTree(out.toByteArray());
- // The new group coordinator writes an empty group metadata
record when the group is created for
- // the first time whereas the old group coordinator only
writes a group metadata record when
- // the first rebalance completes.
- if (cluster.isKRaftTest()) {
- JsonNode keyNode = jsonNode.get("key");
- GroupMetadataKey groupMetadataKey =
-
GroupMetadataKeyJsonConverter.read(keyNode.get("data"),
GroupMetadataKey.HIGHEST_SUPPORTED_VERSION);
- assertNotNull(groupMetadataKey);
- assertEquals(groupId, groupMetadataKey.group());
-
- JsonNode valueNode = jsonNode.get("value");
- GroupMetadataValue groupMetadataValue =
-
GroupMetadataValueJsonConverter.read(valueNode.get("data"),
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION);
- assertNotNull(groupMetadataValue);
- assertEquals("", groupMetadataValue.protocolType());
- assertEquals(0, groupMetadataValue.generation());
- assertNull(groupMetadataValue.protocol());
- assertNull(groupMetadataValue.leader());
- assertEquals(0, groupMetadataValue.members().size());
- } else {
- JsonNode keyNode = jsonNode.get("key");
- GroupMetadataKey groupMetadataKey =
-
GroupMetadataKeyJsonConverter.read(keyNode.get("data"),
GroupMetadataKey.HIGHEST_SUPPORTED_VERSION);
- assertNotNull(groupMetadataKey);
- assertEquals(groupId, groupMetadataKey.group());
-
- JsonNode valueNode = jsonNode.get("value");
- GroupMetadataValue groupMetadataValue =
-
GroupMetadataValueJsonConverter.read(valueNode.get("data"),
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION);
- assertNotNull(groupMetadataValue);
- assertEquals("consumer",
groupMetadataValue.protocolType());
- assertEquals(1, groupMetadataValue.generation());
- assertEquals("range", groupMetadataValue.protocol());
- assertNotNull(groupMetadataValue.leader());
- assertEquals(1, groupMetadataValue.members().size());
- }
+ // The group coordinator writes an empty group metadata record
when the group is created for the first time
+ JsonNode keyNode = jsonNode.get("key");
+ GroupMetadataKey groupMetadataKey =
+ GroupMetadataKeyJsonConverter.read(keyNode.get("data"),
GroupMetadataKey.HIGHEST_SUPPORTED_VERSION);
+ assertNotNull(groupMetadataKey);
+ assertEquals(groupId, groupMetadataKey.group());
+
+ JsonNode valueNode = jsonNode.get("value");
+ GroupMetadataValue groupMetadataValue =
+
GroupMetadataValueJsonConverter.read(valueNode.get("data"),
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION);
+ assertNotNull(groupMetadataValue);
+ assertEquals("", groupMetadataValue.protocolType());
+ assertEquals(0, groupMetadataValue.generation());
+ assertNull(groupMetadataValue.protocol());
+ assertNull(groupMetadataValue.leader());
+ assertEquals(0, groupMetadataValue.members().size());
} finally {
consumerWrapper.cleanup();
}