artemlivshits commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362802596


##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +393,58 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
         }
     }
 
+    private void maybeUpdateRecordElr(PartitionChangeRecord record) {
+        // During the leader election, it can set the record isr if an unclean leader election happens.
+        boolean isCleanLeaderElection = record.isr() == null;
+
+        // Clean the ELR related fields if it is an unclean election or ELR is disabled.
+        if (!isCleanLeaderElection || !eligibleLeaderReplicasEnabled) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+        }
+
+        if (!targetElr.equals(Replicas.toList(partition.elr))) {
+            record.setEligibleLeaderReplicas(targetElr);
+        }
+
+        if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+            record.setLastKnownELR(targetLastKnownElr);
+        }
+    }
+
+    private void maybePopulateTargetElr() {
+        if (!eligibleLeaderReplicasEnabled) return;
+
+        // If the ISR is larger or equal to the min ISR, clear the ELR and lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+        Set<Integer> targetIsrSet = new HashSet<>(targetIsr);
+        // Tracking the ELR. The new elr is expected to
+        // 1. Include the current ISR
+        // 2. Exclude the duplicate replicas between elr and target ISR.
+        // 3. Exclude unclean shutdown replicas.
+        // To do that, we first union the current ISR and current elr, then filter out the target ISR and unclean shutdown
+        // Replicas.
+        Set<Integer> candidateSet = new HashSet<>(targetElr);
+        Arrays.stream(partition.isr).forEach(ii -> candidateSet.add(ii));
+        targetElr = candidateSet.stream()
+            .filter(replica -> !targetIsrSet.contains(replica))
+            .filter(replica -> uncleanShutdownReplicas == null || !uncleanShutdownReplicas.contains(replica))
+            .collect(Collectors.toList());
+
+        // Calculate the new last known ELR. Includes any ISR members since the ISR size drops below min ISR.
+        // In order to reduce the metadata usage, the last known ELR excludes the members in ELR and current ISR.
+        targetLastKnownElr.forEach(ii -> candidateSet.add(ii));

Review Comment:
   Can we record the last known leader as the first member of the last known
ELR? That is, when the last replica is removed from the ISR, make it the first
member of the last known ELR. A replica could then appear in both the ELR and
the last known ELR, but that is no worse than the current change, because we
would no longer need to keep the last known leader separately.
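
   To make the suggestion concrete, here is a minimal sketch of the ordering
I have in mind. It is not code from this PR: the class name, the helper
`withLastLeaderFirst`, and its parameters are hypothetical, and it assumes the
last known ELR is handled as an ordered list of replica IDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in PartitionChangeBuilder.
class LastKnownElrSketch {

    /**
     * Returns a copy of lastKnownElr in which lastIsrMember (the replica that was
     * just removed as the final ISR member) is the first element. Other members
     * keep their relative order and no ID appears twice.
     */
    static List<Integer> withLastLeaderFirst(List<Integer> lastKnownElr, int lastIsrMember) {
        // LinkedHashSet keeps first-insertion order, so adding the last ISR member
        // first pins it to position 0 even if it is already in lastKnownElr.
        Set<Integer> ordered = new LinkedHashSet<>();
        ordered.add(lastIsrMember);
        ordered.addAll(lastKnownElr);
        return new ArrayList<>(ordered);
    }

    public static void main(String[] args) {
        // Replica 1 was the last one to leave the ISR; 2 and 3 left earlier.
        System.out.println(withLastLeaderFirst(Arrays.asList(2, 3), 1));
        // Replica 1 is already tracked; it is moved to the front, not duplicated.
        System.out.println(withLastLeaderFirst(Arrays.asList(2, 1, 3), 1));
    }
}
```

   With this ordering, a reader of the record could treat element 0 of the
last known ELR as the last known leader. Note that the sketch does not filter
out members of the ELR, which is the overlap mentioned above.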


