This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e3ff21c2e3 KAFKA-15412: Reading an unknown version of
quorum-state-file should trigger an error (#14302)
2e3ff21c2e3 is described below
commit 2e3ff21c2e3674ece50c2a8a4053b93024e12b4a
Author: mannoopj <[email protected]>
AuthorDate: Wed Aug 30 03:03:41 2023 -0400
KAFKA-15412: Reading an unknown version of quorum-state-file should trigger
an error (#14302)
Reading an unknown version of quorum-state-file should trigger an error.
Currently the only known version is 0. Reading any other version should cause
an error.
Reviewers: Justine Olshan <[email protected]>, Luke Chen
<[email protected]>
---
.../org/apache/kafka/raft/FileBasedStateStore.java | 5 ++
.../apache/kafka/raft/FileBasedStateStoreTest.java | 57 ++++++++++++++++++++++
2 files changed, 62 insertions(+)
diff --git a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
index e403613ccd5..d567b15ece3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.ShortNode;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.generated.QuorumStateData;
import org.apache.kafka.raft.generated.QuorumStateData.Voter;
@@ -90,6 +91,10 @@ public class FileBasedStateStore implements QuorumStateStore
{
" does not have " + DATA_VERSION + " field");
}
+ if (dataVersionNode.asInt() != 0) {
+ throw new UnsupportedVersionException("Unknown data version of
" + dataVersionNode.toString());
+ }
+
final short dataVersion = dataVersionNode.shortValue();
return QuorumStateDataJsonConverter.read(dataObject, dataVersion);
} catch (IOException e) {
diff --git
a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
index 5fa4f5c6880..841991f8d63 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
@@ -16,18 +16,27 @@
*/
package org.apache.kafka.raft;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
import java.util.OptionalInt;
import java.util.Set;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class FileBasedStateStoreTest {
@@ -90,6 +99,54 @@ public class FileBasedStateStoreTest {
assertFalse(stateFile.exists());
}
+ @Test
+ public void testCantReadVersionQuorumState() throws IOException {
+ String jsonString =
"{\"leaderId\":9990,\"leaderEpoch\":3012,\"votedId\":-1," +
+ "\"appliedOffset\":
0,\"currentVoters\":[{\"voterId\":9990},{\"voterId\":9991},{\"voterId\":9992}],"
+
+ "\"data_version\":2}";
+ assertCantReadQuorumStateVersion(jsonString);
+ }
+
+ public void assertCantReadQuorumStateVersion(String jsonString) throws
IOException {
+ final File stateFile = TestUtils.tempFile();
+ stateStore = new FileBasedStateStore(stateFile);
+
+ // We initialized a state from the metadata log
+ assertTrue(stateFile.exists());
+
+ final int epoch = 3012;
+ final int leaderId = 9990;
+ final int follower1 = leaderId + 1;
+ final int follower2 = follower1 + 1;
+ Set<Integer> voters = Utils.mkSet(leaderId, follower1, follower2);
+ writeToStateFile(stateFile, jsonString);
+
+ assertThrows(UnsupportedVersionException.class, () -> {
+ stateStore.readElectionState(); });
+
+ stateStore.clear();
+ assertFalse(stateFile.exists());
+ }
+
+ private void writeToStateFile(final File stateFile, String jsonString) {
+ try (final FileOutputStream fileOutputStream = new
FileOutputStream(stateFile);
+ final BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(fileOutputStream,
StandardCharsets.UTF_8))) {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode node = mapper.readTree(jsonString);
+
+ writer.write(node.toString());
+ writer.flush();
+ fileOutputStream.getFD().sync();
+
+ } catch (IOException e) {
+ throw new UncheckedIOException(
+ String.format("Error while writing to Quorum state file
%s",
+ stateFile.getAbsolutePath()), e);
+ }
+ }
+
+
@AfterEach
public void cleanup() throws IOException {
if (stateStore != null) {