artemlivshits commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1353913582
##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +346,38 @@ public Optional<ApiMessageAndVersion> build() {
completeReassignmentIfNeeded();
+ boolean isElrEnabled = metadataVersion.isElrSupported();
+ if (isElrEnabled) {
+ populateTargetElr();
+ }
+
tryElection(record);
triggerLeaderEpochBumpIfNeeded(record);
- if (record.isr() == null && !targetIsr.isEmpty() &&
!targetIsr.equals(Replicas.toList(partition.isr))) {
- // Set the new ISR if it is different from the current ISR and
unclean leader election didn't already set it.
- record.setIsr(targetIsr);
+ // During the leader election, it can set the record isr if an unclean
leader election happens.
+ boolean isCleanLeaderElection = record.isr() == null;
+
+ // Clean the ELR related fields if it is an unclean election or ELR is
disabled.
+ if (!isCleanLeaderElection || !isElrEnabled) {
+ targetElr = Collections.emptyList();
+ targetLastKnownElr = Collections.emptyList();
+ }
+
+ if (!targetElr.equals(Replicas.toList(partition.elr))) {
+ record.setEligibleLeaderReplicas(targetElr);
+ }
+ if
(!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+ record.setLastKnownELR(targetLastKnownElr);
+ }
+
+ // The record.isr is null if it is a clean election. In this case, it
will
+ // 1. Set the new ISR if it is different from the current ISR.
+ // 2. Set the new ELR/LastKnowElr if it is different from the current
ones.
+ if (isCleanLeaderElection) {
Review Comment:
Looks like the logic here is "set ISR if it wasn't set", so checking `if
(record.isr() == null)` seems more intentional here.
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1995,6 +2015,19 @@ private void
listReassigningTopic(ListPartitionReassignmentsResponseData respons
setReplicas(Replicas.toList(partition.replicas)));
}
+ // Visible to test.
+ Integer getTopicEffectiveMinIsr(String topicName) {
+ String minIsrConfig = configurationControl.getTopicConfig(topicName,
MIN_IN_SYNC_REPLICAS_CONFIG);
+ Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr :
Integer.parseInt(minIsrConfig);
+ Uuid topicId = topicsByName.get(topicName);
+ Integer replicationFactor =
topics.get(topicId).parts.get(0).replicas.length;
Review Comment:
Is it guaranteed that we have elements in all maps here? We don't check any
nulls.
##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -186,7 +186,10 @@ public enum MetadataVersion {
IBP_3_6_IV2(14, "3.6", "IV2", true),
// Implement KIP-919 controller registration.
- IBP_3_7_IV0(15, "3.7", "IV0", true);
+ IBP_3_7_IV0(15, "3.7", "IV0", true),
+
+ // Add ELR related supports (KIP-966).
+ IBP_3_7_IV1(16, "3.7", "IV1", true);
Review Comment:
Wouldn't having this prevent version bumps until KIP-966 is fully
implemented? Would it be enough to just have `eligible.leader.replicas.enable`
config that would be used for testing, and then when the feature is ready to be
release, flip `eligible.leader.replicas.enable` to `true` and add a new
metadata version.
##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +346,38 @@ public Optional<ApiMessageAndVersion> build() {
completeReassignmentIfNeeded();
+ boolean isElrEnabled = metadataVersion.isElrSupported();
+ if (isElrEnabled) {
+ populateTargetElr();
+ }
+
tryElection(record);
triggerLeaderEpochBumpIfNeeded(record);
- if (record.isr() == null && !targetIsr.isEmpty() &&
!targetIsr.equals(Replicas.toList(partition.isr))) {
- // Set the new ISR if it is different from the current ISR and
unclean leader election didn't already set it.
- record.setIsr(targetIsr);
+ // During the leader election, it can set the record isr if an unclean
leader election happens.
+ boolean isCleanLeaderElection = record.isr() == null;
+
+ // Clean the ELR related fields if it is an unclean election or ELR is
disabled.
+ if (!isCleanLeaderElection || !isElrEnabled) {
+ targetElr = Collections.emptyList();
+ targetLastKnownElr = Collections.emptyList();
+ }
+
+ if (!targetElr.equals(Replicas.toList(partition.elr))) {
+ record.setEligibleLeaderReplicas(targetElr);
+ }
+ if
(!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
+ record.setLastKnownELR(targetLastKnownElr);
+ }
+
+ // The record.isr is null if it is a clean election. In this case, it
will
+ // 1. Set the new ISR if it is different from the current ISR.
+ // 2. Set the new ELR/LastKnowElr if it is different from the current
ones.
Review Comment:
Is the comment about ELR out of date? It's not set in this condition.
##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -362,6 +405,35 @@ private void setAssignmentChanges(PartitionChangeRecord
record) {
}
}
+ private void populateTargetElr() {
+ // If the ISR is larger or equal to the min ISR, clear the ELR and
lastKnownELR.
+ if (targetIsr.size() >= minISR) {
+ targetElr = Collections.emptyList();
+ targetLastKnownElr = Collections.emptyList();
+ return;
+ }
+
Review Comment:
Extra line?
##########
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##########
@@ -17,7 +17,9 @@
"apiKey": 5,
"type": "metadata",
"name": "PartitionChangeRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
Review Comment:
Do we need to bump the version if we add just the tagged fields?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]