This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new c01723340ff MINOR: Migrate EligibleLeaderReplicasIntegrationTest to use new test infra (#20199)
c01723340ff is described below

commit c01723340ffe14f61379980495c08f7f3f7dec25
Author: Chang-Chi Hsu <jim0987795...@gmail.com>
AuthorDate: Sat Aug 23 19:35:20 2025 +0200

    MINOR: Migrate EligibleLeaderReplicasIntegrationTest to use new test infra (#20199)

    **Changes**: Use ClusterTest to rewrite EligibleLeaderReplicasIntegrationTest.

    **Validation**: Run the test 50 times locally with consistent success.

    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../EligibleLeaderReplicasIntegrationTest.java | 458 ---------------------
 .../EligibleLeaderReplicasIntegrationTest.java | 384 +++++++++++++++++
 2 files changed, 384 insertions(+), 458 deletions(-)

diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
deleted file mode 100644
index 28c12cf6bce..00000000000
--- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package kafka.server.integration; -import kafka.integration.KafkaServerTestHarness; -import kafka.server.KafkaBroker; -import kafka.server.KafkaConfig; -import kafka.utils.Logging; -import kafka.utils.TestUtils; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.AlterConfigOp; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.admin.FeatureUpdate; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.admin.UpdateFeaturesOptions; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartitionInfo; -import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; -import org.apache.kafka.server.common.MetadataVersion; -import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; - -import java.io.File; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.function.BiFunction; -import java.util.stream.Collectors; - -import scala.collection.JavaConverters; -import scala.collection.Seq; -import scala.collection.mutable.HashMap; - -import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging { - private String bootstrapServer; - private String testTopicName; - private Admin adminClient; - - @Override - public MetadataVersion metadataVersion() { - return MetadataVersion.IBP_4_0_IV1; - } - - @Override - public Seq<KafkaConfig> generateConfigs() { - List<Properties> brokerConfigs = new ArrayList<>(); - brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs( - 5, // The tests require 4 brokers to host the partition. However, we need the 5th broker to handle the admin client requests. 
- true, - true, - scala.Option.<SecurityProtocol>empty(), - scala.Option.<File>empty(), - scala.Option.<Properties>empty(), - true, - false, - false, - false, - new HashMap<>(), - 1, - false, - 1, - (short) 4, - 0, - false - ))); - List<KafkaConfig> configs = new ArrayList<>(); - for (Properties props : brokerConfigs) { - configs.add(KafkaConfig.fromProps(props)); - } - return JavaConverters.asScalaBuffer(configs).toSeq(); - } - - @BeforeEach - @Override - public void setUp(TestInfo info) { - super.setUp(info); - // create adminClient - Properties props = new Properties(); - bootstrapServer = bootstrapServers(listenerName()); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - adminClient = Admin.create(props); - adminClient.updateFeatures( - Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, - new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), - new UpdateFeaturesOptions() - ); - testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test"); - } - - @AfterEach - public void close() throws Exception { - if (adminClient != null) adminClient.close(); - } - - @Test - public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { - adminClient.createTopics( - List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Collection<AlterConfigOp> ops = new ArrayList<>(); - ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); - Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(configResource, ops); - // alter configs on target cluster - adminClient.incrementalAlterConfigs(configOps).all().get(); - Producer producer = null; - Consumer consumer = null; - try { - TopicDescription testTopicDescription = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName); - TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); - List<Node> initialReplicas = topicPartitionInfo.replicas(); - assertEquals(4, topicPartitionInfo.isr().size()); - assertEquals(0, topicPartitionInfo.elr().size()); - assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - - Properties producerProps = new Properties(); - producerProps.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProps.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - // Use Ack=1 for the producer. 
- producerProps.put(ProducerConfig.ACKS_CONFIG, "1"); - producer = new KafkaProducer(producerProps); - - Properties consumerProps = new Properties(); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); - consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10"); - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProps.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(Set.of(testTopicName)); - - producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get(); - waitUntilOneMessageIsConsumed(consumer); - - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 2 && elrSize == 1; - }); - - // Now the partition is under min ISR. HWM should not advance. - producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); - Thread.sleep(100); - assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count()); - - // Restore the min ISR and the previous log should be visible. - startBroker(initialReplicas.get(1).id()); - startBroker(initialReplicas.get(0).id()); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 4 && elrSize == 0; - }); - - waitUntilOneMessageIsConsumed(consumer); - } finally { - restartDeadBrokers(false); - if (consumer != null) consumer.close(); - if (producer != null) producer.close(); - } - } - - void waitUntilOneMessageIsConsumed(Consumer consumer) { - TestUtils.waitUntilTrue( - () -> { - try { - ConsumerRecords record = consumer.poll(Duration.ofMillis(100L)); - return record.count() >= 1; - } catch (Exception e) { - return false; - } - }, - () -> "fail to consume messages", - DEFAULT_MAX_WAIT_MS, 100L - ); - } - - @Test - public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { - adminClient.createTopics( - List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Collection<AlterConfigOp> ops = new ArrayList<>(); - ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); - Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(configResource, ops); - // alter configs on target cluster - adminClient.incrementalAlterConfigs(configOps).all().get(); - - try { - TopicDescription testTopicDescription = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName); - TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); - List<Node> initialReplicas = topicPartitionInfo.replicas(); - assertEquals(4, topicPartitionInfo.isr().size()); - assertEquals(0, topicPartitionInfo.elr().size()); - assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 1 && elrSize == 2; - }); - - killBroker(initialReplicas.get(3).id()); - - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; 
- }); - - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - assertEquals(1, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); - int expectLastKnownLeader = initialReplicas.get(3).id(); - assertEquals(expectLastKnownLeader, topicPartitionInfo.lastKnownElr().get(0).id(), topicPartitionInfo.toString()); - - // At this point, all the replicas are failed and the last know leader is No.3 and 3 members in the ELR. - // Restart one broker of the ELR and it should be the leader. - - int expectLeader = topicPartitionInfo.elr().stream() - .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); - - startBroker(expectLeader); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 1 && elrSize == 2; - }); - - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); - assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); - - // Start another 2 brokers and the ELR fields should be cleaned. - topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) - .forEach(node -> startBroker(node.id())); - - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 3 && elrSize == 0; - }); - - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); - assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); - } finally { - restartDeadBrokers(false); - } - } - - @Test - public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { - adminClient.createTopics( - List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Collection<AlterConfigOp> ops = new ArrayList<>(); - ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); - Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(configResource, ops); - // alter configs on target cluster - adminClient.incrementalAlterConfigs(configOps).all().get(); - - try { - TopicDescription testTopicDescription = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName); - TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); - List<Node> initialReplicas = topicPartitionInfo.replicas(); - assertEquals(4, topicPartitionInfo.isr().size()); - assertEquals(0, topicPartitionInfo.elr().size()); - assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - killBroker(initialReplicas.get(3).id()); - - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - - int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); - KafkaBroker broker = 
brokers().find(b -> { - return b.config().brokerId() == brokerToBeUncleanShutdown; - }).get(); - Seq<File> dirs = broker.logManager().liveLogDirs(); - assertEquals(1, dirs.size()); - CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); - assertTrue(handler.exists()); - assertDoesNotThrow(() -> handler.delete()); - - // After remove the clean shutdown file, the broker should report unclean shutdown during restart. - startBroker(brokerToBeUncleanShutdown); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 2; - }); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - assertNull(topicPartitionInfo.leader()); - assertEquals(1, topicPartitionInfo.lastKnownElr().size()); - } finally { - restartDeadBrokers(false); - } - } - - /* - This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. - */ - @Test - public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { - adminClient.createTopics( - List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); - TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - Collection<AlterConfigOp> ops = new ArrayList<>(); - ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); - Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(configResource, ops); - // alter configs on target cluster - adminClient.incrementalAlterConfigs(configOps).all().get(); - - try { - TopicDescription testTopicDescription = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName); - TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); - List<Node> initialReplicas = topicPartitionInfo.replicas(); - assertEquals(4, topicPartitionInfo.isr().size()); - assertEquals(0, topicPartitionInfo.elr().size()); - assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - - killBroker(initialReplicas.get(0).id()); - killBroker(initialReplicas.get(1).id()); - killBroker(initialReplicas.get(2).id()); - killBroker(initialReplicas.get(3).id()); - - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 3; - }); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); - - Set<Integer> initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); - brokers().foreach(broker -> { - if (initialReplicaSet.contains(broker.config().brokerId())) { - Seq<File> dirs = broker.logManager().liveLogDirs(); - assertEquals(1, dirs.size()); - CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString()); - assertDoesNotThrow(() -> handler.delete()); - } - return true; - }); - - - // After remove the clean shutdown file, the broker should report unclean shutdown during restart. 
- topicPartitionInfo.replicas().forEach(replica -> { - if (replica.id() != lastKnownLeader) startBroker(replica.id()); - }); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize == 0 && elrSize == 1; - }); - topicPartitionInfo = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - assertNull(topicPartitionInfo.leader()); - assertEquals(1, topicPartitionInfo.lastKnownElr().size()); - - // Now if the last known leader goes through unclean shutdown, it will still be elected. - startBroker(lastKnownLeader); - waitForIsrAndElr((isrSize, elrSize) -> { - return isrSize > 0 && elrSize == 0; - }); - - TestUtils.waitUntilTrue( - () -> { - try { - TopicPartitionInfo partition = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - if (partition.leader() == null) return false; - return partition.lastKnownElr().isEmpty() && partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader; - } catch (Exception e) { - return false; - } - }, - () -> String.format("Partition metadata for %s is not correct", testTopicName), - DEFAULT_MAX_WAIT_MS, 100L - ); - } finally { - restartDeadBrokers(false); - } - } - - void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean> isIsrAndElrSizeSatisfied) { - TestUtils.waitUntilTrue( - () -> { - try { - TopicDescription topicDescription = adminClient.describeTopics(List.of(testTopicName)) - .allTopicNames().get().get(testTopicName); - TopicPartitionInfo partition = topicDescription.partitions().get(0); - return isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size()); - } catch (Exception e) { - return false; - } - }, - () -> String.format("Partition metadata for %s is not propagated", testTopicName), - DEFAULT_MAX_WAIT_MS, 100L); - } -} diff --git a/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java new file mode 100644 index 00000000000..61863688f2e --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.FeatureUpdate; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.UpdateFeaturesOptions; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; +import org.apache.kafka.test.TestUtils; + +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + brokers = 5, + serverProperties = { + @ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"), + @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4") + } +) +public class EligibleLeaderReplicasIntegrationTest { + private final ClusterInstance clusterInstance; + + EligibleLeaderReplicasIntegrationTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) + public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException { + try (var admin = clusterInstance.admin(); + var producer = clusterInstance.producer(Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(), + ProducerConfig.ACKS_CONFIG, "1")); + var consumer = clusterInstance.consumer(Map.of( + 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, "test", + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "10", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(), + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()))) { + String testTopicName = String.format("%s-%s", "testHighWatermarkShouldNotAdvanceIfUnderMinIsr", "ELR-test"); + admin.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions()).all().get(); + + admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + clusterInstance.waitTopicCreation(testTopicName, 1); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(configResource, ops); + // alter configs on target cluster + admin.incrementalAlterConfigs(configOps).all().get(); + + TopicDescription testTopicDescription = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); + List<Node> initialReplicas = topicPartitionInfo.replicas(); + assertEquals(4, topicPartitionInfo.isr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + + consumer.subscribe(Set.of(testTopicName)); + producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get(); + waitUntilOneMessageIsConsumed(consumer); + + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1, admin, testTopicName); + + TopicPartition partition = new TopicPartition(testTopicName, 0); + long leoBeforeSend = admin.listOffsets(Map.of(partition, OffsetSpec.latest())).partitionResult(partition).get().offset(); + // Now the partition is under min ISR. HWM should not advance. + producer.send(new ProducerRecord<>(testTopicName, "1", "1")).get(); + long leoAfterSend = admin.listOffsets(Map.of(partition, OffsetSpec.latest())).partitionResult(partition).get().offset(); + assertEquals(leoBeforeSend, leoAfterSend); + + // Restore the min ISR and the previous log should be visible. 
+ clusterInstance.startBroker(initialReplicas.get(1).id()); + clusterInstance.startBroker(initialReplicas.get(0).id()); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0, admin, testTopicName); + + waitUntilOneMessageIsConsumed(consumer); + } + } + + void waitUntilOneMessageIsConsumed(Consumer<?, ?> consumer) throws InterruptedException { + TestUtils.waitForCondition( + () -> { + try { + return consumer.poll(Duration.ofMillis(100L)).count() >= 1; + } catch (Exception e) { + return false; + } + }, + DEFAULT_MAX_WAIT_MS, + () -> "fail to consume messages" + ); + } + + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) + public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException { + try (var admin = clusterInstance.admin()) { + String testTopicName = String.format("%s-%s", "testElrMemberCanBeElected", "ELR-test"); + + admin.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions()).all().get(); + admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + clusterInstance.waitTopicCreation(testTopicName, 1); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(configResource, ops); + // alter configs on target cluster + admin.incrementalAlterConfigs(configOps).all().get(); + + TopicDescription testTopicDescription = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); + List<Node> initialReplicas = topicPartitionInfo.replicas(); + assertEquals(4, topicPartitionInfo.isr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2, admin, testTopicName); + + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); + + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin, testTopicName); + + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertEquals(1, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); + int expectLastKnownLeader = initialReplicas.get(3).id(); + assertEquals(expectLastKnownLeader, topicPartitionInfo.lastKnownElr().get(0).id(), topicPartitionInfo.toString()); + + // At this point, all the replicas are failed and the last know leader is No.3 and 3 members in the ELR. + // Restart one broker of the ELR and it should be the leader. 
+ + int expectLeader = topicPartitionInfo.elr().stream() + .filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id(); + + clusterInstance.startBroker(expectLeader); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2, admin, testTopicName); + + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); + assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); + + // Start another 2 brokers and the ELR fields should be cleaned. + topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2) + .forEach(node -> clusterInstance.startBroker(node.id())); + + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0, admin, testTopicName); + + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString()); + assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString()); + } + } + + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) + public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException { + try (var admin = clusterInstance.admin()) { + String testTopicName = String.format("%s-%s", "testElrMemberShouldBeKickOutWhenUncleanShutdown", "ELR-test"); + + admin.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions()).all().get(); + admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + clusterInstance.waitTopicCreation(testTopicName, 1); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(configResource, ops); + // alter configs on target cluster + admin.incrementalAlterConfigs(configOps).all().get(); + + TopicDescription testTopicDescription = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); + List<Node> initialReplicas = topicPartitionInfo.replicas(); + assertEquals(4, topicPartitionInfo.isr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); + + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin, testTopicName); + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + + int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id(); + var broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown) + 
.findFirst().get(); + List<File> dirs = new ArrayList<>(); + broker.logManager().liveLogDirs().foreach(dirs::add); + assertEquals(1, dirs.size()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.get(0).toString()); + assertTrue(handler.exists()); + assertDoesNotThrow(handler::delete); + + // After remove the clean shutdown file, the broker should report unclean shutdown during restart. + clusterInstance.startBroker(brokerToBeUncleanShutdown); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2, admin, testTopicName); + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertNull(topicPartitionInfo.leader()); + assertEquals(1, topicPartitionInfo.lastKnownElr().size()); + } + } + + /* + This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed. + */ + @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_4_0_IV1) + public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException { + try (var admin = clusterInstance.admin()) { + String testTopicName = String.format("%s-%s", "testLastKnownLeaderShouldBeElectedIfEmptyElr", "ELR-test"); + + admin.updateFeatures( + Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, + new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), + new UpdateFeaturesOptions()).all().get(); + admin.createTopics(List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get(); + clusterInstance.waitTopicCreation(testTopicName, 1); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Collection<AlterConfigOp> ops = new ArrayList<>(); + ops.add(new AlterConfigOp(new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "3"), AlterConfigOp.OpType.SET)); + Map<ConfigResource, Collection<AlterConfigOp>> configOps = Map.of(configResource, ops); + // alter configs on target cluster + admin.incrementalAlterConfigs(configOps).all().get(); + + + TopicDescription testTopicDescription = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo topicPartitionInfo = testTopicDescription.partitions().get(0); + List<Node> initialReplicas = topicPartitionInfo.replicas(); + assertEquals(4, topicPartitionInfo.isr().size()); + assertEquals(0, topicPartitionInfo.elr().size()); + assertEquals(0, topicPartitionInfo.lastKnownElr().size()); + + clusterInstance.shutdownBroker(initialReplicas.get(0).id()); + clusterInstance.shutdownBroker(initialReplicas.get(1).id()); + clusterInstance.shutdownBroker(initialReplicas.get(2).id()); + clusterInstance.shutdownBroker(initialReplicas.get(3).id()); + + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3, admin, testTopicName); + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id(); + + Set<Integer> initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet()); + clusterInstance.brokers().forEach((id, broker) -> { + if (initialReplicaSet.contains(id)) { + List<File> dirs = new ArrayList<>(); + broker.logManager().liveLogDirs().foreach(dirs::add); + assertEquals(1, dirs.size()); + CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.get(0).toString()); + 
assertDoesNotThrow(handler::delete); + } + }); + + // After remove the clean shutdown file, the broker should report unclean shutdown during restart. + topicPartitionInfo.replicas().forEach(replica -> { + if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id()); + }); + waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1, admin, testTopicName); + topicPartitionInfo = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + assertNull(topicPartitionInfo.leader()); + assertEquals(1, topicPartitionInfo.lastKnownElr().size()); + + // Now if the last known leader goes through unclean shutdown, it will still be elected. + clusterInstance.startBroker(lastKnownLeader); + waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0, admin, testTopicName); + TestUtils.waitForCondition( + () -> { + try { + TopicPartitionInfo partition = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + if (partition.leader() == null) return false; + return partition.lastKnownElr().isEmpty() && partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader; + } catch (Exception e) { + return false; + } + }, + DEFAULT_MAX_WAIT_MS, + () -> String.format("Partition metadata for %s is not correct", testTopicName) + ); + } + } + + void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean> isIsrAndElrSizeSatisfied, Admin admin, String testTopicName) throws InterruptedException { + TestUtils.waitForCondition( + () -> { + try { + TopicDescription topicDescription = admin.describeTopics(List.of(testTopicName)) + .allTopicNames().get().get(testTopicName); + TopicPartitionInfo partition = topicDescription.partitions().get(0); + return isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size()); + } catch (Exception e) { + return false; + } + }, + DEFAULT_MAX_WAIT_MS, + () -> String.format("Partition metadata for %s is not propagated", testTopicName) + ); + } +}
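
To reproduce the "Validation" step from the commit message (running the migrated test repeatedly), one option is to filter the class with Gradle's --tests flag in a shell loop. This is only a sketch, not part of the commit: the ":server:test" task name is an assumption based on the new file's location under server/src/test/java, and Kafka's build may expose additional or differently named test tasks.

    # Hypothetical repetition loop; ":server:test" and the loop are illustrative only.
    # The fully qualified class name comes from the file added in this commit.
    for i in $(seq 1 50); do
      ./gradlew :server:test --tests "org.apache.kafka.server.EligibleLeaderReplicasIntegrationTest" || break
    done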