This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d38189  MINOR: clean up some replication code (#10564)
8d38189 is described below

commit 8d38189eddd66d8ae53749f15bfff557f102a936
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Apr 29 11:20:30 2021 -0700

    MINOR: clean up some replication code (#10564)
    
    Centralize leader and ISR changes in generateLeaderAndIsrUpdates.
    Consolidate handleNodeDeactivated and handleNodeActivated into this
    function.
    
    Rename BrokersToIsrs#noLeaderIterator to BrokersToIsrs#partitionsWithNoLeader.
    Create BrokersToIsrs#partitionsLedByBroker and BrokersToIsrs#partitionsWithBrokerInIsr.
    
    In ReplicationControlManagerTest, make createTestTopic a member function
    of ReplicationControlTestContext, and have it invoke
    ReplicationControlTestContext#replay so that its records are applied to
    all parts of the test context.
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/controller/BrokersToIsrs.java |  10 +-
 .../controller/ConfigurationControlManager.java    |   4 +
 .../controller/ReplicationControlManager.java      | 300 +++++++++++----------
 .../apache/kafka/controller/BrokersToIsrsTest.java |   4 +-
 .../kafka/controller/QuorumControllerTest.java     |   2 +-
 .../controller/ReplicationControlManagerTest.java  |  97 ++++---
 6 files changed, 241 insertions(+), 176 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java 
b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index 9d54c20..d8e0319 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -311,10 +311,18 @@ public class BrokersToIsrs {
         return new PartitionsOnReplicaIterator(topicMap, leadersOnly);
     }
 
-    PartitionsOnReplicaIterator noLeaderIterator() {
+    PartitionsOnReplicaIterator partitionsWithNoLeader() {
         return iterator(NO_LEADER, true);
     }
 
+    PartitionsOnReplicaIterator partitionsLedByBroker(int brokerId) {
+        return iterator(brokerId, true);
+    }
+
+    PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) {
+        return iterator(brokerId, false);
+    }
+
     boolean hasLeaderships(int brokerId) {
         return iterator(brokerId, true).hasNext();
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 3e9e9e1..b53926e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -374,6 +374,10 @@ public class ConfigurationControlManager {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    boolean uncleanLeaderElectionEnabledForTopic(String name) {
+        return false; // TODO: support configuring unclean leader election.
+    }
+
     class ConfigurationControlIterator implements 
Iterator<List<ApiMessageAndVersion>> {
         private final long epoch;
         private final Iterator<Entry<ConfigResource, TimelineHashMap<String, 
String>>> iterator;
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index f169d1f..ea94a00 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -72,9 +72,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.function.Function;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
@@ -172,47 +174,54 @@ public class ReplicationControlManager {
             StringBuilder builder = new StringBuilder();
             String prefix = "";
             if (!Arrays.equals(replicas, prev.replicas)) {
-                
builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas));
+                builder.append(prefix).append("replicas: ").
+                    append(Arrays.toString(prev.replicas)).
+                    append(" -> ").append(Arrays.toString(replicas));
                 prefix = ", ";
-                
builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas));
             }
             if (!Arrays.equals(isr, prev.isr)) {
-                
builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr));
+                builder.append(prefix).append("isr: ").
+                    append(Arrays.toString(prev.isr)).
+                    append(" -> ").append(Arrays.toString(isr));
                 prefix = ", ";
-                
builder.append(prefix).append("newIsr=").append(Arrays.toString(isr));
             }
             if (!Arrays.equals(removingReplicas, prev.removingReplicas)) {
-                builder.append(prefix).append("oldRemovingReplicas=").
-                    append(Arrays.toString(prev.removingReplicas));
+                builder.append(prefix).append("removingReplicas: ").
+                    append(Arrays.toString(prev.removingReplicas)).
+                    append(" -> ").append(Arrays.toString(removingReplicas));
                 prefix = ", ";
-                builder.append(prefix).append("newRemovingReplicas=").
-                    append(Arrays.toString(removingReplicas));
             }
             if (!Arrays.equals(addingReplicas, prev.addingReplicas)) {
-                builder.append(prefix).append("oldAddingReplicas=").
-                    append(Arrays.toString(prev.addingReplicas));
+                builder.append(prefix).append("addingReplicas: ").
+                    append(Arrays.toString(prev.addingReplicas)).
+                    append(" -> ").append(Arrays.toString(addingReplicas));
                 prefix = ", ";
-                builder.append(prefix).append("newAddingReplicas=").
-                    append(Arrays.toString(addingReplicas));
             }
             if (leader != prev.leader) {
-                
builder.append(prefix).append("oldLeader=").append(prev.leader);
+                builder.append(prefix).append("leader: ").
+                    append(prev.leader).append(" -> ").append(leader);
                 prefix = ", ";
-                builder.append(prefix).append("newLeader=").append(leader);
             }
             if (leaderEpoch != prev.leaderEpoch) {
-                
builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch);
+                builder.append(prefix).append("leaderEpoch: ").
+                    append(prev.leaderEpoch).append(" -> 
").append(leaderEpoch);
                 prefix = ", ";
-                
builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch);
             }
             if (partitionEpoch != prev.partitionEpoch) {
-                
builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch);
-                prefix = ", ";
-                
builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch);
+                builder.append(prefix).append("partitionEpoch: ").
+                    append(prev.partitionEpoch).append(" -> 
").append(partitionEpoch);
             }
             return builder.toString();
         }
 
