This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 366d4958dfc KAFKA-18563 move RaftClientTestContext RpcVersion methods
into RaftProtocol (#19349)
366d4958dfc is described below
commit 366d4958dfc45c92a8b07ae90ee59506388e24cc
Author: PoAn Yang <[email protected]>
AuthorDate: Wed May 6 00:37:41 2026 +0800
KAFKA-18563 move RaftClientTestContext RpcVersion methods into RaftProtocol
(#19349)
There are lot of xyzRpcVersion which returning value by checking
RaftProtocol. It's better to include these functions in RaftProtocol
itself.
Reviewers: TaiJuWu <[email protected]>, Ken Huang <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/raft/RaftClientTestContext.java | 212 ++++++++++-----------
1 file changed, 102 insertions(+), 110 deletions(-)
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 02f92500711..d819102a79b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -812,7 +812,7 @@ public final class RaftClientTestContext {
}
DescribeQuorumResponseData.NodeCollection nodes = new
DescribeQuorumResponseData.NodeCollection(0);
- if (describeQuorumRpcVersion() >= 2) {
+ if (raftProtocol.describeQuorumRpcVersion() >= 2) {
nodes = new
DescribeQuorumResponseData.NodeCollection(voterStates.size());
for (ReplicaState voterState : voterStates) {
nodes.add(new DescribeQuorumResponseData.Node()
@@ -1558,7 +1558,7 @@ public final class RaftClientTestContext {
) {
return RaftUtil.singletonEndQuorumEpochResponse(
channel.listenerName(),
- endQuorumEpochRpcVersion(),
+ raftProtocol.endQuorumEpochRpcVersion(),
Errors.NONE,
metadataPartition,
Errors.NONE,
@@ -1651,7 +1651,7 @@ public final class RaftClientTestContext {
BeginQuorumEpochResponseData beginEpochResponse(int epoch, int leaderId) {
return RaftUtil.singletonBeginQuorumEpochResponse(
channel.listenerName(),
- beginQuorumEpochRpcVersion(),
+ raftProtocol.beginQuorumEpochRpcVersion(),
Errors.NONE,
metadataPartition,
Errors.NONE,
@@ -1738,11 +1738,11 @@ public final class RaftClientTestContext {
}
VoteResponseData voteResponse(boolean voteGranted, OptionalInt leaderId,
int epoch) {
- return voteResponse(Errors.NONE, voteGranted, leaderId, epoch,
voteRpcVersion());
+ return voteResponse(Errors.NONE, voteGranted, leaderId, epoch,
raftProtocol.voteRpcVersion());
}
VoteResponseData voteResponse(Errors error, OptionalInt leaderId, int
epoch) {
- return voteResponse(error, false, leaderId, epoch, voteRpcVersion());
+ return voteResponse(error, false, leaderId, epoch,
raftProtocol.voteRpcVersion());
}
VoteResponseData voteResponse(Errors error, boolean voteGranted,
OptionalInt leaderId, int epoch, short version) {
@@ -1925,7 +1925,7 @@ public final class RaftClientTestContext {
) {
return RaftUtil.singletonFetchResponse(
channel.listenerName(),
- fetchRpcVersion(),
+ raftProtocol.fetchRpcVersion(),
metadataPartition,
metadataTopicId,
Errors.NONE,
@@ -1953,7 +1953,7 @@ public final class RaftClientTestContext {
) {
return RaftUtil.singletonFetchResponse(
channel.listenerName(),
- fetchRpcVersion(),
+ raftProtocol.fetchRpcVersion(),
metadataPartition,
metadataTopicId,
Errors.NONE,
@@ -1983,7 +1983,7 @@ public final class RaftClientTestContext {
) {
return RaftUtil.singletonFetchResponse(
channel.listenerName(),
- fetchRpcVersion(),
+ raftProtocol.fetchRpcVersion(),
metadataPartition,
metadataTopicId,
Errors.NONE,
@@ -2011,7 +2011,7 @@ public final class RaftClientTestContext {
) {
return RaftUtil.singletonFetchSnapshotResponse(
channel.listenerName(),
- fetchSnapshotRpcVersion(),
+ raftProtocol.fetchSnapshotRpcVersion(),
metadataPartition,
leaderId,
startingVoters.listeners(leaderId),
@@ -2091,103 +2091,25 @@ public final class RaftClientTestContext {
);
}
- private short fetchRpcVersion() {
- if (raftProtocol.isHwmInFetchSupported()) {
- return 18;
- } else if (raftProtocol.isReconfigSupported()) {
- return 17;
- } else {
- return 16;
- }
- }
-
- private short fetchSnapshotRpcVersion() {
- if (raftProtocol.isReconfigSupported()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- short voteRpcVersion() {
- if (raftProtocol.isPreVoteSupported()) {
- return 2;
- } else if (raftProtocol.isReconfigSupported()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private short beginQuorumEpochRpcVersion() {
- if (raftProtocol.isReconfigSupported()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private short endQuorumEpochRpcVersion() {
- if (raftProtocol.isReconfigSupported()) {
- return 1;
- } else {
- return 0;
- }
- }
-
- private short describeQuorumRpcVersion() {
- if (raftProtocol.isReconfigSupported()) {
- return 2;
- } else {
- return 1;
- }
- }
-
- private short addVoterRpcVersion() {
- if (raftProtocol.isAutoJoinSupported()) {
- return 1;
- } else if (raftProtocol.isReconfigSupported()) {
- return 0;
- } else {
- throw new IllegalStateException("Reconfiguration must be enabled
by calling withRaftProtocol(KIP_853_PROTOCOL)");
- }
- }
-
- private short removeVoterRpcVersion() {
- if (raftProtocol.isReconfigSupported()) {
- return 0;
- } else {
- throw new IllegalStateException("Reconfiguration must be enabled
by calling withRaftProtocol(KIP_853_PROTOCOL)");
- }
- }
-
- private short updateVoterRpcVersion() {
- if (raftProtocol.isReconfigSupported()) {
- return 0;
- } else {
- throw new IllegalStateException("Reconfiguration must be enabled
by calling withRaftProtocol(KIP_853_PROTOCOL)");
- }
- }
-
private short raftRequestVersion(ApiMessage request) {
if (request instanceof FetchRequestData) {
- return fetchRpcVersion();
+ return raftProtocol.fetchRpcVersion();
} else if (request instanceof FetchSnapshotRequestData) {
- return fetchSnapshotRpcVersion();
+ return raftProtocol.fetchSnapshotRpcVersion();
} else if (request instanceof VoteRequestData) {
- return voteRpcVersion();
+ return raftProtocol.voteRpcVersion();
} else if (request instanceof BeginQuorumEpochRequestData) {
- return beginQuorumEpochRpcVersion();
+ return raftProtocol.beginQuorumEpochRpcVersion();
} else if (request instanceof EndQuorumEpochRequestData) {
- return endQuorumEpochRpcVersion();
+ return raftProtocol.endQuorumEpochRpcVersion();
} else if (request instanceof DescribeQuorumRequestData) {
- return describeQuorumRpcVersion();
+ return raftProtocol.describeQuorumRpcVersion();
} else if (request instanceof AddRaftVoterRequestData) {
- return addVoterRpcVersion();
+ return raftProtocol.addVoterRpcVersion();
} else if (request instanceof RemoveRaftVoterRequestData) {
- return removeVoterRpcVersion();
+ return raftProtocol.removeVoterRpcVersion();
} else if (request instanceof UpdateRaftVoterRequestData) {
- return updateVoterRpcVersion();
+ return raftProtocol.updateVoterRpcVersion();
} else {
throw new IllegalArgumentException(String.format("Request %s is
not a raft request", request));
}
@@ -2195,23 +2117,23 @@ public final class RaftClientTestContext {
private short raftResponseVersion(ApiMessage response) {
if (response instanceof FetchResponseData) {
- return fetchRpcVersion();
+ return raftProtocol.fetchRpcVersion();
} else if (response instanceof FetchSnapshotResponseData) {
- return fetchSnapshotRpcVersion();
+ return raftProtocol.fetchSnapshotRpcVersion();
} else if (response instanceof VoteResponseData) {
- return voteRpcVersion();
+ return raftProtocol.voteRpcVersion();
} else if (response instanceof BeginQuorumEpochResponseData) {
- return beginQuorumEpochRpcVersion();
+ return raftProtocol.beginQuorumEpochRpcVersion();
} else if (response instanceof EndQuorumEpochResponseData) {
- return endQuorumEpochRpcVersion();
+ return raftProtocol.endQuorumEpochRpcVersion();
} else if (response instanceof DescribeQuorumResponseData) {
- return describeQuorumRpcVersion();
+ return raftProtocol.describeQuorumRpcVersion();
} else if (response instanceof AddRaftVoterResponseData) {
- return addVoterRpcVersion();
+ return raftProtocol.addVoterRpcVersion();
} else if (response instanceof RemoveRaftVoterResponseData) {
- return removeVoterRpcVersion();
+ return raftProtocol.removeVoterRpcVersion();
} else if (response instanceof UpdateRaftVoterResponseData) {
- return updateVoterRpcVersion();
+ return raftProtocol.updateVoterRpcVersion();
} else if (response instanceof ApiVersionsResponseData) {
return 4;
} else {
@@ -2431,12 +2353,82 @@ public final class RaftClientTestContext {
return isAtLeast(KIP_996_PROTOCOL);
}
- boolean isHwmInFetchSupported() {
- return isAtLeast(KIP_1166_PROTOCOL);
+ short describeQuorumRpcVersion() {
+ if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 2;
+ } else {
+ return 1;
+ }
+ }
+
+ short fetchRpcVersion() {
+ if (isAtLeast(KIP_1166_PROTOCOL)) {
+ return 18;
+ } else if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 17;
+ } else {
+ return 16;
+ }
+ }
+
+ short fetchSnapshotRpcVersion() {
+ if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ short voteRpcVersion() {
+ if (isAtLeast(KIP_996_PROTOCOL)) {
+ return 2;
+ } else if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ short beginQuorumEpochRpcVersion() {
+ if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 1;
+ } else {
+ return 0;
+ }
}
-
- boolean isAutoJoinSupported() {
- return isAtLeast(KIP_1186_PROTOCOL);
+
+ short endQuorumEpochRpcVersion() {
+ if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ short addVoterRpcVersion() {
+ if (isAtLeast(KIP_1186_PROTOCOL)) {
+ return 1;
+ } else if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 0;
+ } else {
+ throw new IllegalStateException("Reconfiguration must be
enabled by calling withRaftProtocol(KIP_853_PROTOCOL)");
+ }
+ }
+
+ short removeVoterRpcVersion() {
+ if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 0;
+ } else {
+ throw new IllegalStateException("Reconfiguration must be
enabled by calling withRaftProtocol(KIP_853_PROTOCOL)");
+ }
+ }
+
+ short updateVoterRpcVersion() {
+ if (isAtLeast(KIP_853_PROTOCOL)) {
+ return 0;
+ } else {
+ throw new IllegalStateException("Reconfiguration must be
enabled by calling withRaftProtocol(KIP_853_PROTOCOL)");
+ }
}
private boolean isAtLeast(RaftProtocol otherRpc) {