This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fdbed6c458b KAFKA-18649: complete ClearElrRecord handling (#18708)
fdbed6c458b is described below

commit fdbed6c458bdf23da642d65afe02c33cb5427052
Author: Calvin Liu <[email protected]>
AuthorDate: Wed Jan 29 15:07:44 2025 -0800

    KAFKA-18649: complete ClearElrRecord handling (#18708)
    
    Implement ClearElrRecord handling in the TopicDelta. Also, the
    ReplicationControlManager should not merge updates if ELR/LastKnownElr are
    empty, because that will cause an unnecessary partition epoch bump.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../controller/ReplicationControlManager.java      | 15 ++--
 .../java/org/apache/kafka/image/MetadataDelta.java |  8 ++
 .../java/org/apache/kafka/image/TopicDelta.java    | 27 +++++++
 .../java/org/apache/kafka/image/TopicsDelta.java   | 35 +++++++--
 .../controller/ReplicationControlManagerTest.java  | 17 ++++
 .../org/apache/kafka/image/TopicsImageTest.java    | 91 ++++++++++++++++++++++
 6 files changed, 180 insertions(+), 13 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d6eaf089f1e..fc4ae6b38bb 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -609,14 +609,13 @@ public class ReplicationControlManager {
         List<Integer> partitionIds = new ArrayList<>(topic.parts.keySet());
         for (int partitionId : partitionIds) {
             PartitionRegistration partition = topic.parts.get(partitionId);
-            PartitionRegistration nextPartition = partition.merge(
-                new PartitionChangeRecord().
-                    setPartitionId(partitionId).
-                    setTopicId(topic.id).
-                    setEligibleLeaderReplicas(Collections.emptyList()).
-                    setLastKnownElr(Collections.emptyList()));
-            if (!nextPartition.equals(partition)) {
-                topic.parts.put(partitionId, nextPartition);
+            if (partition.elr.length != 0 || partition.lastKnownElr.length != 
0) {
+                topic.parts.put(partitionId, partition.merge(
+                    new PartitionChangeRecord().
+                        setPartitionId(partitionId).
+                        setTopicId(topic.id).
+                        setEligibleLeaderReplicas(Collections.emptyList()).
+                        setLastKnownElr(Collections.emptyList())));
                 numRemoved++;
             }
         }
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index ae021a6f2fb..b934d10f6d1 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -19,6 +19,7 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.AccessControlEntryRecord;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.DelegationTokenRecord;
@@ -231,6 +232,9 @@ public final class MetadataDelta {
             case ACCESS_CONTROL_ENTRY_RECORD:
                 replay((AccessControlEntryRecord) record);
                 break;
+            case CLEAR_ELR_RECORD:
+                replay((ClearElrRecord) record);
+                break;
             case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
                 replay((RemoveAccessControlEntryRecord) record);
                 break;
@@ -278,6 +282,10 @@ public final class MetadataDelta {
         getOrCreateConfigsDelta().replay(record);
     }
 
+    public void replay(ClearElrRecord record) {
+        getOrCreateTopicsDelta().replay(record);
+    }
+
     public void replay(PartitionChangeRecord record) {
         getOrCreateTopicsDelta().replay(record);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 6623ae85902..a663cbfd7d9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -20,11 +20,13 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.Replicas;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -83,6 +85,31 @@ public final class TopicDelta {
         partitionChanges.put(record.partitionId(), partition.merge(record));
     }
 
+    public void replay(ClearElrRecord record) {
+        // Some partitions are not added to the image yet, let's check the 
partitionChanges first.
+        partitionChanges.forEach((partitionId, partition) -> {
+            maybeClearElr(partitionId, partition);
+        });
+
+        image.partitions().forEach((partitionId, partition) -> {
+            if (!partitionChanges.containsKey(partitionId)) {
+                maybeClearElr(partitionId, partition);
+            }
+        });
+    }
+
+    void maybeClearElr(int partitionId, PartitionRegistration partition) {
+        if (partition.elr.length != 0 || partition.lastKnownElr.length != 0) {
+            partitionChanges.put(partitionId, partition.merge(
+                new PartitionChangeRecord().
+                    setPartitionId(partitionId).
+                    setTopicId(image.id()).
+                    setEligibleLeaderReplicas(Collections.emptyList()).
+                    setLastKnownElr(Collections.emptyList())
+            ));
+        }
+    }
+
     public TopicImage apply() {
         Map<Integer, PartitionRegistration> newPartitions = new HashMap<>();
         for (Entry<Integer, PartitionRegistration> entry : 
image.partitions().entrySet()) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index 2b441776003..ec5cee135b3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -20,6 +20,7 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
@@ -28,6 +29,7 @@ import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.immutable.ImmutableMap;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,7 +56,7 @@ public final class TopicsDelta {
      */
     private final Set<Uuid> deletedTopicIds = new HashSet<>();
 
-    private final Set<Uuid> createdTopicIds = new HashSet<>();
+    private final Map<String, Uuid> createdTopics = new HashMap<>();
 
     public TopicsDelta(TopicsImage image) {
         this.image = image;
@@ -72,7 +74,7 @@ public final class TopicsDelta {
         TopicDelta delta = new TopicDelta(
             new TopicImage(record.name(), record.topicId(), 
Collections.emptyMap()));
         changedTopics.put(record.topicId(), delta);
-        createdTopicIds.add(record.topicId());
+        createdTopics.put(record.name(), record.topicId());
     }
 
     TopicDelta getOrCreateTopicDelta(Uuid id) {
@@ -94,6 +96,29 @@ public final class TopicsDelta {
         topicDelta.replay(record);
     }
 
+    public void replay(ClearElrRecord record) {
+        if (!record.topicName().isEmpty()) {
+            Uuid topicId;
+            if (image.getTopic(record.topicName()) != null) {
+                topicId = image.getTopic(record.topicName()).id();
+            } else {
+                topicId = createdTopics.get(record.topicName());
+            }
+            if (topicId == null) {
+                throw new RuntimeException("Unable to clear elr for topic with 
name " +
+                    record.topicName() + ": no such topic found.");
+            }
+            TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
+            topicDelta.replay(record);
+        } else {
+            // Update all the existing topics
+            image.topicsById().forEach((topicId, image) -> {
+                TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
+                topicDelta.replay(record);
+            });
+        }
+    }
+
     public String replay(RemoveTopicRecord record) {
         TopicDelta topicDelta = changedTopics.remove(record.topicId());
         String topicName;
@@ -172,8 +197,8 @@ public final class TopicsDelta {
         return deletedTopicIds;
     }
 
-    public Set<Uuid> createdTopicIds() {
-        return createdTopicIds;
+    public Collection<Uuid> createdTopicIds() {
+        return createdTopics.values();
     }
 
     /**
@@ -231,7 +256,7 @@ public final class TopicsDelta {
         return "TopicsDelta(" +
             "changedTopics=" + changedTopics +
             ", deletedTopicIds=" + deletedTopicIds +
-            ", createdTopicIds=" + createdTopicIds +
+            ", createdTopics=" + createdTopics +
             ')';
     }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index bd4cad63fda..a6c81f6510e 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -62,6 +62,7 @@ import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
 import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
 import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -131,6 +132,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG;
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD;
 import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
 import static 
org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
@@ -3287,4 +3289,19 @@ public class ReplicationControlManagerTest {
             assertArrayEquals(new int[]{1}, 
ctx.replicationControl.getPartition(barId, 0).elr);
         }
     }
+
+    @Test
+    void testElrsRemovedShouldNotBumpPartitionEpochIfNoChange() {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().
+            setIsElrEnabled(true).
+            setStaticConfig(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2").
+            build();
+        ctx.registerBrokers(1, 2, 3, 4);
+        ctx.unfenceBrokers(1, 2, 3, 4);
+        Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 4}, new int[]{1, 3, 4}}).topicId();
+        int partitionEpoch = ctx.replicationControl.getPartition(fooId, 
0).partitionEpoch;
+        ctx.replay(Arrays.asList(new ApiMessageAndVersion(new 
ClearElrRecord(), CLEAR_ELR_RECORD.highestSupportedVersion())));
+        assertEquals(partitionEpoch, 
ctx.replicationControl.getPartition(fooId, 0).partitionEpoch);
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 4805fea058a..398cb84b5aa 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.image;
 import org.apache.kafka.common.DirectoryId;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.ClearElrRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
@@ -45,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.CLEAR_ELR_RECORD;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_RECORD;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_RECORD;
@@ -352,6 +354,95 @@ public class TopicsImageTest {
         assertEquals(Collections.emptyMap(), changes.followers());
     }
 
+    @Test
+    public void testClearElrRecords() {
+        Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+        Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+
+        List<TopicImage> topics = new ArrayList<>();
+        topics.add(
+            newTopicImage(
+                "foo",
+                fooId,
+                newPartition(new int[] {0, 1, 2, 3})
+            )
+        );
+        TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
+            newTopicsByNameMap(topics));
+
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new 
PartitionChangeRecord().setTopicId(fooId).setPartitionId(0).
+                    setIsr(Arrays.asList(0, 1)).
+                    setEligibleLeaderReplicas(Arrays.asList(2)).
+                    setLastKnownElr(Arrays.asList(3)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            )
+        );
+
+        TopicsDelta delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+        image = delta.apply();
+
+        assertEquals(1, image.getTopic(fooId).partitions().get(0).elr.length);
+        assertEquals(1, 
image.getTopic(fooId).partitions().get(0).lastKnownElr.length);
+
+        topicRecords = new ArrayList<>();
+
+        /* Test the following:
+        1. The clear elr record should work on all existing topics(foo).
+        2. The clear elr record should work on the new topic(bar) in the same 
batch.
+        */
+        topicRecords.addAll(Arrays.asList(
+            new ApiMessageAndVersion(
+                new TopicRecord().setTopicId(barId).
+                    setName("bar"),
+                TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(barId).
+                    setPartitionId(0).
+                    setLeader(0).
+                    setIsr(Arrays.asList(1)).
+                    setEligibleLeaderReplicas(Arrays.asList(2)).
+                    setLastKnownElr(Arrays.asList(3)),
+                PARTITION_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new ClearElrRecord().setTopicName("bar"),
+                CLEAR_ELR_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new ClearElrRecord(),
+                CLEAR_ELR_RECORD.highestSupportedVersion()
+            ))
+        );
+        delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+        image = delta.apply();
+
+        assertEquals(0, image.getTopic(fooId).partitions().get(0).elr.length);
+        assertEquals(0, 
image.getTopic(fooId).partitions().get(0).lastKnownElr.length);
+        assertEquals(0, image.getTopic(barId).partitions().get(0).elr.length);
+        assertEquals(0, 
image.getTopic(barId).partitions().get(0).lastKnownElr.length);
+    }
+
+    @Test
+    public void testClearElrRecordForNonExistTopic() {
+        TopicsImage image = new 
TopicsImage(newTopicsByIdMap(Collections.emptyList()),
+            newTopicsByNameMap(Collections.emptyList()));
+        TopicsDelta delta = new TopicsDelta(image);
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+        topicRecords.addAll(Collections.singletonList(
+            new ApiMessageAndVersion(
+                new ClearElrRecord().setTopicName("non-exist"),
+                CLEAR_ELR_RECORD.highestSupportedVersion()
+            ))
+        );
+        assertThrows(RuntimeException.class, () -> 
RecordTestUtils.replayAll(delta, topicRecords));
+    }
+
     @Test
     public void testLocalReassignmentChanges() {
         int localId = 3;

Reply via email to