+        void maybeLogPartitionChange(Logger log, String description, 
PartitionControlInfo prev) {
+            if (!electionWasClean(leader, prev.isr)) {
+                log.info("UNCLEAN partition change for {}: {}", description, 
diff(prev));
+            } else if (log.isDebugEnabled()) {
+                log.debug("partition change for {}: {}", description, 
diff(prev));
+            }
+        }
+
         boolean hasLeader() {
             return leader != NO_LEADER;
         }
@@ -231,7 +240,13 @@ public class ReplicationControlManager {
         public boolean equals(Object o) {
             if (!(o instanceof PartitionControlInfo)) return false;
             PartitionControlInfo other = (PartitionControlInfo) o;
-            return diff(other).isEmpty();
+            return Arrays.equals(replicas, other.replicas) &&
+                Arrays.equals(isr, other.isr) &&
+                Arrays.equals(removingReplicas, other.removingReplicas) &&
+                Arrays.equals(addingReplicas, other.addingReplicas) &&
+                leader == other.leader &&
+                leaderEpoch == other.leaderEpoch &&
+                partitionEpoch == other.partitionEpoch;
         }
 
         @Override
@@ -310,7 +325,7 @@ public class ReplicationControlManager {
         topicsByName.put(record.name(), record.topicId());
         topics.put(record.topicId(),
             new TopicControlInfo(record.name(), snapshotRegistry, 
record.topicId()));
-        log.info("Created topic {} with ID {}.", record.name(), 
record.topicId());
+        log.info("Created topic {} with topic ID {}.", record.name(), 
record.topicId());
     }
 
     public void replay(PartitionRecord record) {
@@ -321,22 +336,18 @@ public class ReplicationControlManager {
         }
         PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
         PartitionControlInfo prevPartInfo = 
topicInfo.parts.get(record.partitionId());
+        String description = topicInfo.name + "-" + record.partitionId() +
+            " with topic ID " + record.topicId();
         if (prevPartInfo == null) {
-            log.info("Created partition {}:{} with {}.", record.topicId(),
-                record.partitionId(), newPartInfo.toString());
+            log.info("Created partition {} and {}.", description, newPartInfo);
             topicInfo.parts.put(record.partitionId(), newPartInfo);
             brokersToIsrs.update(record.topicId(), record.partitionId(), null,
                 newPartInfo.isr, NO_LEADER, newPartInfo.leader);
-        } else {
-            String diff = newPartInfo.diff(prevPartInfo);
-            if (!diff.isEmpty()) {
-                log.info("Modified partition {}:{}: {}.", record.topicId(),
-                    record.partitionId(), diff);
-                topicInfo.parts.put(record.partitionId(), newPartInfo);
-                brokersToIsrs.update(record.topicId(), record.partitionId(),
-                    prevPartInfo.isr, newPartInfo.isr, prevPartInfo.leader,
-                    newPartInfo.leader);
-            }
+        } else if (!newPartInfo.equals(prevPartInfo)) {
+            newPartInfo.maybeLogPartitionChange(log, description, 
prevPartInfo);
+            topicInfo.parts.put(record.partitionId(), newPartInfo);
+            brokersToIsrs.update(record.topicId(), record.partitionId(), 
prevPartInfo.isr,
+                newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
         }
     }
 
@@ -356,7 +367,9 @@ public class ReplicationControlManager {
         brokersToIsrs.update(record.topicId(), record.partitionId(),
             prevPartitionInfo.isr, newPartitionInfo.isr, 
prevPartitionInfo.leader,
             newPartitionInfo.leader);
-        log.debug("Applied ISR change record: {}", record.toString());
+        String topicPart = topicInfo.name + "-" + record.partitionId() + " 
with topic ID " +
+            record.topicId();
+        newPartitionInfo.maybeLogPartitionChange(log, topicPart, 
prevPartitionInfo);
     }
 
     public void replay(RemoveTopicRecord record) {
@@ -723,7 +736,8 @@ public class ReplicationControlManager {
         if (brokerRegistration == null) {
             throw new RuntimeException("Can't find broker registration for 
broker " + brokerId);
         }
-        handleNodeDeactivated(brokerId, records);
+        generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, 
records,
+            brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
         records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
             setId(brokerId).setEpoch(brokerRegistration.epoch()), (short) 0));
     }
@@ -740,61 +754,13 @@ public class ReplicationControlManager {
      */
     void handleBrokerUnregistered(int brokerId, long brokerEpoch,
                                   List<ApiMessageAndVersion> records) {
-        handleNodeDeactivated(brokerId, records);
+        generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, 
NO_LEADER, records,
+            brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
         records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
             setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), (short) 0));
     }
 
     /**
-     * Handle a broker being deactivated. This means we remove it from any ISR 
that has
-     * more than one element. We do not remove the broker from ISRs where it 
is the only
-     * member since this would preclude clean leader election in the future.
-     * It is removed as the leader for all partitions it leads.
-     *
-     * @param brokerId              The broker id.
-     * @param records               The record list to append to.
-     */
-    void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> 
records) {
-        Iterator<TopicIdPartition> iterator = brokersToIsrs.iterator(brokerId, 
false);
-        while (iterator.hasNext()) {
-            TopicIdPartition topicIdPartition = iterator.next();
-            TopicControlInfo topic = topics.get(topicIdPartition.topicId());
-            if (topic == null) {
-                throw new RuntimeException("Topic ID " + 
topicIdPartition.topicId() + " existed in " +
-                    "isrMembers, but not in the topics map.");
-            }
-            PartitionControlInfo partition = 
topic.parts.get(topicIdPartition.partitionId());
-            if (partition == null) {
-                throw new RuntimeException("Partition " + topicIdPartition +
-                    " existed in isrMembers, but not in the partitions map.");
-            }
-            PartitionChangeRecord record = new PartitionChangeRecord().
-                setPartitionId(topicIdPartition.partitionId()).
-                setTopicId(topic.id);
-            int[] newIsr = Replicas.copyWithout(partition.isr, brokerId);
-            if (newIsr.length == 0) {
-                // We don't want to shrink the ISR to size 0. So, leave the 
node in the ISR.
-                if (record.leader() != NO_LEADER) {
-                    // The partition is now leaderless, so set its leader to 
-1.
-                    record.setLeader(-1);
-                    records.add(new ApiMessageAndVersion(record, (short) 0));
-                }
-            } else {
-                record.setIsr(Replicas.toList(newIsr));
-                if (partition.leader == brokerId) {
-                    // The fenced node will no longer be the leader.
-                    int newLeader = bestLeader(partition.replicas, newIsr, 
false);
-                    record.setLeader(newLeader);
-                } else {
-                    // Bump the partition leader epoch.
-                    record.setLeader(partition.leader);
-                }
-                records.add(new ApiMessageAndVersion(record, (short) 0));
-            }
-        }
-    }
-
-    /**
      * Generate the appropriate records to handle a broker becoming unfenced.
      *
      * First, we create an UnfenceBrokerRecord. Then, we check if if there are 
any
@@ -808,43 +774,12 @@ public class ReplicationControlManager {
     void handleBrokerUnfenced(int brokerId, long brokerEpoch, 
List<ApiMessageAndVersion> records) {
         records.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
             setId(brokerId).setEpoch(brokerEpoch), (short) 0));
-        handleNodeActivated(brokerId, records);
-    }
-
-    /**
-     * Handle a broker being activated. This means we check if it can become 
the leader
-     * for any partition that currently has no leader (aka offline partition).
-     *
-     * @param brokerId      The broker id.
-     * @param records       The record list to append to.
-     */
-    void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) 
{
-        Iterator<TopicIdPartition> iterator = brokersToIsrs.noLeaderIterator();
-        while (iterator.hasNext()) {
-            TopicIdPartition topicIdPartition = iterator.next();
-            TopicControlInfo topic = topics.get(topicIdPartition.topicId());
-            if (topic == null) {
-                throw new RuntimeException("Topic ID " + 
topicIdPartition.topicId() + " existed in " +
-                    "isrMembers, but not in the topics map.");
-            }
-            PartitionControlInfo partition = 
topic.parts.get(topicIdPartition.partitionId());
-            if (partition == null) {
-                throw new RuntimeException("Partition " + topicIdPartition +
-                    " existed in isrMembers, but not in the partitions map.");
-            }
-            // TODO: if this partition is configured for unclean leader 
election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new 
PartitionChangeRecord().
-                    setPartitionId(topicIdPartition.partitionId()).
-                    setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
-            }
-        }
+        generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, 
brokerId, records,
+            brokersToIsrs.partitionsWithNoLeader());
     }
 
     ControllerResult<ElectLeadersResponseData> 
