This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0927049a617 KAFKA-14371: Remove unused clusterId field from
quorum-state file (#13102)
0927049a617 is described below
commit 0927049a617fa2937a211aab895f6590403130fb
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Wed Mar 1 02:13:38 2023 +0000
KAFKA-14371: Remove unused clusterId field from quorum-state file (#13102)
Remove clusterId field from the KRaft controller's quorum-state file
$LOG_DIR/__cluster_metadata-0/quorum-state
Reviewers: Luke Chen <[email protected]>, dengziming
<[email protected]>, Christo Lolov <[email protected]>
---
.../resources/common/message/QuorumStateData.json | 4 +-
.../apache/kafka/raft/FileBasedStateStoreTest.java | 46 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/raft/src/main/resources/common/message/QuorumStateData.json b/raft/src/main/resources/common/message/QuorumStateData.json
index d71a32c75de..d0320d16b1b 100644
--- a/raft/src/main/resources/common/message/QuorumStateData.json
+++ b/raft/src/main/resources/common/message/QuorumStateData.json
@@ -16,10 +16,10 @@
{
"type": "data",
"name": "QuorumStateData",
- "validVersions": "0",
+ // Version 1 removes clusterId field.
+ "validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
- {"name": "ClusterId", "type": "string", "versions": "0+"},
{"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
{"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
{"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
diff --git a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
index 5fa4f5c6880..97002897265 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
@@ -16,6 +16,13 @@
*/
package org.apache.kafka.raft;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -90,6 +97,45 @@ public class FileBasedStateStoreTest {
assertFalse(stateFile.exists());
}
+ @Test
+ public void testCompatibilityWithClusterId() throws IOException {
+ final File stateFile = TestUtils.tempFile();
+ stateStore = new FileBasedStateStore(stateFile);
+
+ // We initialized a state from the metadata log
+ assertTrue(stateFile.exists());
+
+ final int epoch = 2;
+ final int leaderId = 1;
+ Set<Integer> voters = Utils.mkSet(leaderId);
+ String jsonString = "{\"clusterId\":\"abc\",\"leaderId\":" + leaderId + ",\"leaderEpoch\":" + epoch + ",\"votedId\":-1,\"appliedOffset\":0,\"currentVoters\":[],\"data_version\":0}";
+ writeToStateFile(stateFile, jsonString);
+
+ // verify that we can read the state file that contains the removed "clusterId" field.
+ assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), stateStore.readElectionState());
+
+ stateStore.clear();
+ assertFalse(stateFile.exists());
+ }
+
+ private void writeToStateFile(final File stateFile, String jsonString) {
+ try (final FileOutputStream fileOutputStream = new FileOutputStream(stateFile);
+ final BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode node = mapper.readTree(jsonString);
+
+ writer.write(node.toString());
+ writer.flush();
+ fileOutputStream.getFD().sync();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Error while writing to Quorum state file %s",
+ stateFile.getAbsolutePath()), e);
+ }
+ }
+
@AfterEach
public void cleanup() throws IOException {
if (stateStore != null) {