mumrah commented on code in PR #14312:
URL: https://github.com/apache/kafka/pull/14312#discussion_r1362167841
##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -328,13 +353,37 @@ public Optional<ApiMessageAndVersion> build() {
completeReassignmentIfNeeded();
+ if (PartitionChangeBuilder.this.eligibleLeaderReplicasEnabled) {
+ populateTargetElr();
+ }
+
tryElection(record);
triggerLeaderEpochBumpIfNeeded(record);
- if (record.isr() == null && !targetIsr.isEmpty() && !targetIsr.equals(Replicas.toList(partition.isr))) {
- // Set the new ISR if it is different from the current ISR and unclean leader election didn't already set it.
- record.setIsr(targetIsr);
+ // During the leader election, it can set the record isr if an unclean leader election happens.
Review Comment:
I guess what I don't like is the logic split between `populateTargetElr` and
the lines below. For example, in populateTargetElr we always set targetElr and
targetLastKnownElr, but in some cases we immediately clear those collections.
It would be nice if all the ELR logic was inside populateTargetElr.
E.g.,
```
if (eligibleLeaderReplicasEnabled) {
populateTargetElr(); // compute the ELR fields and set them on the record
} else {
// set the ELR fields to empty (or whatever defaults)
}
```
It might actually fit better with the rest of the code in this class to have a
method like `maybePopulateTargetElr` that is called unconditionally and holds
all the ELR logic.
E.g.,
```
completeReassignmentIfNeeded();
tryElection(record);
triggerLeaderEpochBumpIfNeeded(record);
maybePopulateTargetElr(record);
```
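A minimal sketch of the suggested structure, using a hypothetical, heavily simplified stand-in for `PartitionChangeBuilder`: `build()` always calls `maybePopulateTargetElr(record)`, and that one method decides both whether ELR applies and what the new ELR is. The ELR rule inside is a simplified assumption for illustration (clear the ELR while the ISR meets min ISR, otherwise move replicas leaving the ISR into the ELR), `ChangeRecord` is a made-up placeholder for `PartitionChangeRecord`, and LastKnownElr is omitted to keep it short.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for PartitionChangeBuilder. It only
// illustrates the suggested structure: every ELR decision lives in one method
// that build() calls unconditionally.
final class ElrBuilderSketch {
    // Hypothetical placeholder for the ELR field of PartitionChangeRecord.
    static final class ChangeRecord {
        List<Integer> elr; // null means "no ELR change in this record"
    }

    private final boolean eligibleLeaderReplicasEnabled;
    private final int minIsr;
    private final List<Integer> currentIsr;
    private final List<Integer> currentElr;
    private final List<Integer> targetIsr;

    ElrBuilderSketch(boolean eligibleLeaderReplicasEnabled, int minIsr,
                     List<Integer> currentIsr, List<Integer> currentElr,
                     List<Integer> targetIsr) {
        this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
        this.minIsr = minIsr;
        this.currentIsr = currentIsr;
        this.currentElr = currentElr;
        this.targetIsr = targetIsr;
    }

    ChangeRecord build() {
        ChangeRecord record = new ChangeRecord();
        // In the real builder, completeReassignmentIfNeeded(), tryElection(record)
        // and triggerLeaderEpochBumpIfNeeded(record) would run before this call.
        maybePopulateTargetElr(record);
        return record;
    }

    // Single place for all ELR handling; safe to call whether or not ELR is enabled.
    void maybePopulateTargetElr(ChangeRecord record) {
        if (!eligibleLeaderReplicasEnabled) {
            // ELR disabled: leave the field unset so the record carries no ELR change.
            return;
        }
        List<Integer> targetElr;
        if (targetIsr.size() >= minIsr) {
            // Simplified assumption: no ELR is kept while the ISR satisfies min ISR.
            targetElr = List.of();
        } else {
            // Simplified assumption: replicas leaving the ISR are added to the ELR.
            Set<Integer> elr = new LinkedHashSet<>(currentElr);
            for (int replica : currentIsr) {
                if (!targetIsr.contains(replica)) {
                    elr.add(replica);
                }
            }
            elr.removeAll(targetIsr); // a replica is never in both the ISR and the ELR
            targetElr = new ArrayList<>(elr);
        }
        // Only set the field when it actually changes.
        if (!targetElr.equals(currentElr)) {
            record.elr = targetElr;
        }
    }
}
```

With this shape, `build()` no longer needs to know about the enabled flag, and there is no path where the ELR fields are populated and then cleared again elsewhere.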
##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -434,6 +434,14 @@ Map<String, String> getConfigs(ConfigResource configResource) {
}
}
+ String getTopicConfig(String topicName, String configKey) {
Review Comment:
Let's add some javadoc to this explaining its behavior (e.g., the default
empty string).
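One possible shape for that javadoc, shown on a hypothetical stand-in class (`TopicConfigSketch` and `setTopicConfig` are made up for illustration). It assumes, based on how `getTopicEffectiveMinIsr` falls back to `defaultMinIsr` on an empty result, that the method returns `""` when the key is not set on the topic and applies no broker-level or cluster-level default itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ConfigurationControlManager: topic-level configs by topic name.
final class TopicConfigSketch {
    private final Map<String, Map<String, String>> topicConfigs = new HashMap<>();

    // Hypothetical helper used only to populate the sketch.
    void setTopicConfig(String topicName, String key, String value) {
        topicConfigs.computeIfAbsent(topicName, t -> new HashMap<>()).put(key, value);
    }

    /**
     * Returns the value of a config that is explicitly set on the given topic.
     *
     * <p>Only topic-level overrides are consulted; no broker-level or cluster-level
     * default is applied. Callers that need a default must apply it themselves.
     *
     * @param topicName the name of the topic
     * @param configKey the config key, e.g. {@code min.insync.replicas}
     * @return the configured value, or the empty string if {@code configKey} is not
     *         set on the topic (including when the topic has no configs at all)
     */
    String getTopicConfig(String topicName, String configKey) {
        Map<String, String> configs = topicConfigs.get(topicName);
        if (configs == null) {
            return "";
        }
        return configs.getOrDefault(configKey, "");
    }
}
```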
##########
metadata/src/main/resources/common/metadata/PartitionChangeRecord.json:
##########
@@ -17,7 +17,9 @@
"apiKey": 5,
"type": "metadata",
"name": "PartitionChangeRecord",
- "validVersions": "0",
+ "validVersions": "0-1",
Review Comment:
If we're just adding tagged fields, then no, we don't need the version bump.
Consider the downgrade case where someone decreases the MV below 3.7-IV1: what
we (will eventually) do is take a snapshot of the current metadata log at the
older MV, which can be a lossy operation. Since tagged fields are generic and
optional, there's no problem rewriting a record to _not_ include the tagged
fields.
Actually, as a side note, PartitionChangeRecord isn't even included in the
snapshot (only PartitionRecord is), so it doesn't affect downgrades at all.
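To make the tagged-field argument concrete, here is a toy model (not Kafka's generated message classes; every name is hypothetical) of a record with versioned fields plus a map of optional tagged fields. Dropping the tagged fields, as a lossy rewrite at an older MV would, leaves every versioned field intact, so the record version does not need to change.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of a metadata record: versioned fields that every reader
// of this record version expects, plus optional tagged fields keyed by tag number.
final class TaggedRecordSketch {
    final String topicId;
    final int partitionId;
    final List<Integer> isr;
    // Tagged fields are optional; a reader that does not know a tag can skip it.
    final Map<Integer, List<Integer>> taggedFields;

    TaggedRecordSketch(String topicId, int partitionId, List<Integer> isr,
                       Map<Integer, List<Integer>> taggedFields) {
        this.topicId = topicId;
        this.partitionId = partitionId;
        this.isr = List.copyOf(isr);
        this.taggedFields = Map.copyOf(taggedFields);
    }

    // Rewrites the record for an older metadata version by dropping the tagged
    // fields. The versioned fields are untouched, so the record version can stay the same.
    TaggedRecordSketch withoutTaggedFields() {
        return new TaggedRecordSketch(topicId, partitionId, isr, Map.of());
    }
}
```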
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1995,6 +2015,19 @@ private void listReassigningTopic(ListPartitionReassignmentsResponseData respons
setReplicas(Replicas.toList(partition.replicas)));
}
+ // Visible to test.
+ Integer getTopicEffectiveMinIsr(String topicName) {
+ String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG);
+ Integer currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
+ Uuid topicId = topicsByName.get(topicName);
+ Integer replicationFactor = topics.get(topicId).parts.get(0).replicas.length;
Review Comment:
It doesn't hurt to be safe. Even a RuntimeException is nicer than an NPE.
Related to this change, can we have this method return `int` instead of
`Integer`? It doesn't look like it can return `null` anyway.
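A minimal sketch of both suggestions: explicit lookups that fail with an `IllegalStateException` (a RuntimeException) instead of an NPE, and a primitive `int` return type. Kafka's `Uuid`, topic control info, and `ConfigurationControlManager` are replaced by hypothetical plain maps, and the final `Math.min` cap at the replication factor is an assumption, since the rest of the method is cut off in the hunk.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the parts of ReplicationControlManager that
// getTopicEffectiveMinIsr touches; real Kafka types are replaced with plain maps.
final class EffectiveMinIsrSketch {
    private final int defaultMinIsr;
    // Topic name -> explicitly set min.insync.replicas value.
    private final Map<String, String> minIsrConfigs = new HashMap<>();
    // Topic name -> topic ID (a String here instead of Kafka's Uuid).
    private final Map<String, String> topicsByName = new HashMap<>();
    // Topic ID -> (partition ID -> replica list).
    private final Map<String, Map<Integer, int[]>> partsByTopicId = new HashMap<>();

    EffectiveMinIsrSketch(int defaultMinIsr) {
        this.defaultMinIsr = defaultMinIsr;
    }

    // Hypothetical helpers used only to populate the sketch.
    void addTopic(String topicName, String topicId, int[] partition0Replicas) {
        topicsByName.put(topicName, topicId);
        partsByTopicId.computeIfAbsent(topicId, id -> new HashMap<>()).put(0, partition0Replicas);
    }

    void setMinIsrConfig(String topicName, int minIsr) {
        minIsrConfigs.put(topicName, Integer.toString(minIsr));
    }

    // Returns a primitive int: every path either produces a value or throws.
    int getTopicEffectiveMinIsr(String topicName) {
        String minIsrConfig = minIsrConfigs.getOrDefault(topicName, "");
        int currentMinIsr = minIsrConfig.isEmpty() ? defaultMinIsr : Integer.parseInt(minIsrConfig);
        String topicId = topicsByName.get(topicName);
        if (topicId == null) {
            throw new IllegalStateException("Unknown topic: " + topicName);
        }
        Map<Integer, int[]> parts = partsByTopicId.get(topicId);
        if (parts == null || !parts.containsKey(0)) {
            throw new IllegalStateException("No partition 0 found for topic: " + topicName);
        }
        int replicationFactor = parts.get(0).length;
        // Assumption: the effective min ISR is capped at the replication factor.
        return Math.min(currentMinIsr, replicationFactor);
    }
}
```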
--
This is an automated message from the Apache Git Service.