This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new fe7bced  KAFKA-13173; Ensure KRaft controller handles concurrent broker expirations correctly (#11191)
fe7bced is described below

commit fe7bced3f690b96fc1e973ee6867989d8a2dcd5e
Author: Niket <niket-g...@users.noreply.github.com>
AuthorDate: Thu Aug 12 09:38:44 2021 -0700

    KAFKA-13173; Ensure KRaft controller handles concurrent broker expirations correctly (#11191)
    
    Prior to this patch, the controller did not accumulate ISR/leader changes correctly when multiple brokers' sessions expired at the same time. This patch fixes the problem by having the controller handle one expiration at a time.
    
    Reviewers: Luke Chen <show...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/controller/BrokerHeartbeatManager.java   |  21 ++---
 .../kafka/controller/ClusterControlManager.java    |  10 ++
 .../apache/kafka/controller/QuorumController.java  |   4 +-
 .../controller/ReplicationControlManager.java      |  15 ++-
 .../controller/BrokerHeartbeatManagerTest.java     |  24 +++--
 .../kafka/controller/QuorumControllerTest.java     | 104 +++++++++++++++++++++
 .../kafka/controller/QuorumControllerTestEnv.java  |  24 ++++-
 .../controller/ReplicationControlManagerTest.java  |  55 ++++++++++-
 8 files changed, 227 insertions(+), 30 deletions(-)
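
Before the per-file diff, here is a minimal, self-contained sketch of the one-expiration-at-a-time pattern the patch description refers to. This is an editor's illustration only, not code from the patch: OneAtATimeFencingSketch and SimpleHeartbeatTracker are invented stand-ins for the controller's BrokerHeartbeatManager and ReplicationControlManager, and the real controller emits metadata records and reschedules a deferred event rather than looping and printing. The key property, mirrored in the real findOneStaleBroker(), is that unfenced brokers are ordered by last contact time, so only the oldest entry ever needs checking.

import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class OneAtATimeFencingSketch {

    // Invented stand-in for BrokerHeartbeatManager: tracks last-contact
    // times ordered oldest-first.
    static class SimpleHeartbeatTracker {
        private final TreeMap<Long, Integer> lastContactToBroker = new TreeMap<>();
        private final long sessionTimeoutMs;

        SimpleHeartbeatTracker(long sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
        }

        // Record a heartbeat; assumes distinct contact times for brevity.
        void touch(int brokerId, long nowMs) {
            lastContactToBroker.values().removeIf(id -> id == brokerId);
            lastContactToBroker.put(nowMs, brokerId);
        }

        // Only the oldest entry needs checking: if the oldest broker is
        // still within its session, every newer one is too.
        Optional<Integer> findOneStaleBroker(long nowMs) {
            if (lastContactToBroker.isEmpty()) {
                return Optional.empty();
            }
            Map.Entry<Long, Integer> oldest = lastContactToBroker.firstEntry();
            if (nowMs - oldest.getKey() > sessionTimeoutMs) {
                return Optional.of(oldest.getValue());
            }
            return Optional.empty();
        }

        void fence(int brokerId) {
            lastContactToBroker.values().removeIf(id -> id == brokerId);
        }
    }

    public static void main(String[] args) {
        SimpleHeartbeatTracker tracker = new SimpleHeartbeatTracker(1000);
        tracker.touch(1, 0);
        tracker.touch(2, 10);
        tracker.touch(3, 2000);

        long now = 2000;
        // Drain stale brokers one at a time, applying each fencing before
        // looking for the next one, so the effect of each expiration is
        // visible before the next is processed.
        Optional<Integer> stale;
        while ((stale = tracker.findOneStaleBroker(now)).isPresent()) {
            System.out.println("Fencing broker " + stale.get());
            tracker.fence(stale.get());
        }
    }
}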

diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index f5ed4a2..b95f0d3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.metadata.UsableBroker;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -420,22 +419,22 @@ public class BrokerHeartbeatManager {
     }
 
     /**
-     * Find the stale brokers which haven't heartbeated in a long time, and which need to
-     * be fenced.
+     * Check if the oldest broker to have heartbeated has already violated the
+     * sessionTimeoutNs timeout and needs to be fenced.
      *
-     * @return      A list of node IDs.
+     * @return      An Optional broker node id.
      */
-    List<Integer> findStaleBrokers() {
-        List<Integer> nodes = new ArrayList<>();
+    Optional<Integer> findOneStaleBroker() {
         BrokerHeartbeatStateIterator iterator = unfenced.iterator();
-        while (iterator.hasNext()) {
+        if (iterator.hasNext()) {
             BrokerHeartbeatState broker = iterator.next();
-            if (hasValidSession(broker)) {
-                break;
+            // The unfenced list is sorted on last contact time from each
+            // broker. If the first broker is not stale, then none is.
+            if (!hasValidSession(broker)) {
+                return Optional.of(broker.id);
             }
-            nodes.add(broker.id);
         }
-        return nodes;
+        return Optional.empty();
     }
 
     /**
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index cabe7b7..c04a8fb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.controller;
 
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -161,6 +163,14 @@ public class ClusterControlManager {
         return brokerRegistrations;
     }
 
+    Set<Integer> fencedBrokerIds() {
+        return brokerRegistrations.values()
+            .stream()
+            .filter(BrokerRegistration::fenced)
+            .map(BrokerRegistration::id)
+            .collect(Collectors.toSet());
+    }
+
     /**
      * Process an incoming broker registration request.
      */
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7c1215d..d2ff546 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -858,7 +858,9 @@ public final class QuorumController implements Controller {
             return;
         }
         scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
-            ControllerResult<Void> result = replicationControl.maybeFenceStaleBrokers();
+            ControllerResult<Void> result = replicationControl.maybeFenceOneStaleBroker();
+            // This following call ensures that if there are multiple brokers that
+            // are currently stale, then fencing for them is scheduled immediately
             rescheduleMaybeFenceStaleBrokers();
             return result;
         });
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d9d3e9d..3066f4b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -925,18 +925,25 @@ public class ReplicationControlManager {
         return ControllerResult.of(records, null);
     }
 
-    ControllerResult<Void> maybeFenceStaleBrokers() {
+    ControllerResult<Void> maybeFenceOneStaleBroker() {
         List<ApiMessageAndVersion> records = new ArrayList<>();
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
-        List<Integer> staleBrokers = heartbeatManager.findStaleBrokers();
-        for (int brokerId : staleBrokers) {
+        heartbeatManager.findOneStaleBroker().ifPresent(brokerId -> {
+            // Even though multiple brokers can go stale at a time, we will process
+            // fencing one at a time so that the effect of fencing each broker is visible
+            // to the system prior to processing the next one
             log.info("Fencing broker {} because its session has timed out.", brokerId);
             handleBrokerFenced(brokerId, records);
             heartbeatManager.fence(brokerId);
-        }
+        });
         return ControllerResult.of(records, null);
     }
 
+    // Visible for testing
+    Boolean isBrokerUnfenced(int brokerId) {
+        return clusterControl.unfenced(brokerId);
+    }
+
     ControllerResult<List<CreatePartitionsTopicResult>>
             createPartitions(List<CreatePartitionsTopic> topics) {
         List<ApiMessageAndVersion> records = new ArrayList<>();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
index 6c8ef7e..c5c46ab 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -78,7 +77,7 @@ public class BrokerHeartbeatManagerTest {
     }
 
     @Test
-    public void testFindStaleBrokers() {
+    public void testFindOneStaleBroker() {
         BrokerHeartbeatManager manager = newBrokerHeartbeatManager();
         MockTime time = (MockTime)  manager.time();
         assertFalse(manager.hasValidSession(0));
@@ -93,22 +92,24 @@ public class BrokerHeartbeatManagerTest {
         assertEquals(1, iter.next().id());
         assertEquals(2, iter.next().id());
         assertFalse(iter.hasNext());
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
 
         time.sleep(5);
-        assertEquals(Collections.singletonList(0), manager.findStaleBrokers());
+        assertEquals(Optional.of(0), manager.findOneStaleBroker());
         manager.fence(0);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
         iter = manager.unfenced().iterator();
         assertEquals(1, iter.next().id());
         assertEquals(2, iter.next().id());
         assertFalse(iter.hasNext());
 
         time.sleep(20);
-        assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers());
+        assertEquals(Optional.of(1), manager.findOneStaleBroker());
         manager.fence(1);
+        assertEquals(Optional.of(2), manager.findOneStaleBroker());
         manager.fence(2);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
         iter = manager.unfenced().iterator();
         assertFalse(iter.hasNext());
     }
@@ -125,17 +126,20 @@ public class BrokerHeartbeatManagerTest {
         manager.touch(2, false, 0);
         time.sleep(1);
         manager.touch(3, false, 0);
-        assertEquals(Collections.emptyList(), manager.findStaleBrokers());
+        assertEquals(Optional.empty(), manager.findOneStaleBroker());
         assertEquals(10_000_000, manager.nextCheckTimeNs());
         time.sleep(7);
         assertEquals(10_000_000, manager.nextCheckTimeNs());
-        assertEquals(Collections.singletonList(0), manager.findStaleBrokers());
+        assertEquals(Optional.of(0), manager.findOneStaleBroker());
         manager.fence(0);
         assertEquals(12_000_000, manager.nextCheckTimeNs());
+
         time.sleep(3);
-        assertEquals(Arrays.asList(1, 2), manager.findStaleBrokers());
+        assertEquals(Optional.of(1), manager.findOneStaleBroker());
         manager.fence(1);
+        assertEquals(Optional.of(2), manager.findOneStaleBroker());
         manager.fence(2);
+
         assertEquals(14_000_000, manager.nextCheckTimeNs());
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index ad92e2d..c274de6 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -79,6 +79,7 @@ import org.apache.kafka.raft.Batch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.snapshot.RawSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -174,6 +175,89 @@ public class QuorumControllerTest {
     }
 
     @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        List<Integer> allBrokers = Arrays.asList(1, 2, 3, 4, 5);
+        List<Integer> brokersToKeepUnfenced = Arrays.asList(1);
+        List<Integer> brokersToFence = Arrays.asList(2, 3, 4, 5);
+        short replicationFactor = 5;
+        long sessionTimeoutMillis = 1000;
+
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
+                logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutMillis));
+        ) {
+            ListenerCollection listeners = new ListenerCollection();
+            listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = new HashMap<>();
+
+            for (Integer brokerId : allBrokers) {
+                CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
+                    new BrokerRegistrationRequestData().
+                        setBrokerId(brokerId).
+                        setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                        setIncarnationId(Uuid.randomUuid()).
+                        setListeners(listeners));
+                brokerEpochs.put(brokerId, reply.get().epoch());
+            }
+
+            // Brokers are only registered and should still be fenced
+            allBrokers.forEach(brokerId -> {
+                assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been fenced");
+            });
+
+            // Unfence all brokers and create a topic foo
+            sendBrokerheartbeat(active, allBrokers, brokerEpochs);
+            CreateTopicsRequestData createTopicsRequestData = new CreateTopicsRequestData().setTopics(
+                new CreatableTopicCollection(Collections.singleton(
+                    new CreatableTopic().setName("foo").setNumPartitions(1).
+                        setReplicationFactor(replicationFactor)).iterator()));
+            CreateTopicsResponseData createTopicsResponseData = active.createTopics(createTopicsRequestData).get();
+            assertEquals(Errors.NONE, Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
+            Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
+
+            // Fence some of the brokers
+            TestUtils.waitForCondition(() -> {
+                    sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
+                    for (Integer brokerId : brokersToFence) {
+                        if (active.replicationControl().isBrokerUnfenced(brokerId)) {
+                            return false;
+                        }
+                    }
+                    return true;
+                }, sessionTimeoutMillis * 3,
+                "Fencing of brokers did not process within expected time"
+            );
+
+            // Send another heartbeat to the brokers we want to keep alive
+            sendBrokerheartbeat(active, brokersToKeepUnfenced, brokerEpochs);
+
+            // At this point only the brokers we want fenced should be fenced.
+            brokersToKeepUnfenced.forEach(brokerId -> {
+                assertTrue(active.replicationControl().isBrokerUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been unfenced");
+            });
+            brokersToFence.forEach(brokerId -> {
+                assertFalse(active.replicationControl().isBrokerUnfenced(brokerId),
+                    "Broker " + brokerId + " should have been fenced");
+            });
+
+            // Verify the isr and leaders for the topic partition
+            int[] expectedIsr = {1};
+            int[] isrFoo = active.replicationControl().getPartition(topicIdFoo, 0).isr;
+
+            assertTrue(Arrays.equals(isrFoo, expectedIsr),
+                "The ISR for topic foo was " + Arrays.toString(isrFoo) +
+                    ". It is expected to be " + Arrays.toString(expectedIsr));
+
+            int fooLeader = active.replicationControl().getPartition(topicIdFoo, 0).leader;
+            assertEquals(expectedIsr[0], fooLeader);
+        }
+    }
+
+    @Test
     public void testUnregisterBroker() throws Throwable {
         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
             try (QuorumControllerTestEnv controlEnv =
@@ -761,4 +845,24 @@ public class QuorumControllerTest {
         return brokerEpochs;
     }
 
+    private void sendBrokerheartbeat(
+        QuorumController controller,
+        List<Integer> brokers,
+        Map<Integer, Long> brokerEpochs
+    ) throws Exception {
+        if (brokers.isEmpty()) {
+            return;
+        }
+        for (Integer brokerId : brokers) {
+            BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(
+                new BrokerHeartbeatRequestData()
+                    .setWantFence(false)
+                    .setBrokerEpoch(brokerEpochs.get(brokerId))
+                    .setBrokerId(brokerId)
+                    .setCurrentMetadataOffset(100000)
+            ).get();
+            assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
+        }
+    }
+
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index bcfc534..7487882 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.controller;
 
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.controller.QuorumController.Builder;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.test.TestUtils;
@@ -36,9 +41,18 @@ public class QuorumControllerTestEnv implements AutoCloseable {
     private final List<QuorumController> controllers;
     private final LocalLogManagerTestEnv logEnv;
 
-    public QuorumControllerTestEnv(LocalLogManagerTestEnv logEnv,
-                                   Consumer<QuorumController.Builder> builderConsumer)
-                                   throws Exception {
+    public QuorumControllerTestEnv(
+        LocalLogManagerTestEnv logEnv,
+        Consumer<QuorumController.Builder> builderConsumer
+    ) throws Exception {
+        this(logEnv, builderConsumer, Optional.empty());
+    }
+
+    public QuorumControllerTestEnv(
+        LocalLogManagerTestEnv logEnv,
+        Consumer<Builder> builderConsumer,
+        Optional<Long> sessionTimeoutMillis
+    ) throws Exception {
         this.logEnv = logEnv;
         int numControllers = logEnv.logManagers().size();
         this.controllers = new ArrayList<>(numControllers);
@@ -46,6 +60,10 @@ public class QuorumControllerTestEnv implements AutoCloseable {
             for (int i = 0; i < numControllers; i++) {
                 QuorumController.Builder builder = new QuorumController.Builder(i);
                 builder.setRaftClient(logEnv.logManagers().get(i));
+                if (sessionTimeoutMillis.isPresent()) {
+                    builder.setSessionTimeoutNs(NANOSECONDS.convert(
+                        sessionTimeoutMillis.get(), TimeUnit.MILLISECONDS));
+                }
                 builderConsumer.accept(builder);
                 this.controllers.add(builder.build());
             }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index f67756d..09449d3 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller;
 
+import java.util.Optional;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
@@ -61,6 +62,7 @@ import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.Replicas;
@@ -94,8 +96,10 @@ import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRES
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
 import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -105,6 +109,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(40)
 public class ReplicationControlManagerTest {
     private final static Logger log = LoggerFactory.getLogger(ReplicationControlManagerTest.class);
+    private final static int BROKER_SESSION_TIMEOUT_MS = 1000;
 
     private static class ReplicationControlTestContext {
         final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
@@ -112,7 +117,7 @@ public class ReplicationControlManagerTest {
         final MockTime time = new MockTime();
         final MockRandom random = new MockRandom();
         final ClusterControlManager clusterControl = new ClusterControlManager(
-            logContext, time, snapshotRegistry, 1000,
+            logContext, time, snapshotRegistry, BROKER_SESSION_TIMEOUT_MS,
             new StripedReplicaPlacer(random));
         final ControllerMetrics metrics = new MockControllerMetrics();
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
@@ -208,6 +213,24 @@ public class ReplicationControlManagerTest {
             }
         }
 
+        void fenceBrokers(Set<Integer> brokerIds) throws Exception {
+            time.sleep(BROKER_SESSION_TIMEOUT_MS);
+
+            Set<Integer> unfencedBrokerIds = clusterControl.brokerRegistrations().keySet().stream()
+                .filter(brokerId -> !brokerIds.contains(brokerId))
+                .collect(Collectors.toSet());
+            unfenceBrokers(unfencedBrokerIds.toArray(new Integer[0]));
+
+            Optional<Integer> staleBroker = clusterControl.heartbeatManager().findOneStaleBroker();
+            while (staleBroker.isPresent()) {
+                ControllerResult<Void> fenceResult = replicationControl.maybeFenceOneStaleBroker();
+                replay(fenceResult.records());
+                staleBroker = clusterControl.heartbeatManager().findOneStaleBroker();
+            }
+
+            assertEquals(brokerIds, clusterControl.fencedBrokerIds());
+        }
+
         long currentBrokerEpoch(int brokerId) {
             Map<Integer, BrokerRegistration> registrations = clusterControl.brokerRegistrations();
             BrokerRegistration registration = registrations.get(brokerId);
@@ -1030,6 +1053,36 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
+    public void testFenceMultipleBrokers() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+
+        assertTrue(ctx.clusterControl.fencedBrokerIds().isEmpty());
+        ctx.fenceBrokers(Utils.mkSet(2, 3));
+
+        PartitionRegistration partition0 = replication.getPartition(fooId, 0);
+        PartitionRegistration partition1 = replication.getPartition(fooId, 1);
+        PartitionRegistration partition2 = replication.getPartition(fooId, 2);
+
+        assertArrayEquals(new int[]{1, 2, 3}, partition0.replicas);
+        assertArrayEquals(new int[]{1}, partition0.isr);
+        assertEquals(1, partition0.leader);
+
+        assertArrayEquals(new int[]{2, 3, 4}, partition1.replicas);
+        assertArrayEquals(new int[]{4}, partition1.isr);
+        assertEquals(4, partition1.leader);
+
+        assertArrayEquals(new int[]{0, 2, 1}, partition2.replicas);
+        assertArrayEquals(new int[]{0, 1}, partition2.isr);
+        assertNotEquals(2, partition2.leader);
+    }
+
+    @Test
     public void testElectLeaders() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
         ReplicationControlManager replication = ctx.replicationControl;