electLeaders(ElectLeadersRequestData request) {
-        boolean unclean = electionIsUnclean(request.electionType());
+        boolean uncleanOk = electionTypeIsUnclean(request.electionType());
         List<ApiMessageAndVersion> records = new ArrayList<>();
         ElectLeadersResponseData response = new ElectLeadersResponseData();
         for (TopicPartitions topic : request.topicPartitions()) {
@@ -852,7 +787,7 @@ public class ReplicationControlManager {
                 new ReplicaElectionResult().setTopic(topic.topic());
             response.replicaElectionResults().add(topicResults);
             for (int partitionId : topic.partitions()) {
-                ApiError error = electLeader(topic.topic(), partitionId, 
unclean, records);
+                ApiError error = electLeader(topic.topic(), partitionId, 
uncleanOk, records);
                 topicResults.partitionResult().add(new PartitionResult().
                     setPartitionId(partitionId).
                     setErrorCode(error.error().code()).
@@ -862,7 +797,7 @@ public class ReplicationControlManager {
         return ControllerResult.of(records, response);
     }
 
-    static boolean electionIsUnclean(byte electionType) {
+    static boolean electionTypeIsUnclean(byte electionType) {
         ElectionType type;
         try {
             type = ElectionType.valueOf(electionType);
@@ -872,7 +807,7 @@ public class ReplicationControlManager {
         return type == ElectionType.UNCLEAN;
     }
 
-    ApiError electLeader(String topic, int partitionId, boolean unclean,
+    ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
                          List<ApiMessageAndVersion> records) {
         Uuid topicId = topicsByName.get(topic);
         if (topicId == null) {
@@ -889,7 +824,8 @@ public class ReplicationControlManager {
             return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
                 "No such partition as " + topic + "-" + partitionId);
         }
-        int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr, 
unclean);
+        int newLeader = bestLeader(partitionInfo.replicas, partitionInfo.isr, 
uncleanOk,
+            r -> clusterControl.unfenced(r));
         if (newLeader == NO_LEADER) {
             // If we can't find any leader for the partition, return an error.
             return new ApiError(Errors.LEADER_NOT_AVAILABLE,
@@ -907,13 +843,13 @@ public class ReplicationControlManager {
         }
         PartitionChangeRecord record = new PartitionChangeRecord().
             setPartitionId(partitionId).
-            setTopicId(topicId);
-        if (unclean && !Replicas.contains(partitionInfo.isr, newLeader)) {
-            // If the election was unclean, we may have to forcibly add the 
replica to
-            // the ISR.  This can result in data loss!
+            setTopicId(topicId).
+            setLeader(newLeader);
+        if (!electionWasClean(newLeader, partitionInfo.isr)) {
+            // If the election was unclean, we have to forcibly set the ISR to 
just the
+            // new leader. This can result in data loss!
             record.setIsr(Collections.singletonList(newLeader));
         }
-        record.setLeader(newLeader);
         records.add(new ApiMessageAndVersion(record, (short) 0));
         return ApiError.NONE;
     }
@@ -936,10 +872,8 @@ public class ReplicationControlManager {
                     handleBrokerUnfenced(brokerId, brokerEpoch, records);
                     break;
                 case CONTROLLED_SHUTDOWN:
-                    // Note: we always bump the leader epoch of each partition 
that the
-                    // shutting down broker is in here.  This prevents the 
broker from
-                    // getting re-added to the ISR later.
-                    handleNodeDeactivated(brokerId, records);
+                    generateLeaderAndIsrUpdates("enterControlledShutdown[" + 
brokerId + "]",
+                        brokerId, NO_LEADER, records, 
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
                     break;
                 case SHUTDOWN_NOW:
                     handleBrokerFenced(brokerId, records);
@@ -957,22 +891,27 @@ public class ReplicationControlManager {
         return ControllerResult.of(records, reply);
     }
 
-    int bestLeader(int[] replicas, int[] isr, boolean unclean) {
+    static boolean isGoodLeader(int[] isr, int leader) {
+        return Replicas.contains(isr, leader);
+    }
+
+    static int bestLeader(int[] replicas, int[] isr, boolean uncleanOk,
+                          Function<Integer, Boolean> isAcceptableLeader) {
+        int bestUnclean = NO_LEADER;
         for (int i = 0; i < replicas.length; i++) {
             int replica = replicas[i];
-            if (Replicas.contains(isr, replica)) {
-                return replica;
-            }
-        }
-        if (unclean) {
-            for (int i = 0; i < replicas.length; i++) {
-                int replica = replicas[i];
-                if (clusterControl.unfenced(replica)) {
+            if (isAcceptableLeader.apply(replica)) {
+                if (bestUnclean == NO_LEADER) bestUnclean = replica;
+                if (Replicas.contains(isr, replica)) {
                     return replica;
                 }
             }
         }
-        return NO_LEADER;
+        return uncleanOk ? bestUnclean : NO_LEADER;
+    }
+
+    static boolean electionWasClean(int newLeader, int[] isr) {
+        return newLeader == NO_LEADER || Replicas.contains(isr, newLeader);
     }
 
     public ControllerResult<Void> unregisterBroker(int brokerId) {
@@ -1119,6 +1058,83 @@ public class ReplicationControlManager {
         }
     }
 
+    /**
+     * Iterate over a sequence of partitions and generate ISR changes and/or 
leader
+     * changes if necessary.
+     *
+     * @param context           A human-readable context string used in log4j 
logging.
+     * @param brokerToRemove    NO_LEADER if no broker is being removed; the 
ID of the
+     *                          broker to remove from the ISR and leadership, 
otherwise.
+     * @param brokerToAdd       NO_LEADER if no broker is being added; the ID 
of the
+     *                          broker which is now eligible to be a leader, 
otherwise.
+     * @param records           A list of records which we will append to.
+     * @param iterator          The iterator containing the partitions to 
examine.
+     */
+    void generateLeaderAndIsrUpdates(String context,
+                                     int brokerToRemove,
+                                     int brokerToAdd,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        int oldSize = records.size();
+        Function<Integer, Boolean> isAcceptableLeader =
+            r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.unfenced(r));
+        while (iterator.hasNext()) {
+            TopicIdPartition topicIdPart = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPart.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+
+                        " existed in isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicIdPart +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemove);
+            int newLeader;
+            if (isGoodLeader(newIsr, partition.leader)) {
+                // If the current leader is good, don't change.
+                newLeader = partition.leader;
+            } else {
+                // Choose a new leader.
+                boolean uncleanOk = 
configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name);
+                newLeader = bestLeader(partition.replicas, newIsr, uncleanOk, 
isAcceptableLeader);
+            }
+            if (!electionWasClean(newLeader, newIsr)) {
+                // After an unclean leader election, the ISR is reset to just 
the new leader.
+                newIsr = new int[] {newLeader};
+            } else if (newIsr.length == 0) {
+                // We never want to shrink the ISR to size 0.
+                newIsr = partition.isr;
+            }
+            PartitionChangeRecord record = new PartitionChangeRecord().
+                setPartitionId(topicIdPart.partitionId()).
+                setTopicId(topic.id);
+            if (newLeader != partition.leader) record.setLeader(newLeader);
+            if (!Arrays.equals(newIsr, partition.isr)) 
record.setIsr(Replicas.toList(newIsr));
+            if (record.leader() != NO_LEADER_CHANGE || record.isr() != null) {
+                records.add(new ApiMessageAndVersion(record, (short) 0));
+            }
+        }
+        if (records.size() != oldSize) {
+            if (log.isDebugEnabled()) {
+                StringBuilder bld = new StringBuilder();
+                String prefix = "";
+                for (ListIterator<ApiMessageAndVersion> iter = 
records.listIterator(oldSize);
+                     iter.hasNext(); ) {
+                    ApiMessageAndVersion apiMessageAndVersion = iter.next();
+                    PartitionChangeRecord record = (PartitionChangeRecord) 
apiMessageAndVersion.message();
+                    
bld.append(prefix).append(topics.get(record.topicId()).name).append("-").
+                        append(record.partitionId());
+                    prefix = ", ";
+                }
+                log.debug("{}: changing partition(s): {}", context, 
bld.toString());
+            } else if (log.isInfoEnabled()) {
+                log.info("{}: changing {} partition(s)", context, 
records.size() - oldSize);
+            }
+        }
+    }
+
     class ReplicationControlIterator implements 
Iterator<List<ApiMessageAndVersion>> {
         private final long epoch;
         private final Iterator<TopicControlInfo> iterator;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
index 525bf1e..6510ee5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
@@ -101,9 +101,9 @@ public class BrokersToIsrsTest {
         assertEquals(toSet(new TopicIdPartition(UUIDS[0], 2)),
             toSet(brokersToIsrs.iterator(3, true)));
         assertEquals(toSet(), toSet(brokersToIsrs.iterator(2, true)));
-        assertEquals(toSet(), toSet(brokersToIsrs.noLeaderIterator()));
+        assertEquals(toSet(), toSet(brokersToIsrs.partitionsWithNoLeader()));
         brokersToIsrs.update(UUIDS[0], 2, new int[]{1, 2, 3}, new int[]{1, 2, 
3}, 3, -1);
         assertEquals(toSet(new TopicIdPartition(UUIDS[0], 2)),
-            toSet(brokersToIsrs.noLeaderIterator()));
+            toSet(brokersToIsrs.partitionsWithNoLeader()));
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index d0588a0..8c64cec 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -185,7 +185,7 @@ public class QuorumControllerTest {
                 topicPartitionFuture = active.appendReadEvent(
                     "debugGetPartition", () -> {
                         Iterator<TopicIdPartition> iterator = active.
-                            
replicationControl().brokersToIsrs().noLeaderIterator();
+                            
replicationControl().brokersToIsrs().partitionsWithNoLeader();
                         assertTrue(iterator.hasNext());
                         return iterator.next();
                     });
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index e524581..2a456be 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
@@ -105,6 +106,27 @@ public class ReplicationControlManagerTest {
         ReplicationControlTestContext() {
             clusterControl.activate();
         }
+
+        CreatableTopicResult createTestTopic(String name, int[][] replicas) 
throws Exception {
+            assertFalse(replicas.length == 0);
+            CreateTopicsRequestData request = new CreateTopicsRequestData();
+            CreatableTopic topic = new CreatableTopic().setName(name);
+            topic.setNumPartitions(-1).setReplicationFactor((short) -1);
+            for (int i = 0; i < replicas.length; i++) {
+                topic.assignments().add(new CreatableReplicaAssignment().
+                    
setPartitionIndex(i).setBrokerIds(Replicas.toList(replicas[i])));
+            }
+            request.topics().add(topic);
+            ControllerResult<CreateTopicsResponseData> result =
+                replicationControl.createTopics(request);
+            CreatableTopicResult topicResult = 
result.response().topics().find(name);
+            assertNotNull(topicResult);
+            assertEquals((short) 0, topicResult.errorCode());
+            assertEquals(replicas.length, topicResult.numPartitions());
+            assertEquals(replicas[0].length, topicResult.replicationFactor());
+            replay(result.records());
+            return topicResult;
+        }
     }
 
     private static void registerBroker(int brokerId, 
ReplicationControlTestContext ctx) {
@@ -125,7 +147,7 @@ public class ReplicationControlManagerTest {
                 setBrokerId(brokerId).setBrokerEpoch(brokerId + 
100).setCurrentMetadataOffset(1).
                 setWantFence(false).setWantShutDown(false), 0);
         assertEquals(new BrokerHeartbeatReply(true, false, false, false), 
result.response());
-        ControllerTestUtils.replayAll(ctx.clusterControl, result.records());
+        ctx.replay(result.records());
     }
 
     @Test
@@ -157,7 +179,7 @@ public class ReplicationControlManagerTest {
             setErrorMessage(null).setErrorCode((short) 0).
             setTopicId(result2.response().topics().find("foo").topicId()));
         assertEquals(expectedResponse2, result2.response());
-        ControllerTestUtils.replayAll(replicationControl, result2.records());
+        ctx.replay(result2.records());
         assertEquals(new PartitionControlInfo(new int[] {2, 0, 1},
             new int[] {2, 0, 1}, null, null, 2, 0, 0),
             replicationControl.getPartition(
@@ -197,29 +219,6 @@ public class ReplicationControlManagerTest {
         assertEquals(expectedTopicErrors, topicErrors);
     }
 
-    private static CreatableTopicResult createTestTopic(
-            ReplicationControlManager replicationControl, String name,
-            int[][] replicas) throws Exception {
-        assertFalse(replicas.length == 0);
-        CreateTopicsRequestData request = new CreateTopicsRequestData();
-        CreatableTopic topic = new CreatableTopic().setName(name);
-        topic.setNumPartitions(-1).setReplicationFactor((short) -1);
-        for (int i = 0; i < replicas.length; i++) {
-            topic.assignments().add(new CreatableReplicaAssignment().
-                
setPartitionIndex(i).setBrokerIds(Replicas.toList(replicas[i])));
-        }
-        request.topics().add(topic);
-        ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(request);
-        CreatableTopicResult topicResult = 
result.response().topics().find(name);
-        assertNotNull(topicResult);
-        assertEquals((short) 0, topicResult.errorCode());
-        assertEquals(replicas.length, topicResult.numPartitions());
-        assertEquals(replicas[0].length, topicResult.replicationFactor());
-        ControllerTestUtils.replayAll(replicationControl, result.records());
-        return topicResult;
-    }
-
     @Test
     public void testRemoveLeaderships() throws Exception {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
@@ -228,7 +227,7 @@ public class ReplicationControlManagerTest {
             registerBroker(i, ctx);
             unfenceBroker(i, ctx);
         }
-        CreatableTopicResult result = createTestTopic(replicationControl, 
"foo",
+        CreatableTopicResult result = ctx.createTestTopic("foo",
             new int[][] {
                 new int[] {0, 1, 2},
                 new int[] {1, 2, 3},
@@ -241,8 +240,8 @@ public class ReplicationControlManagerTest {
         assertEquals(expectedPartitions, ControllerTestUtils.
             iteratorToSet(replicationControl.brokersToIsrs().iterator(0, 
true)));
         List<ApiMessageAndVersion> records = new ArrayList<>();
-        replicationControl.handleNodeDeactivated(0, records);
-        ControllerTestUtils.replayAll(replicationControl, records);
+        replicationControl.handleBrokerFenced(0, records);
+        ctx.replay(records);
         assertEquals(Collections.emptySet(), ControllerTestUtils.
             iteratorToSet(replicationControl.brokersToIsrs().iterator(0, 
true)));
     }
@@ -255,7 +254,7 @@ public class ReplicationControlManagerTest {
             registerBroker(i, ctx);
             unfenceBroker(i, ctx);
         }
-        CreatableTopicResult createTopicResult = 
createTestTopic(replicationControl, "foo",
+        CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
             new int[][] {new int[] {0, 1, 2}});
 
         TopicIdPartition topicIdPartition = new 
TopicIdPartition(createTopicResult.topicId(), 0);
@@ -287,7 +286,7 @@ public class ReplicationControlManagerTest {
             registerBroker(i, ctx);
             unfenceBroker(i, ctx);
         }
-        CreatableTopicResult createTopicResult = 
createTestTopic(replicationControl, "foo",
+        CreatableTopicResult createTopicResult = ctx.createTestTopic("foo",
             new int[][] {new int[] {0, 1, 2}});
 
         TopicIdPartition topicIdPartition = new 
TopicIdPartition(createTopicResult.topicId(), 0);
@@ -652,4 +651,42 @@ public class ReplicationControlManagerTest {
                     
ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2),
                         OptionalInt.of(3))).getMessage());
     }
+
+    @Test
+    public void testElectionWasClean() {
+        assertTrue(ReplicationControlManager.electionWasClean(1, new int[] {1, 
2}));
+        assertFalse(ReplicationControlManager.electionWasClean(1, new int[] 
{0, 2}));
+        assertFalse(ReplicationControlManager.electionWasClean(1, new int[] 
{}));
+        assertTrue(ReplicationControlManager.electionWasClean(3, new int[] {1, 
2, 3, 4, 5, 6}));
+    }
+
+    @Test
+    public void testPartitionControlInfoMergeAndDiff() {
+        PartitionControlInfo a = new PartitionControlInfo(
+            new int[]{1, 2, 3}, new int[]{1, 2}, null, null, 1, 0, 0);
+        PartitionControlInfo b = new PartitionControlInfo(
+            new int[]{1, 2, 3}, new int[]{3}, null, null, 3, 1, 1);
+        PartitionControlInfo c = new PartitionControlInfo(
+            new int[]{1, 2, 3}, new int[]{1}, null, null, 1, 0, 1);
+        assertEquals(b, a.merge(new PartitionChangeRecord().
+            setLeader(3).setIsr(Arrays.asList(3))));
+        assertEquals("isr: [1, 2] -> [3], leader: 1 -> 3, leaderEpoch: 0 -> 1, 
partitionEpoch: 0 -> 1",
+            b.diff(a));
+        assertEquals("isr: [1, 2] -> [1], partitionEpoch: 0 -> 1",
+            c.diff(a));
+    }
+
+    @Test
+    public void testBestLeader() {
+        assertEquals(2, ReplicationControlManager.bestLeader(
+            new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
+        assertEquals(3, ReplicationControlManager.bestLeader(
+            new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
+        assertEquals(4, ReplicationControlManager.bestLeader(
+            new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
+        assertEquals(-1, ReplicationControlManager.bestLeader(
+            new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
+        assertEquals(4, ReplicationControlManager.bestLeader(
+            new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
+    }
 }

Reply via email to