This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fdbed6c458b KAFKA-18649: complete ClearElrRecord handling (#18708)
fdbed6c458b is described below
commit fdbed6c458bdf23da642d65afe02c33cb5427052
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Jan 29 15:07:44 2025 -0800
KAFKA-18649: complete ClearElrRecord handling (#18708)
Implement ClearElrRecord handling in the TopicDelta. Also, the
ReplicationControlManager should not merge updates if ELR/LastKnownElr are
empty, because that will cause an unnecessary partition epoch bump.
Reviewers: Colin P. McCabe <[email protected]>
---
.../controller/ReplicationControlManager.java | 15 ++--
.../java/org/apache/kafka/image/MetadataDelta.java | 8 ++
.../java/org/apache/kafka/image/TopicDelta.java | 27 +++++++
.../java/org/apache/kafka/image/TopicsDelta.java | 35 +++++++--
.../controller/ReplicationControlManagerTest.java | 17 ++++
.../org/apache/kafka/image/TopicsImageTest.java | 91 ++++++++++++++++++++++
6 files changed, 180 insertions(+), 13 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d6eaf089f1e..fc4ae6b38bb 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -609,14 +609,13 @@ public class ReplicationControlManager {
List<Integer> partitionIds = new ArrayList<>(topic.parts.keySet());
for (int partitionId : partitionIds) {
PartitionRegistration partition = topic.parts.get(partitionId);
- PartitionRegistration nextPartition = partition.merge(
- new PartitionChangeRecord().
- setPartitionId(partitionId).
- setTopicId(topic.id).
- setEligibleLeaderReplicas(Collections.emptyList()).
- setLastKnownElr(Collections.emptyList()));
- if (!nextPartition.equals(partition)) {
- topic.parts.put(partitionId, nextPartition);
+ if (partition.elr.length != 0 || partition.lastKnownElr.length !=
0) {
+ topic.parts.put(partitionId, partition.merge(
+ new PartitionChangeRecord().
+ setPartitionId(partitionId).
+ setTopicId(topic.id).
+ setEligibleLeaderReplicas(Collections.emptyList()).
+ setLastKnownElr(Collections.emptyList())));
numRemoved++;
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index ae021a6f2fb..b934d10f6d1 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -19,6 +19,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.DelegationTokenRecord;
@@ -231,6 +232,9 @@ public final class MetadataDelta {
case ACCESS_CONTROL_ENTRY_RECORD:
replay((AccessControlEntryRecord) record);
break;
+ case CLEAR_ELR_RECORD:
+ replay((ClearElrRecord) record);
+ break;
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
replay((RemoveAccessControlEntryRecord) record);
break;
@@ -278,6 +282,10 @@ public final class MetadataDelta {
getOrCreateConfigsDelta().replay(record);
}
+ public void replay(ClearElrRecord record) {
+ getOrCreateTopicsDelta().replay(record);
+ }
+
public void replay(PartitionChangeRecord record) {
getOrCreateTopicsDelta().replay(record);
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 6623ae85902..a663cbfd7d9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -20,11 +20,13 @@ package org.apache.kafka.image;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -83,6 +85,31 @@ public final class TopicDelta {
partitionChanges.put(record.partitionId(), partition.merge(record));
}
+ public void replay(ClearElrRecord record) {
+ // Some partitions are not added to the image yet, let's check the
partitionChanges first.
+ partitionChanges.forEach((partitionId, partition) -> {
+ maybeClearElr(partitionId, partition);
+ });
+
+ image.partitions().forEach((partitionId, partition) -> {
+ if (!partitionChanges.containsKey(partitionId)) {
+ maybeClearElr(partitionId, partition);
+ }
+ });
+ }
+
+ void maybeClearElr(int partitionId, PartitionRegistration partition) {
+ if (partition.elr.length != 0 || partition.lastKnownElr.length != 0) {
+ partitionChanges.put(partitionId, partition.merge(
+ new PartitionChangeRecord().
+ setPartitionId(partitionId).
+ setTopicId(image.id()).
+ setEligibleLeaderReplicas(Collections.emptyList()).
+ setLastKnownElr(Collections.emptyList())
+ ));
+ }
+ }
+
public TopicImage apply() {
Map<Integer, PartitionRegistration> newPartitions = new HashMap<>();
for (Entry<Integer, PartitionRegistration> entry :
image.partitions().entrySet()) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index 2b441776003..ec5cee135b3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -20,6 +20,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
@@ -28,6 +29,7 @@ import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.immutable.ImmutableMap;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -54,7 +56,7 @@ public final class TopicsDelta {
*/
private final Set<Uuid> deletedTopicIds = new HashSet<>();
- private final Set<Uuid> createdTopicIds = new HashSet<>();
+ private final Map<String, Uuid> createdTopics = new HashMap<>();
public TopicsDelta(TopicsImage image) {
this.image = image;
@@ -72,7 +74,7 @@ public final class TopicsDelta {
TopicDelta delta = new TopicDelta(
new TopicImage(record.name(), record.topicId(),
Collections.emptyMap()));
changedTopics.put(record.topicId(), delta);
- createdTopicIds.add(record.topicId());
+ createdTopics.put(record.name(), record.topicId());
}
TopicDelta getOrCreateTopicDelta(Uuid id) {
@@ -94,6 +96,29 @@ public final class TopicsDelta {
topicDelta.replay(record);
}
+ public void replay(ClearElrRecord record) {
+ if (!record.topicName().isEmpty()) {
+ Uuid topicId;
+ if (image.getTopic(record.topicName()) != null) {
+ topicId = image.getTopic(record.topicName()).id();
+ } else {
+ topicId = createdTopics.get(record.topicName());
+ }
+ if (topicId == null) {
+ throw new RuntimeException("Unable to clear elr for topic with
name " +
+ record.topicName() + ": no such topic found.");
+ }
+ TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
+ topicDelta.replay(record);
+ } else {
+ // Update all the existing topics
+ image.topicsById().forEach((topicId, image) -> {
+ TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
+ topicDelta.replay(record);
+ });
+ }
+ }
+
public String replay(RemoveTopicRecord record) {
TopicDelta topicDelta = changedTopics.remove(record.topicId());
String topicName;
@@ -172,8 +197,8 @@ public final class TopicsDelta {
return deletedTopicIds;
}
- public Set<Uuid> createdTopicIds() {
- return createdTopicIds;
+ public Collection<Uuid> createdTopicIds() {
+ return createdTopics.values();
}
/**
@@ -231,7 +256,7 @@ public final class TopicsDelta {
return "TopicsDelta(" +
"changedTopics=" + changedTopics +
", deletedTopicIds=" + deletedTopicIds +
- ", createdTopicIds=" + createdTopicIds +
+ ", createdTopics=" + createdTopics +
')';
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index bd4cad63fda..a6c81f6510e 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -62,6 +62,7 @@ import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -131,6 +132,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG;
+import static
org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD;
import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
import static
org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
@@ -3287,4 +3289,19 @@ public class ReplicationControlManagerTest {
assertArrayEquals(new int[]{1},
ctx.replicationControl.getPartition(barId, 0).elr);
}
}
+
+ @Test
+ void testElrsRemovedShouldNotBumpPartitionEpochIfNoChange() {
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder().
+ setIsElrEnabled(true).
+ setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2").
+ build();
+ ctx.registerBrokers(1, 2, 3, 4);
+ ctx.unfenceBrokers(1, 2, 3, 4);
+ Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+ new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
+ int partitionEpoch = ctx.replicationControl.getPartition(fooId,
0).partitionEpoch;
+ ctx.replay(Arrays.asList(new ApiMessageAndVersion(new
ClearElrRecord(), CLEAR_ELR_RECORD.highestSupportedVersion())));
+ assertEquals(partitionEpoch,
ctx.replicationControl.getPartition(fooId, 0).partitionEpoch);
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 4805fea058a..398cb84b5aa 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.image;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.ClearElrRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
@@ -45,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD;
import static
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
import static
org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
@@ -352,6 +354,95 @@ public class TopicsImageTest {
assertEquals(Collections.emptyMap(), changes.followers());
}
+ @Test
+ public void testClearElrRecords() {
+ Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+ Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+
+ List<TopicImage> topics = new ArrayList<>();
+ topics.add(
+ newTopicImage(
+ "foo",
+ fooId,
+ newPartition(new int[] {0, 1, 2, 3})
+ )
+ );
+ TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
+ newTopicsByNameMap(topics));
+
+ List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+ topicRecords.add(
+ new ApiMessageAndVersion(
+ new
PartitionChangeRecord().setTopicId(fooId).setPartitionId(0).
+ setIsr(Arrays.asList(0, 1)).
+ setEligibleLeaderReplicas(Arrays.asList(2)).
+ setLastKnownElr(Arrays.asList(3)),
+ PARTITION_CHANGE_RECORD.highestSupportedVersion()
+ )
+ );
+
+ TopicsDelta delta = new TopicsDelta(image);
+ RecordTestUtils.replayAll(delta, topicRecords);
+ image = delta.apply();
+
+ assertEquals(1, image.getTopic(fooId).partitions().get(0).elr.length);
+ assertEquals(1,
image.getTopic(fooId).partitions().get(0).lastKnownElr.length);
+
+ topicRecords = new ArrayList<>();
+
+ /* Test the following:
+ 1. The clear elr record should work on all existing topics(foo).
+ 2. The clear elr record should work on the new topic(bar) in the same
batch.
+ */
+ topicRecords.addAll(Arrays.asList(
+ new ApiMessageAndVersion(
+ new TopicRecord().setTopicId(barId).
+ setName("bar"),
+ TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(barId).
+ setPartitionId(0).
+ setLeader(0).
+ setIsr(Arrays.asList(1)).
+ setEligibleLeaderReplicas(Arrays.asList(2)).
+ setLastKnownElr(Arrays.asList(3)),
+ PARTITION_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new ClearElrRecord().setTopicName("bar"),
+ CLEAR_ELR_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new ClearElrRecord(),
+ CLEAR_ELR_RECORD.highestSupportedVersion()
+ ))
+ );
+ delta = new TopicsDelta(image);
+ RecordTestUtils.replayAll(delta, topicRecords);
+ image = delta.apply();
+
+ assertEquals(0, image.getTopic(fooId).partitions().get(0).elr.length);
+ assertEquals(0,
image.getTopic(fooId).partitions().get(0).lastKnownElr.length);
+ assertEquals(0, image.getTopic(barId).partitions().get(0).elr.length);
+ assertEquals(0,
image.getTopic(barId).partitions().get(0).lastKnownElr.length);
+ }
+
+ @Test
+ public void testClearElrRecordForNonExistTopic() {
+ TopicsImage image = new
TopicsImage(newTopicsByIdMap(Collections.emptyList()),
+ newTopicsByNameMap(Collections.emptyList()));
+ TopicsDelta delta = new TopicsDelta(image);
+ List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+ topicRecords.addAll(Collections.singletonList(
+ new ApiMessageAndVersion(
+ new ClearElrRecord().setTopicName("non-exist"),
+ CLEAR_ELR_RECORD.highestSupportedVersion()
+ ))
+ );
+ assertThrows(RuntimeException.class, () ->
RecordTestUtils.replayAll(delta, topicRecords));
+ }
+
@Test
public void testLocalReassignmentChanges() {
int localId = 3;