This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 10b1f73 KAFKA-12958: add an invariant that notified leaders are never
asked to load snapshot (#10932)
10b1f73 is described below
commit 10b1f73cd42faaaabe3154694aae62a01d5de20d
Author: zhaohaidao <[email protected]>
AuthorDate: Sun Jul 4 23:32:12 2021 +0800
KAFKA-12958: add an invariant that notified leaders are never asked to load
snapshot (#10932)
Track the number of handleSnapshot calls and verify that handleSnapshot is
never triggered on the leader node.
Reviewers: Luke Chen <[email protected]>, José Armando García Sancio
<[email protected]>, Boyang Chen <[email protected]>
---
.../java/org/apache/kafka/raft/ReplicatedCounter.java | 9 +++++++++
.../org/apache/kafka/raft/RaftEventSimulationTest.java | 18 ++++++++++++++++++
2 files changed, 27 insertions(+)
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 8cdd27f..a5a0a4e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -38,6 +38,8 @@ public class ReplicatedCounter implements
RaftClient.Listener<Integer> {
private OptionalInt claimedEpoch = OptionalInt.empty();
private long lastOffsetSnapshotted = -1;
+ private int handleSnapshotCalls = 0;
+
public ReplicatedCounter(
int nodeId,
RaftClient<Integer> client,
@@ -152,6 +154,7 @@ public class ReplicatedCounter implements
RaftClient.Listener<Integer> {
}
}
lastOffsetSnapshotted = reader.lastContainedLogOffset();
+ handleSnapshotCalls += 1;
log.debug("Finished loading snapshot. Set value: {}", committed);
} finally {
reader.close();
@@ -170,5 +173,11 @@ public class ReplicatedCounter implements
RaftClient.Listener<Integer> {
uncommitted = -1;
claimedEpoch = OptionalInt.empty();
}
+ handleSnapshotCalls = 0;
+ }
+
+ /** Use handleSnapshotCalls to verify leader is never asked to load snapshot */
+ public int handleSnapshotCalls() {
+ return handleSnapshotCalls;
}
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 1c48dee..739785a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -352,6 +352,7 @@ public class RaftEventSimulationTest {
scheduler.addInvariant(new MajorityReachedHighWatermark(cluster));
scheduler.addInvariant(new SingleLeader(cluster));
scheduler.addInvariant(new SnapshotAtLogStart(cluster));
+ scheduler.addInvariant(new LeaderNeverLoadSnapshot(cluster));
scheduler.addValidation(new ConsistentCommittedData(cluster));
return scheduler;
}
@@ -1014,6 +1015,23 @@ public class RaftEventSimulationTest {
}
}
+ private static class LeaderNeverLoadSnapshot implements Invariant {
+ final Cluster cluster;
+
+ private LeaderNeverLoadSnapshot(Cluster cluster) {
+ this.cluster = cluster;
+ }
+
+ @Override
+ public void verify() {
+ for (RaftNode raftNode : cluster.running()) {
+ if (raftNode.counter.isWritable()) {
+ assertEquals(0, raftNode.counter.handleSnapshotCalls());
+ }
+ }
+ }
+ }
+
/**
* Validating the committed data is expensive, so we do this as a {@link Validation}. We depend
* on the following external invariants: