CalvinConfluent commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362922279


##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +393,58 @@ private void setAssignmentChanges(PartitionChangeRecord 
record) {
         }
     }
 
+    private void maybeUpdateRecordElr(PartitionChangeRecord record) {
+        // During the leader election, it can set the record isr if an unclean 
leader election happens.
+        boolean isCleanLeaderElection = record.isr() == null;
+
+        // Clean the ELR related fields if it is an unclean election or ELR is 
disabled.
+        if (!isCleanLeaderElection || !eligibleLeaderReplicasEnabled) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+        }
+
+        if (!targetElr.equals(Replicas.toList(partition.elr))) {
+            record.setEligibleLeaderReplicas(targetElr);
+        }
+
+        if 
(!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+            record.setLastKnownELR(targetLastKnownElr);
+        }
+    }
+
+    private void maybePopulateTargetElr() {
+        if (!eligibleLeaderReplicasEnabled) return;
+
+        // If the ISR is larger or equal to the min ISR, clear the ELR and 
lastKnownELR.
+        if (targetIsr.size() >= minISR) {
+            targetElr = Collections.emptyList();
+            targetLastKnownElr = Collections.emptyList();
+            return;
+        }
+
+        Set<Integer> targetIsrSet = new HashSet<>(targetIsr);
+        // Tracking the ELR. The new elr is expected to
+        // 1. Include the current ISR
+        // 2. Exclude the duplicate replicas between elr and target ISR.
+        // 3. Exclude unclean shutdown replicas.
+        // To do that, we first union the current ISR and current elr, then 
filter out the target ISR and unclean shutdown
+        // Replicas.
+        Set<Integer> candidateSet = new HashSet<>(targetElr);
+        Arrays.stream(partition.isr).forEach(ii -> candidateSet.add(ii));
+        targetElr = candidateSet.stream()
+            .filter(replica -> !targetIsrSet.contains(replica))
+            .filter(replica -> uncleanShutdownReplicas == null || 
!uncleanShutdownReplicas.contains(replica))
+            .collect(Collectors.toList());
+
+        // Calculate the new last known ELR. Includes any ISR members since 
the ISR size drops below min ISR.
+        // In order to reduce the metadata usage, the last known ELR excludes 
the members in ELR and current ISR.
+        targetLastKnownElr.forEach(ii -> candidateSet.add(ii));

Review Comment:
   Had an offline discussion. 
   Before the Unclean recovery is in place, we can just skip setting the 
lastKnownElr with the real last known ELR but storing the last known leader.
   Removed the lastKnownLeader field. In order to save the progress, in the 
following leader election change, I will add a flag only for testing that 
switches between storing last known leader and last known ELR in the 
lastKnownElr field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to