This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push: new fe7bced KAFKA-13173; Ensure KRaft controller handles concurrent broker expirations correctly (#11191) fe7bced is described below commit fe7bced3f690b96fc1e973ee6867989d8a2dcd5e Author: Niket <niket-g...@users.noreply.github.com> AuthorDate: Thu Aug 12 09:38:44 2021 -0700 KAFKA-13173; Ensure KRaft controller handles concurrent broker expirations correctly (#11191) Prior to this patch, the controller did not accumulate ISR/leader changes correctly when multiple broker's sessions expired at the same time. This patch fixes the problem by having the controller handle one expiration at a time. Reviewers: Luke Chen <show...@gmail.com>, Jason Gustafson <ja...@confluent.io> --- .../kafka/controller/BrokerHeartbeatManager.java | 21 ++--- .../kafka/controller/ClusterControlManager.java | 10 ++ .../apache/kafka/controller/QuorumController.java | 4 +- .../controller/ReplicationControlManager.java | 15 ++- .../controller/BrokerHeartbeatManagerTest.java | 24 +++-- .../kafka/controller/QuorumControllerTest.java | 104 +++++++++++++++++++++ .../kafka/controller/QuorumControllerTestEnv.java | 24 ++++- .../controller/ReplicationControlManagerTest.java | 55 ++++++++++- 8 files changed, 227 insertions(+), 30 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index f5ed4a2..b95f0d3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.metadata.UsableBroker; import org.slf4j.Logger; -import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; @@ -420,22 +419,22 @@ public class BrokerHeartbeatManager { } /** - * Find the stale 
brokers which haven't heartbeated in a long time, and which need to - * be fenced. + * Check if the oldest broker to have heartbeated has already violated the + * sessionTimeoutNs timeout and needs to be fenced. * - * @return A list of node IDs. + * @return An Optional broker node id. */ - List<Integer> findStaleBrokers() { - List<Integer> nodes = new ArrayList<>(); + Optional<Integer> findOneStaleBroker() { BrokerHeartbeatStateIterator iterator = unfenced.iterator(); - while (iterator.hasNext()) { + if (iterator.hasNext()) { BrokerHeartbeatState broker = iterator.next(); - if (hasValidSession(broker)) { - break; + // The unfenced list is sorted on last contact time from each + // broker. If the first broker is not stale, then none is. + if (!hasValidSession(broker)) { + return Optional.of(broker.id); } - nodes.add(broker.id); } - return nodes; + return Optional.empty(); } /** diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index cabe7b7..c04a8fb 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -17,6 +17,8 @@ package org.apache.kafka.controller; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException; import org.apache.kafka.common.errors.StaleBrokerEpochException; @@ -161,6 +163,14 @@ public class ClusterControlManager { return brokerRegistrations; } + Set<Integer> fencedBrokerIds() { + return brokerRegistrations.values() + .stream() + .filter(BrokerRegistration::fenced) + .map(BrokerRegistration::id) + .collect(Collectors.toSet()); + } + /** * Process an incoming broker registration request.
*/ diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 7c1215d..d2ff546 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -858,7 +858,9 @@ public final class QuorumController implements Controller { return; } scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> { - ControllerResult<Void> result = replicationControl.maybeFenceStaleBrokers(); + ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker(); + // The following call ensures that if there are multiple brokers that + // are currently stale, then fencing for them is scheduled immediately. rescheduleMaybeFenceStaleBrokers(); return result; }); diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index d9d3e9d..3066f4b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -925,18 +925,25 @@ public class ReplicationControlManager { return ControllerResult.of(records, null); } - ControllerResult<Void> maybeFenceStaleBrokers() { + ControllerResult<Void> maybeFenceOneStaleBroker() { List<ApiMessageAndVersion> records = new ArrayList<>(); BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager(); - List<Integer> staleBrokers = heartbeatManager.findStaleBrokers(); - for (int brokerId : staleBrokers) { + heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> { + // Even though multiple brokers can go stale at a time, we will process + // fencing one at a time so that the effect of fencing each broker is visible + // to the system prior to processing the next one
log.info("Fencing broker {} because its session has timed out.", brokerId); handleBrokerFenced(brokerId, records); heartbeatManager.fence(brokerId); - } + }); return ControllerResult.of(records, null); } + // Visible for testing + Boolean isBrokerUnfenced(int brokerId) { + return clusterControl.unfenced(brokerId); + } + ControllerResult<List<CreatePartitionsTopicResult>> createPartitions(List<CreatePartitionsTopic> topics) { List<ApiMessageAndVersion> records = new ArrayList<>(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java index 6c8ef7e..c5c46ab 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.controller; -import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -78,7 +77,7 @@ public class BrokerHeartbeatManagerTest { } @Test - public void testFindStaleBrokers() { + public void testFindOneStaleBroker() { BrokerHeartbeatManager manager = newBrokerHeartbeatManager(); MockTime time = (MockTime) manager.time(); assertFalse(manager.hasValidSession(0)); @@ -93,22 +92,24 @@ public class BrokerHeartbeatManagerTest { assertEquals(1, iter.next().id()); assertEquals(2, iter.next().id()); assertFalse(iter.hasNext()); - assertEquals(Collections.emptyList(), manager.findStaleBrokers()); + assertEquals(Optional.empty(), manager.findOneStaleBroker()); time.sleep(5); - assertEquals(Collections.singletonList(0), manager.findStaleBrokers()); + assertEquals(Optional.of(0), manager.findOneStaleBroker()); manager.fence(0); - assertEquals(Collections.emptyList(), manager.findStaleBrokers()); + assertEquals(Optional.empty(), manager.findOneStaleBroker()); iter = manager.unfenced().iterator(); assertEquals(1, 
iter.next().id()); assertEquals(2, iter.next().id()); assertFalse(iter.hasNext()); time.sleep(20); - assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers()); + assertEquals(Optional.of(1), manager.findOneStaleBroker()); manager.fence(1); + assertEquals(Optional.of(2), manager.findOneStaleBroker()); manager.fence(2); - assertEquals(Collections.emptyList(), manager.findStaleBrokers()); + + assertEquals(Optional.empty(), manager.findOneStaleBroker()); iter = manager.unfenced().iterator(); assertFalse(iter.hasNext()); } @@ -125,17 +126,20 @@ public class BrokerHeartbeatManagerTest { manager.touch(2, false, 0); time.sleep(1); manager.touch(3, false, 0); - assertEquals(Collections.emptyList(), manager.findStaleBrokers()); + assertEquals(Optional.empty(), manager.findOneStaleBroker()); assertEquals(10_000_000, manager.nextCheckTimeNs()); time.sleep(7); assertEquals(10_000_000, manager.nextCheckTimeNs()); - assertEquals(Collections.singletonList(0), manager.findStaleBrokers()); + assertEquals(Optional.of(0), manager.findOneStaleBroker()); manager.fence(0); assertEquals(12_000_000, manager.nextCheckTimeNs()); + time.sleep(3); - assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers()); + assertEquals(Optional.of(1), manager.findOneStaleBroker()); manager.fence(1); + assertEquals(Optional.of(2), manager.findOneStaleBroker()); manager.fence(2); + assertEquals(14_000_000, manager.nextCheckTimeNs()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index ad92e2d..c274de6 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -79,6 +79,7 @@ import org.apache.kafka.raft.Batch; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.snapshot.RawSnapshotReader; import 
org.apache.kafka.snapshot.SnapshotReader; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -174,6 +175,89 @@ public class QuorumControllerTest { } @Test + public void testFenceMultipleBrokers() throws Throwable { + List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5); + List<Integer> brokersToKeepUnfenced = Arrays.asList(1); + List<Integer> brokersToFence = Arrays.asList(2, 3, 4, 5); + short replicationFactor = 5; + long sessionTimeoutMillis = 1000; + + try ( + LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty()); + QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv( + logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutMillis)); + ) { + ListenerCollection listeners = new ListenerCollection(); + listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092)); + QuorumController active = controlEnv.activeController(); + Map<Integer, Long> brokerEpochs = new HashMap<>(); + + for (Integer brokerId : allBrokers) { + CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker( + new BrokerRegistrationRequestData(). + setBrokerId(brokerId). + setClusterId("06B-K3N1TBCNYFgruEVP0Q"). + setIncarnationId(Uuid.randomUuid()). + setListeners(listeners)); + brokerEpochs.put(brokerId, reply.get().epoch()); + } + + // Brokers are only registered and should still be fenced + allBrokers.forEach(brokerId -> { + assertFalse(active.replicationControl().isBrokerUnfenced(brokerId), + "Broker " + brokerId + " should have been fenced"); + }); + + // Unfence all brokers and create a topic foo + sendBrokerheartbeat(active, allBrokers, brokerEpochs); + CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics( + new CreatableTopicCollection(Collections.singleton( + new CreatableTopic().setName("foo").setNumPartitions(1). 
+ setReplicationFactor(replicationFactor)).iterator())); + CreateTopicsResponseData createTopicsResponseData = active.createTopics(createTopicsRequestData).get(); + assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode())); + Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId(); + + // Fence some of the brokers + TestUtils.waitForCondition(() -> { + sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs); + for (Integer brokerId : brokersToFence) { + if (active.replicationControl().isBrokerUnfenced(brokerId)) { + return false; + } + } + return true; + }, sessionTimeoutMillis * 3, + "Fencing of brokers did not process within expected time" + ); + + // Send another heartbeat to the brokers we want to keep alive + sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs); + + // At this point only the brokers we want fenced should be fenced. + brokersToKeepUnfenced.forEach(brokerId -> { + assertTrue(active.replicationControl().isBrokerUnfenced(brokerId), + "Broker " + brokerId + " should have been unfenced"); + }); + brokersToFence.forEach(brokerId -> { + assertFalse(active.replicationControl().isBrokerUnfenced(brokerId), + "Broker " + brokerId + " should have been fenced"); + }); + + // Verify the isr and leaders for the topic partition + int[] expectedIsr = {1}; + int[] isrFoo = active.replicationControl().getPartition(topicIdFoo, 0).isr; + + assertTrue(Arrays.equals(isrFoo, expectedIsr), + "The ISR for topic foo was " + Arrays.toString(isrFoo) + + ". 
It is expected to be " + Arrays.toString(expectedIsr)); + + int fooLeader = active.replicationControl().getPartition(topicIdFoo, 0).leader; + assertEquals(expectedIsr[0], fooLeader); + } + } + + @Test public void testUnregisterBroker() throws Throwable { try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) { try (QuorumControllerTestEnv controlEnv = @@ -761,4 +845,24 @@ public class QuorumControllerTest { return brokerEpochs; } + private void sendBrokerheartbeat( + QuorumController controller, + List<Integer> brokers, + Map<Integer, Long> brokerEpochs + ) throws Exception { + if (brokers.isEmpty()) { + return; + } + for (Integer brokerId : brokers) { + BrokerHeartbeatReply reply = controller.processBrokerHeartbeat( + new BrokerHeartbeatRequestData() + .setWantFence(false) + .setBrokerEpoch(brokerEpochs.get(brokerId)) + .setBrokerId(brokerId) + .setCurrentMetadataOffset(100000) + ).get(); + assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply); + } + } + } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java index bcfc534..7487882 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java @@ -17,6 +17,11 @@ package org.apache.kafka.controller; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.controller.QuorumController.Builder; import org.apache.kafka.metalog.LocalLogManagerTestEnv; import org.apache.kafka.raft.LeaderAndEpoch; import org.apache.kafka.test.TestUtils; @@ -36,9 +41,18 @@ public class QuorumControllerTestEnv implements AutoCloseable { private final List<QuorumController> controllers; private final LocalLogManagerTestEnv logEnv; - public 
QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv, - Consumer<QuorumController.Builder> builderConsumer) - throws Exception { + public QuorumControllerTestEnv( + LocalLogManagerTestEnv logEnv, + Consumer<QuorumController.Builder> builderConsumer + ) throws Exception { + this(logEnv, builderConsumer, Optional.empty()); + } + + public QuorumControllerTestEnv( + LocalLogManagerTestEnv logEnv, + Consumer<Builder> builderConsumer, + Optional<Long> sessionTimeoutMillis + ) throws Exception { this.logEnv = logEnv; int numControllers = logEnv.logManagers().size(); this.controllers = new ArrayList<>(numControllers); @@ -46,6 +60,10 @@ public class QuorumControllerTestEnv implements AutoCloseable { for (int i = 0; i < numControllers; i++) { QuorumController.Builder builder = new QuorumController.Builder(i); builder.setRaftClient(logEnv.logManagers().get(i)); + if (sessionTimeoutMillis.isPresent()) { + builder.setSessionTimeoutNs(NANOSECONDS.convert( + sessionTimeoutMillis.get(), TimeUnit.MILLISECONDS)); + } builderConsumer.accept(builder); this.controllers.add(builder.build()); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index f67756d..09449d3 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.controller; +import java.util.Optional; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; @@ -61,6 +62,7 @@ import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; import 
org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.Replicas; @@ -94,8 +96,10 @@ import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRES import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION; import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -105,6 +109,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(40) public class ReplicationControlManagerTest { private final static Logger log = LoggerFactory.getLogger(ReplicationControlManagerTest.class); + private final static int BROKER_SESSION_TIMEOUT_MS = 1000; private static class ReplicationControlTestContext { final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); @@ -112,7 +117,7 @@ public class ReplicationControlManagerTest { final MockTime time = new MockTime(); final MockRandom random = new MockRandom(); final ClusterControlManager clusterControl = new ClusterControlManager( - logContext, time, snapshotRegistry, 1000, + logContext, time, snapshotRegistry, BROKER_SESSION_TIMEOUT_MS, new StripedReplicaPlacer(random)); final ControllerMetrics metrics = new MockControllerMetrics(); final ConfigurationControlManager configurationControl = new ConfigurationControlManager( @@ -208,6 +213,24 @@ public class ReplicationControlManagerTest { } } + void fenceBrokers(Set<Integer> brokerIds) throws Exception { + 
time.sleep(BROKER_SESSION_TIMEOUT_MS); + + Set<Integer> unfencedBrokerIds = clusterControl.brokerRegistrations().keySet().stream() + .filter(brokerId -> !brokerIds.contains(brokerId)) + .collect(Collectors.toSet()); + unfenceBrokers(unfencedBrokerIds.toArray(new Integer[0])); + + Optional<Integer> staleBroker = clusterControl.heartbeatManager().findOneStaleBroker(); + while (staleBroker.isPresent()) { + ControllerResult<Void> fenceResult = replicationControl.maybeFenceOneStaleBroker(); + replay(fenceResult.records()); + staleBroker = clusterControl.heartbeatManager().findOneStaleBroker(); + } + + assertEquals(brokerIds, clusterControl.fencedBrokerIds()); + } + long currentBrokerEpoch(int brokerId) { Map<Integer, BrokerRegistration> registrations = clusterControl.brokerRegistrations(); BrokerRegistration registration = registrations.get(brokerId); @@ -1030,6 +1053,36 @@ public class ReplicationControlManagerTest { } @Test + public void testFenceMultipleBrokers() throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3, 4); + ctx.unfenceBrokers(0, 1, 2, 3, 4); + + Uuid fooId = ctx.createTestTopic("foo", new int[][]{ + new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId(); + + assertTrue(ctx.clusterControl.fencedBrokerIds().isEmpty()); + ctx.fenceBrokers(Utils.mkSet(2, 3)); + + PartitionRegistration partition0 = replication.getPartition(fooId, 0); + PartitionRegistration partition1 = replication.getPartition(fooId, 1); + PartitionRegistration partition2 = replication.getPartition(fooId, 2); + + assertArrayEquals(new int[]{1, 2, 3}, partition0.replicas); + assertArrayEquals(new int[]{1}, partition0.isr); + assertEquals(1, partition0.leader); + + assertArrayEquals(new int[]{2, 3, 4}, partition1.replicas); + assertArrayEquals(new int[]{4}, partition1.isr); + assertEquals(4, partition1.leader); + + assertArrayEquals(new 
int[]{0, 2, 1}, partition2.replicas); + assertArrayEquals(new int[]{0, 1}, partition2.isr); + assertNotEquals(2, partition2.leader); + } + + @Test public void testElectLeaders() throws Exception { ReplicationControlTestContext ctx = new ReplicationControlTestContext(); ReplicationControlManager replication = ctx.replicationControl;