This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0927049a617 KAFKA-14371: Remove unused clusterId field from 
quorum-state file (#13102)
0927049a617 is described below

commit 0927049a617fa2937a211aab895f6590403130fb
Author: Gantigmaa Selenge <39860586+tinasele...@users.noreply.github.com>
AuthorDate: Wed Mar 1 02:13:38 2023 +0000

    KAFKA-14371: Remove unused clusterId field from quorum-state file (#13102)
    
    Remove clusterId field from the KRaft controller's quorum-state file 
$LOG_DIR/__cluster_metadata-0/quorum-state
    
    Reviewers: Luke Chen <show...@gmail.com>, dengziming 
<dengziming1...@gmail.com>, Christo Lolov <christolo...@gmail.com>
---
 .../resources/common/message/QuorumStateData.json  |  4 +-
 .../apache/kafka/raft/FileBasedStateStoreTest.java | 46 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/raft/src/main/resources/common/message/QuorumStateData.json 
b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75de..d0320d16b1b 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,10 +16,10 @@
 {
   "type": "data",
   "name": "QuorumStateData",
-  "validVersions": "0",
+  // Version 1 removes clusterId field.
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   "fields": [
-    {"name": "ClusterId", "type": "string", "versions": "0+"},
     {"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
     {"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1"},
     {"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java 
b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
index 5fa4f5c6880..97002897265 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
@@ -16,6 +16,13 @@
  */
 package org.apache.kafka.raft;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -90,6 +97,45 @@ public class FileBasedStateStoreTest {
         assertFalse(stateFile.exists());
     }
 
+    @Test
+    public void testCompatibilityWithClusterId() throws IOException {
+        final File stateFile = TestUtils.tempFile();
+        stateStore = new FileBasedStateStore(stateFile);
+
+        // We initialized a state from the metadata log
+        assertTrue(stateFile.exists());
+
+        final int epoch = 2;
+        final int leaderId = 1;
+        Set<Integer> voters = Utils.mkSet(leaderId);
+        String jsonString = "{\"clusterId\":\"abc\",\"leaderId\":" + leaderId 
+ ",\"leaderEpoch\":" + epoch + 
",\"votedId\":-1,\"appliedOffset\":0,\"currentVoters\":[],\"data_version\":0}";
+        writeToStateFile(stateFile, jsonString);
+
+        // verify that we can read the state file that contains the removed 
"cluserId" field.
+        assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), 
stateStore.readElectionState());
+
+        stateStore.clear();
+        assertFalse(stateFile.exists());
+    }
+
+    private void writeToStateFile(final File stateFile, String jsonString) {
+        try (final FileOutputStream fileOutputStream = new 
FileOutputStream(stateFile);
+             final BufferedWriter writer = new BufferedWriter(
+                     new OutputStreamWriter(fileOutputStream, 
StandardCharsets.UTF_8))) {
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode node = mapper.readTree(jsonString);
+
+            writer.write(node.toString());
+            writer.flush();
+            fileOutputStream.getFD().sync();
+
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    String.format("Error while writing to Quorum state file 
%s",
+                            stateFile.getAbsolutePath()), e);
+        }
+    }
+
     @AfterEach
     public void cleanup() throws IOException {
         if (stateStore != null) {

Reply via email to