mumrah commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1349018067


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -203,6 +203,7 @@ static public class Builder {
         private QuorumFeatures quorumFeatures = null;
         private short defaultReplicationFactor = 3;
         private int defaultNumPartitions = 1;
+        private int defaultMinInSyncIsr = 1;

Review Comment:
   nit: this name is redundant. ISR == InSyncReplica, so I think this should 
probably be `defaultMinIsr`



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +353,37 @@ public Optional<ApiMessageAndVersion> build() {
 
         completeReassignmentIfNeeded();
 
+        if (PartitionChangeBuilder.this.eligibleLeaderReplicasEnabled) {
+            populateTargetElr();
+        }
+
         tryElection(record);
 
         triggerLeaderEpochBumpIfNeeded(record);
 
-        if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
-            // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
-            record.setIsr(targetIsr);
+        // During the leader election, it can set the record isr if an unclean leader election happens.

Review Comment:
   Could we move the ELR election code into a separate method? Then we can 
have one branch point where we decide whether to perform ELR election or run 
the old code. This would also let us keep the old code unmodified, which has 
some merit.



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -149,6 +150,8 @@ static class Builder {
         private LogContext logContext = null;
         private short defaultReplicationFactor = (short) 3;
         private int defaultNumPartitions = 1;
+
+        private int defaultMinInSyncIsr = 1;

Review Comment:
   same naming nit as above



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -434,6 +434,15 @@ Map<String, String> getConfigs(ConfigResource configResource) {
         }
     }
 
+    Map<String, String> getTopicConfigs(String topicName) {

Review Comment:
   We should avoid copying data structures when we can. It looks like the 
callers of this method are reading an individual key anyways, so maybe `String 
getTopicConfig(String topicName, String configKey)` would work?
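
A minimal sketch of the single-key lookup suggested above. The `TopicConfigStore` class and its nested-map layout are assumptions made for illustration; they are not the actual `ConfigurationControlManager` internals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the controller's config storage (not the real Kafka class).
class TopicConfigStore {
    private final Map<String, Map<String, String>> configsByTopic = new HashMap<>();

    void set(String topicName, String configKey, String value) {
        configsByTopic.computeIfAbsent(topicName, k -> new HashMap<>()).put(configKey, value);
    }

    // Reads one key directly from the stored map, so no per-call copy is made.
    // Returns null when either the topic or the key is not present.
    String getTopicConfig(String topicName, String configKey) {
        Map<String, String> configs = configsByTopic.get(topicName);
        return configs == null ? null : configs.get(configKey);
    }
}
```

A caller that needs a default can apply it at the call site when the result is null, which keeps the lookup itself allocation-free.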



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1008,9 +1024,11 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
                     topic.id,
                     partitionId,
                     clusterControl::isActive,
-                    featureControl.metadataVersion()
+                    featureControl.metadataVersion(),
+                    getTopicEffectiveMinIsr(topic.name)
                 );
-                builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+                builder.setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+                    .setEligibleLeaderReplicasEnabled(clusterControl.eligibleLeaderReplicasAllowed());

Review Comment:
   nit: just do another `builder.` line here



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +411,35 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
         }
     }
 
+    private void populateTargetElr() {
+        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+
+        Set<Integer> targetIsrSet = targetIsr.stream().collect(Collectors.toSet());
+        // Tracking the ELR. The new elr is expected to
+        // 1. Include the current ISR
+        // 2. Exclude the duplicate replicas between elr and target ISR.
+        // 3. Exclude unclean shutdown replicas.
+        // To do that, we first union the current ISR and current elr, then filter out the target ISR and unclean shutdown
+        // Replicas.
+        Set<Integer> candidateSet = Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
+        Arrays.stream(partition.isr).boxed().forEach(ii -> candidateSet.add(ii));
+        targetElr = candidateSet.stream()
+            .filter(replica -> !targetIsrSet.contains(replica) && (uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica)))

Review Comment:
   Can we split this into two filters for better readability?
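
One way the split could look, as a sketch only. The class `ElrFilterSketch` and the method `computeTargetElr` are hypothetical names; the parameter names follow the quoted diff, and the null check on `uncleanShutdownReplicas` is carried over from it.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of PartitionChangeBuilder.
class ElrFilterSketch {
    static List<Integer> computeTargetElr(
        Set<Integer> candidateSet,
        Set<Integer> targetIsrSet,
        List<Integer> uncleanShutdownReplicas // may be null, as in the quoted diff
    ) {
        return candidateSet.stream()
            // First filter: drop replicas that are already in the target ISR.
            .filter(replica -> !targetIsrSet.contains(replica))
            // Second filter: drop replicas that shut down uncleanly.
            .filter(replica -> uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica))
            .collect(Collectors.toList());
    }
}
```

Each predicate now states one exclusion rule, which lines up with items 2 and 3 of the code comment in the hunk.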



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1396,9 +1414,12 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
             topicId,
             partitionId,
             clusterControl::isActive,
-            featureControl.metadataVersion()
+            featureControl.metadataVersion(),
+            getTopicEffectiveMinIsr(topic)
         );
-        builder.setElection(election).setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+        builder.setElection(election)
+            .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+            .setEligibleLeaderReplicasEnabled(clusterControl.eligibleLeaderReplicasAllowed());

Review Comment:
   nit: same comment as above



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1531,10 +1552,12 @@ ControllerResult<Boolean> maybeBalancePartitionLeaders() {
                 topicPartition.topicId(),
                 topicPartition.partitionId(),
                 clusterControl::isActive,
-                featureControl.metadataVersion()
+                featureControl.metadataVersion(),
+                getTopicEffectiveMinIsr(topic.name)
             );
             builder.setElection(PartitionChangeBuilder.Election.PREFERRED)
-                .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed());
+                .setZkMigrationEnabled(clusterControl.zkRegistrationAllowed())
+                .setEligibleLeaderReplicasEnabled(clusterControl.eligibleLeaderReplicasAllowed());

Review Comment:
   nit: and here



##########
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##########
@@ -40,6 +42,15 @@
       "versions": "0+", "nullableVersions": "0+", "taggedVersions": "0+", "tag": 4,
       "about": "null if the adding replicas didn't change; the new adding replicas otherwise." },
     { "name": "LeaderRecoveryState", "type": "int8", "default": "-1", "versions": "0+", "taggedVersions": "0+", "tag": 5,
-      "about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." }
+      "about": "-1 if it didn't change; 0 if the leader was elected from the ISR or recovered from an unclean election; 1 if the leader that was elected using unclean leader election and it is still recovering." },
+    { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 6,
+      "about": "null if the ELR didn't change; the new eligible leader replicas otherwise." },
+    { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId",
+      "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 7,
+      "about": "null if the LastKnownELR didn't change; the last known eligible leader replicas otherwise." },
+    { "name": "LastKnownLeader", "type": "int32", "default": "-2", "entityType": "brokerId",

Review Comment:
   I see we want to convey one of three things:
   
   * An updated LastKnownLeader (some value 0+)
   * An empty LastKnownLeader (-1)
   * An unchanged LastKnownLeader
   
   The convention we have established in this record is to use the absence of a 
tagged field as "unchanged". Since these fields are only available at record 
version 1+, can we make the "unchanged" condition: "record version is 1+ and 
LastKnownLeader is absent" -- would that work?



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +411,35 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
         }
     }
 
+    private void populateTargetElr() {
+        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+
+        Set<Integer> targetIsrSet = targetIsr.stream().collect(Collectors.toSet());
+        // Tracking the ELR. The new elr is expected to
+        // 1. Include the current ISR
+        // 2. Exclude the duplicate replicas between elr and target ISR.
+        // 3. Exclude unclean shutdown replicas.
+        // To do that, we first union the current ISR and current elr, then filter out the target ISR and unclean shutdown
+        // Replicas.
+        Set<Integer> candidateSet = Arrays.stream(partition.elr).boxed().collect(Collectors.toSet());
+        Arrays.stream(partition.isr).boxed().forEach(ii -> candidateSet.add(ii));
+        targetElr = candidateSet.stream()
+            .filter(replica -> !targetIsrSet.contains(replica) && (uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica)))
+            .collect(Collectors.toList());
+
+        // Calculate the new last known ELR. Includes any ISR members since the ISR size drops below min ISR.
+        // In order to reduce the metadata usage, the last known ELR excludes the members in ELR and current ISR.
+        Arrays.stream(partition.lastKnownElr).boxed().forEach(ii -> candidateSet.add(ii));
+        targetLastKnownElr =  candidateSet.stream()
+            .filter(replica -> !targetIsrSet.contains(replica) && !targetElr.contains(replica)).collect(Collectors.toList());

Review Comment:
   nit: break the `collect` into a new line



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1994,6 +2014,24 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
             setReplicas(Replicas.toList(partition.replicas)));
     }
 
+    // Make it public to be visible in test.

Review Comment:
   nit: we usually write `// Visible for tests`
   
   Does it need to be public? Can it be package-private?



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -79,29 +84,39 @@ public enum Election {
     private List<Integer> targetReplicas;
     private List<Integer> targetRemoving;
     private List<Integer> targetAdding;
+    private List<Integer> targetElr;
+    private List<Integer> targetLastKnownElr;
+    private List<Integer> uncleanShutdownReplicas;
     private Election election = Election.ONLINE;
     private LeaderRecoveryState targetLeaderRecoveryState;
     private boolean zkMigrationEnabled;
+    private boolean eligibleLeaderReplicasEnabled;
+    private int minISR;
 
 
     public PartitionChangeBuilder(
         PartitionRegistration partition,
         Uuid topicId,
         int partitionId,
         IntPredicate isAcceptableLeader,
-        MetadataVersion metadataVersion
+        MetadataVersion metadataVersion,
+        int minISR
     ) {
         this.partition = partition;
         this.topicId = topicId;
         this.partitionId = partitionId;
         this.isAcceptableLeader = isAcceptableLeader;
         this.metadataVersion = metadataVersion;
         this.zkMigrationEnabled = false;
+        this.eligibleLeaderReplicasEnabled = false;
+        this.minISR = minISR;
 
         this.targetIsr = Replicas.toList(partition.isr);
         this.targetReplicas = Replicas.toList(partition.replicas);
         this.targetRemoving = Replicas.toList(partition.removingReplicas);
         this.targetAdding = Replicas.toList(partition.addingReplicas);
+        this.targetElr = Collections.emptyList();
+        this.targetLastKnownElr = Collections.emptyList();

Review Comment:
   Why not copy the arrays out of the record at this point? I think that would 
help us avoid the boxing. We can copy the data out from the record and just not 
use it if the feature is not supported or disabled.



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -311,6 +325,10 @@ boolean zkRegistrationAllowed() {
         return zkMigrationEnabled && featureControl.metadataVersion().isMigrationSupported();
     }
 
+    boolean eligibleLeaderReplicasAllowed() {
+        return eligibleLeaderReplicasEnabled;

Review Comment:
   This should also be checking the MV
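
A sketch of what such a check might look like, mirroring the shape of `zkRegistrationAllowed()` in the hunk above. `VersionInfo` and `isElrSupported()` are hypothetical stand-ins introduced for illustration; the diff does not show which `MetadataVersion` method would gate ELR.

```java
// Hypothetical sketch, not the real ClusterControlManager.
class ElrGateSketch {
    // Hypothetical stand-in for MetadataVersion; not the real Kafka type.
    interface VersionInfo {
        boolean isElrSupported();
    }

    private final boolean eligibleLeaderReplicasEnabled;
    private final VersionInfo metadataVersion;

    ElrGateSketch(boolean eligibleLeaderReplicasEnabled, VersionInfo metadataVersion) {
        this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
        this.metadataVersion = metadataVersion;
    }

    // ELR is allowed only when the config flag is on and the metadata version supports it.
    boolean eligibleLeaderReplicasAllowed() {
        return eligibleLeaderReplicasEnabled && metadataVersion.isElrSupported();
    }
}
```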



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1994,6 +2014,24 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
             setReplicas(Replicas.toList(partition.replicas)));
     }
 
+    // Make it public to be visible in test.
+    public Integer getTopicEffectiveMinIsr(String topicName) {
+        Integer currentMinIsr = Integer.parseInt(configurationControl.getTopicConfigs(topicName)
+            .getOrDefault(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(defaultMinInSyncIsr)));
+        Integer replicationFactor = (int) defaultReplicationFactor;
+        try {
+            Uuid topicId = topicsByName.get(topicName);
+            replicationFactor = topics.get(topicId).parts.get(0).replicas.length;
+        } catch (Exception e) {
+            log.debug("Can't find the replication factor for topic: " + topicName);
+        }
+        return Math.min(currentMinIsr, replicationFactor);
+    }
+
+    private short partitionRecordVersion(boolean isElrAllowed) {

Review Comment:
   This should be static _or_ it should compute the record version based on 
ClusterControl



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1994,6 +2014,24 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
             setReplicas(Replicas.toList(partition.replicas)));
     }
 
+    // Make it public to be visible in test.
+    public Integer getTopicEffectiveMinIsr(String topicName) {
+        Integer currentMinIsr = Integer.parseInt(configurationControl.getTopicConfigs(topicName)
+            .getOrDefault(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(defaultMinInSyncIsr)));
+        Integer replicationFactor = (int) defaultReplicationFactor;
+        try {
+            Uuid topicId = topicsByName.get(topicName);
+            replicationFactor = topics.get(topicId).parts.get(0).replicas.length;
+        } catch (Exception e) {
+            log.debug("Can't find the replication factor for topic: " + topicName);
+        }

Review Comment:
   Let's not rely on an error here. We can check if the replicas array is 
zero-length
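
A sketch of the explicit-check approach, under simplifying assumptions: `MinIsrSketch` is a hypothetical class, and the `Map<String, int[]>` parameter stands in for the controller's topic and partition lookups, which are not reproduced here.

```java
import java.util.Map;

// Hypothetical helper, not the real ReplicationControlManager.
class MinIsrSketch {
    // replicasByTopic maps a topic name to the replica array of its first partition
    // (an assumption made to keep the example self-contained).
    static int effectiveMinIsr(
        Map<String, int[]> replicasByTopic,
        String topicName,
        int configuredMinIsr,
        int defaultReplicationFactor
    ) {
        int replicationFactor = defaultReplicationFactor;
        int[] replicas = replicasByTopic.get(topicName);
        // Explicit checks replace the try/catch: fall back to the default when the
        // topic is unknown or its replica array is empty.
        if (replicas != null && replicas.length > 0) {
            replicationFactor = replicas.length;
        }
        return Math.min(configuredMinIsr, replicationFactor);
    }
}
```

With the checks spelled out, a missing topic is an ordinary branch rather than a swallowed exception, so unrelated failures are no longer hidden by the `catch`.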



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +411,35 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
         }
     }
 
+    private void populateTargetElr() {
+        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+
+        Set<Integer> targetIsrSet = targetIsr.stream().collect(Collectors.toSet());

Review Comment:
   You can just use the HashSet constructor here
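
For illustration, the constructor form could look like the sketch below. `HashSetCopySketch` and `toSet` are hypothetical names; `new HashSet<>(Collection)` is the standard `java.util` copy constructor.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper showing the copy constructor in place of a stream pipeline.
class HashSetCopySketch {
    static Set<Integer> toSet(List<Integer> targetIsr) {
        // Copies all elements of the list into a new set; duplicates collapse.
        return new HashSet<>(targetIsr);
    }
}
```

The result is a mutable `HashSet` holding the same elements as `targetIsr.stream().collect(Collectors.toSet())` would collect, without building a stream.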


