This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e3ff21c2e3 KAFKA-15412: Reading an unknown version of 
quorum-state-file should trigger an error (#14302)
2e3ff21c2e3 is described below

commit 2e3ff21c2e3674ece50c2a8a4053b93024e12b4a
Author: mannoopj <[email protected]>
AuthorDate: Wed Aug 30 03:03:41 2023 -0400

    KAFKA-15412: Reading an unknown version of quorum-state-file should trigger 
an error (#14302)
    
    Reading an unknown version of quorum-state-file should trigger an error. 
Currently the only known version is 0. Reading any other version should cause 
an error.
    
    Reviewers: Justine Olshan <[email protected]>, Luke Chen 
<[email protected]>
---
 .../org/apache/kafka/raft/FileBasedStateStore.java |  5 ++
 .../apache/kafka/raft/FileBasedStateStoreTest.java | 57 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java 
b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
index e403613ccd5..d567b15ece3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.ShortNode;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.generated.QuorumStateData;
 import org.apache.kafka.raft.generated.QuorumStateData.Voter;
@@ -90,6 +91,10 @@ public class FileBasedStateStore implements QuorumStateStore 
{
                     " does not have " + DATA_VERSION + " field");
             }
 
+            if (dataVersionNode.asInt() != 0) {
+                throw new UnsupportedVersionException("Unknown data version of 
" + dataVersionNode.toString());
+            }
+
             final short dataVersion = dataVersionNode.shortValue();
             return QuorumStateDataJsonConverter.read(dataObject, dataVersion);
         } catch (IOException e) {
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java 
b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
index 5fa4f5c6880..841991f8d63 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
@@ -16,18 +16,27 @@
  */
 package org.apache.kafka.raft;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
 import java.util.OptionalInt;
 import java.util.Set;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class FileBasedStateStoreTest {
@@ -90,6 +99,54 @@ public class FileBasedStateStoreTest {
         assertFalse(stateFile.exists());
     }
 
+    @Test
+    public void testCantReadVersionQuorumState() throws IOException {
+        String jsonString = 
"{\"leaderId\":9990,\"leaderEpoch\":3012,\"votedId\":-1," +
+                "\"appliedOffset\": 
0,\"currentVoters\":[{\"voterId\":9990},{\"voterId\":9991},{\"voterId\":9992}],"
 +
+                "\"data_version\":2}";
+        assertCantReadQuorumStateVersion(jsonString);
+    }
+
+    public void assertCantReadQuorumStateVersion(String jsonString) throws 
IOException {
+        final File stateFile = TestUtils.tempFile();
+        stateStore = new FileBasedStateStore(stateFile);
+
+        // We initialized a state from the metadata log
+        assertTrue(stateFile.exists());
+
+        final int epoch = 3012;
+        final int leaderId = 9990;
+        final int follower1 = leaderId + 1;
+        final int follower2 = follower1 + 1;
+        Set<Integer> voters = Utils.mkSet(leaderId, follower1, follower2);
+        writeToStateFile(stateFile, jsonString);
+
+        assertThrows(UnsupportedVersionException.class, () -> {
+            stateStore.readElectionState(); });
+
+        stateStore.clear();
+        assertFalse(stateFile.exists());
+    }
+
+    private void writeToStateFile(final File stateFile, String jsonString) {
+        try (final FileOutputStream fileOutputStream = new 
FileOutputStream(stateFile);
+             final BufferedWriter writer = new BufferedWriter(
+                     new OutputStreamWriter(fileOutputStream, 
StandardCharsets.UTF_8))) {
+            ObjectMapper mapper = new ObjectMapper();
+            JsonNode node = mapper.readTree(jsonString);
+
+            writer.write(node.toString());
+            writer.flush();
+            fileOutputStream.getFD().sync();
+
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    String.format("Error while writing to Quorum state file 
%s",
+                            stateFile.getAbsolutePath()), e);
+        }
+    }
+
+
     @AfterEach
     public void cleanup() throws IOException {
         if (stateStore != null) {

Reply via email to