This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aecaf444756 KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)
aecaf444756 is described below
commit aecaf4447561edd8da9f06e3abdf46f382dc9d89
Author: Nikolay <[email protected]>
AuthorDate: Tue Jun 11 20:01:35 2024 +0300
KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)
Add support for KIP-853 KRaft Quorum reconfiguration in the DescribeQuorum
request and response.
Also add support to AdminClient.describeQuorum, so that users will be able
to find the current set of quorum nodes, as well as their directories, via
these RPCs.
Reviewers: Luke Chen <[email protected]>, Colin P. McCabe
<[email protected]>, Andrew Schofield <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 20 +-
.../org/apache/kafka/clients/admin/QuorumInfo.java | 76 ++++++-
.../common/requests/DescribeQuorumRequest.java | 4 +-
.../common/requests/DescribeQuorumResponse.java | 13 +-
.../common/message/DescribeQuorumRequest.json | 4 +-
.../common/message/DescribeQuorumResponse.json | 24 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 23 +-
.../unit/kafka/server/ApiVersionsRequestTest.scala | 4 +-
.../kafka/server/DescribeQuorumRequestTest.scala | 2 +
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../org/apache/kafka/raft/KafkaRaftClient.java | 10 +-
.../apache/kafka/raft/KafkaRaftClientDriver.java | 1 +
.../java/org/apache/kafka/raft/LeaderState.java | 84 +++++--
.../java/org/apache/kafka/raft/QuorumState.java | 3 +-
.../java/org/apache/kafka/raft/RaftMessage.java | 3 +-
.../java/org/apache/kafka/raft/RaftRequest.java | 17 +-
.../java/org/apache/kafka/raft/RaftResponse.java | 5 +
.../kafka/raft/internals/BlockingMessageQueue.java | 5 +
.../org/apache/kafka/raft/internals/VoterSet.java | 6 +-
.../apache/kafka/raft/KafkaNetworkChannelTest.java | 1 +
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 7 +
.../org/apache/kafka/raft/LeaderStateTest.java | 243 ++++++++++++---------
.../apache/kafka/raft/RaftClientTestContext.java | 19 +-
.../apache/kafka/raft/RaftEventSimulationTest.java | 2 +-
.../kafka/raft/internals/KafkaRaftMetricsTest.java | 4 +-
25 files changed, 418 insertions(+), 165 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c59cccf67c4..d7d525e4431 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4414,12 +4414,13 @@ public class KafkaAdminClient extends AdminClient {
private QuorumInfo.ReplicaState
translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) {
return new QuorumInfo.ReplicaState(
replica.replicaId(),
+ replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID
: replica.replicaDirectoryId(),
replica.logEndOffset(),
replica.lastFetchTimestamp() == -1 ?
OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()),
replica.lastCaughtUpTimestamp() == -1 ?
OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
}
- private QuorumInfo createQuorumResult(final
DescribeQuorumResponseData.PartitionData partition) {
+ private QuorumInfo createQuorumResult(final
DescribeQuorumResponseData.PartitionData partition,
DescribeQuorumResponseData.NodeCollection nodeCollection) {
List<QuorumInfo.ReplicaState> voters =
partition.currentVoters().stream()
.map(this::translateReplicaState)
.collect(Collectors.toList());
@@ -4428,12 +4429,21 @@ public class KafkaAdminClient extends AdminClient {
.map(this::translateReplicaState)
.collect(Collectors.toList());
+ Map<Integer, QuorumInfo.Node> nodes =
nodeCollection.stream().map(n -> {
+ List<RaftVoterEndpoint> endpoints = n.listeners().stream()
+ .map(l -> new RaftVoterEndpoint(l.name(), l.host(),
l.port()))
+ .collect(Collectors.toList());
+
+ return new QuorumInfo.Node(n.nodeId(), endpoints);
+ }).collect(Collectors.toMap(QuorumInfo.Node::nodeId,
Function.identity()));
+
return new QuorumInfo(
partition.leaderId(),
partition.leaderEpoch(),
partition.highWatermark(),
voters,
- observers
+ observers,
+ nodes
);
}
@@ -4447,7 +4457,7 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse response) {
final DescribeQuorumResponse quorumResponse =
(DescribeQuorumResponse) response;
if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
- throw
Errors.forCode(quorumResponse.data().errorCode()).exception();
+ throw
Errors.forCode(quorumResponse.data().errorCode()).exception(quorumResponse.data().errorMessage());
}
if (quorumResponse.data().topics().size() != 1) {
String msg = String.format("DescribeMetadataQuorum
received %d topics when 1 was expected",
@@ -4476,9 +4486,9 @@ public class KafkaAdminClient extends AdminClient {
throw new UnknownServerException(msg);
}
if (partition.errorCode() != Errors.NONE.code()) {
- throw Errors.forCode(partition.errorCode()).exception();
+ throw
Errors.forCode(partition.errorCode()).exception(partition.errorMessage());
}
- future.complete(createQuorumResult(partition));
+ future.complete(createQuorumResult(partition,
quorumResponse.data().nodes()));
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
index f9e4f8c11c9..5264b6f6aae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
@@ -16,7 +16,10 @@
*/
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.Uuid;
+
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
@@ -29,19 +32,22 @@ public class QuorumInfo {
private final long highWatermark;
private final List<ReplicaState> voters;
private final List<ReplicaState> observers;
+ private final Map<Integer, Node> nodes;
QuorumInfo(
int leaderId,
long leaderEpoch,
long highWatermark,
List<ReplicaState> voters,
- List<ReplicaState> observers
+ List<ReplicaState> observers,
+ Map<Integer, Node> nodes
) {
this.leaderId = leaderId;
this.leaderEpoch = leaderEpoch;
this.highWatermark = highWatermark;
this.voters = voters;
this.observers = observers;
+ this.nodes = nodes;
}
public int leaderId() {
@@ -64,6 +70,13 @@ public class QuorumInfo {
return observers;
}
+ /**
+ * @return The voter nodes in the Raft cluster, or an empty map if KIP-853
is not enabled.
+ */
+ public Map<Integer, Node> nodes() {
+ return nodes;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -73,12 +86,13 @@ public class QuorumInfo {
&& leaderEpoch == that.leaderEpoch
&& highWatermark == that.highWatermark
&& Objects.equals(voters, that.voters)
- && Objects.equals(observers, that.observers);
+ && Objects.equals(observers, that.observers)
+ && Objects.equals(nodes, that.nodes);
}
@Override
public int hashCode() {
- return Objects.hash(leaderId, leaderEpoch, highWatermark, voters,
observers);
+ return Objects.hash(leaderId, leaderEpoch, highWatermark, voters,
observers, nodes);
}
@Override
@@ -89,26 +103,30 @@ public class QuorumInfo {
", highWatermark=" + highWatermark +
", voters=" + voters +
", observers=" + observers +
+ ", nodes=" + nodes +
')';
}
public static class ReplicaState {
private final int replicaId;
+ private final Uuid replicaDirectoryId;
private final long logEndOffset;
private final OptionalLong lastFetchTimestamp;
private final OptionalLong lastCaughtUpTimestamp;
ReplicaState() {
- this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+ this(0, Uuid.ZERO_UUID, 0, OptionalLong.empty(),
OptionalLong.empty());
}
ReplicaState(
int replicaId,
+ Uuid replicaDirectoryId,
long logEndOffset,
OptionalLong lastFetchTimestamp,
OptionalLong lastCaughtUpTimestamp
) {
this.replicaId = replicaId;
+ this.replicaDirectoryId = replicaDirectoryId;
this.logEndOffset = logEndOffset;
this.lastFetchTimestamp = lastFetchTimestamp;
this.lastCaughtUpTimestamp = lastCaughtUpTimestamp;
@@ -122,6 +140,13 @@ public class QuorumInfo {
return replicaId;
}
+ /**
+ * Return the directory id of the replica if configured, or
Uuid.ZERO_UUID if not.
+ */
+ public Uuid replicaDirectoryId() {
+ return replicaDirectoryId;
+ }
+
/**
* Return the logEndOffset known by the leader for this replica.
* @return The logEndOffset for this replica
@@ -154,6 +179,7 @@ public class QuorumInfo {
if (o == null || getClass() != o.getClass()) return false;
ReplicaState that = (ReplicaState) o;
return replicaId == that.replicaId
+ && Objects.equals(replicaDirectoryId, that.replicaDirectoryId)
&& logEndOffset == that.logEndOffset
&& lastFetchTimestamp.equals(that.lastFetchTimestamp)
&& lastCaughtUpTimestamp.equals(that.lastCaughtUpTimestamp);
@@ -161,17 +187,57 @@ public class QuorumInfo {
@Override
public int hashCode() {
- return Objects.hash(replicaId, logEndOffset, lastFetchTimestamp,
lastCaughtUpTimestamp);
+ return Objects.hash(replicaId, replicaDirectoryId, logEndOffset,
lastFetchTimestamp, lastCaughtUpTimestamp);
}
@Override
public String toString() {
return "ReplicaState(" +
"replicaId=" + replicaId +
+ ", replicaDirectoryId=" + replicaDirectoryId +
", logEndOffset=" + logEndOffset +
", lastFetchTimestamp=" + lastFetchTimestamp +
", lastCaughtUpTimestamp=" + lastCaughtUpTimestamp +
')';
}
}
+
+ public static class Node {
+ private final int nodeId;
+ private final List<RaftVoterEndpoint> endpoints;
+
+ Node(int nodeId, List<RaftVoterEndpoint> endpoints) {
+ this.nodeId = nodeId;
+ this.endpoints = endpoints;
+ }
+
+ public int nodeId() {
+ return nodeId;
+ }
+
+ public List<RaftVoterEndpoint> endpoints() {
+ return endpoints;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Node node = (Node) o;
+ return nodeId == node.nodeId && Objects.equals(endpoints,
node.endpoints);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(nodeId, endpoints);
+ }
+
+ @Override
+ public String toString() {
+ return "Node{" +
+ "nodeId=" + nodeId +
+ ", endpoints=" + endpoints +
+ '}';
+ }
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
index acdb11c6644..5de7da403d4 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
@@ -83,6 +83,7 @@ public class DescribeQuorumRequest extends AbstractRequest {
public static DescribeQuorumResponseData
getPartitionLevelErrorResponse(DescribeQuorumRequestData data, Errors error) {
short errorCode = error.code();
+ String errorMessage = error.message();
List<DescribeQuorumResponseData.TopicData> topicResponses = new
ArrayList<>();
for (DescribeQuorumRequestData.TopicData topic : data.topics()) {
@@ -93,6 +94,7 @@ public class DescribeQuorumRequest extends AbstractRequest {
requestPartition -> new
DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(requestPartition.partitionIndex())
.setErrorCode(errorCode)
+ .setErrorMessage(errorMessage)
).collect(Collectors.toList())));
}
@@ -100,6 +102,6 @@ public class DescribeQuorumRequest extends AbstractRequest {
}
public static DescribeQuorumResponseData getTopLevelErrorResponse(Errors
topLevelError) {
- return new
DescribeQuorumResponseData().setErrorCode(topLevelError.code());
+ return new
DescribeQuorumResponseData().setErrorCode(topLevelError.code()).setErrorMessage(topLevelError.message());
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index 39e050c9405..5ad51f4e1de 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -84,19 +84,26 @@ public class DescribeQuorumResponse extends
AbstractResponse {
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(new
DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(topicPartition.partition())
- .setErrorCode(error.code())))));
+ .setErrorCode(error.code())
+ .setErrorMessage(error.message())))));
}
public static DescribeQuorumResponseData singletonResponse(
TopicPartition topicPartition,
- DescribeQuorumResponseData.PartitionData partitionData
+ DescribeQuorumResponseData.PartitionData partitionData,
+ DescribeQuorumResponseData.NodeCollection nodes
) {
- return new DescribeQuorumResponseData()
+ DescribeQuorumResponseData res = new DescribeQuorumResponseData()
.setTopics(Collections.singletonList(new
DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(partitionData
.setPartitionIndex(topicPartition.partition())))));
+
+ if (nodes != null)
+ res.setNodes(nodes);
+
+ return res;
}
public static DescribeQuorumResponse parse(ByteBuffer buffer, short
version) {
diff --git
a/clients/src/main/resources/common/message/DescribeQuorumRequest.json
b/clients/src/main/resources/common/message/DescribeQuorumRequest.json
index cee8fe69822..2faaefec736 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumRequest.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumRequest.json
@@ -19,8 +19,10 @@
"listeners": ["broker", "controller"],
"name": "DescribeQuorumRequest",
// Version 1 adds additional fields in the response. The request is
unchanged (KIP-836).
- "validVersions": "0-1",
+ // Version 2 adds additional fields in the response. The request is
unchanged (KIP-853).
+ "validVersions": "0-2",
"flexibleVersions": "0+",
+ "latestVersionUnstable": true, // Version 2 is still under development.
"fields": [
{ "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [
diff --git
a/clients/src/main/resources/common/message/DescribeQuorumResponse.json
b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
index 0ea6271238b..e0be61781f5 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
@@ -18,11 +18,14 @@
"type": "response",
"name": "DescribeQuorumResponse",
// Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in
ReplicaState (KIP-836).
- "validVersions": "0-1",
+ // Version 2 adds ErrorMessage, Nodes, ErrorMessage in PartitionData,
ReplicaDirectoryId in ReplicaState (KIP-853).
+ "validVersions": "0-2",
"flexibleVersions": "0+",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level error code."},
+ { "name": "ErrorMessage", "type": "string", "versions": "2+",
"nullableVersions": "2+", "ignorable": true,
+ "about": "The error message, or null if there was no error." },
{ "name": "Topics", "type": "[]TopicData",
"versions": "0+", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType":
"topicName",
@@ -32,6 +35,8 @@
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+"},
+ { "name": "ErrorMessage", "type": "string", "versions": "2+",
"nullableVersions": "2+", "ignorable": true,
+ "about": "The error message, or null if there was no error." },
{ "name": "LeaderId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
"about": "The ID of the current leader or -1 if the leader is
unknown."},
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
@@ -40,10 +45,25 @@
{ "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+"
},
{ "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
]}
- ]}],
+ ]},
+ { "name": "Nodes", "type": "[]Node", "versions": "2+", "fields": [
+ { "name": "NodeId", "type": "int32", "versions": "2+",
+ "mapKey": true, "entityType": "brokerId", "about": "The ID of the
associated node" },
+ { "name": "Listeners", "type": "[]Listener",
+ "about": "The listeners of this controller", "versions": "2+",
"fields": [
+ { "name": "Name", "type": "string", "versions": "2+", "mapKey": true,
+ "about": "The name of the endpoint" },
+ { "name": "Host", "type": "string", "versions": "2+",
+ "about": "The hostname" },
+ { "name": "Port", "type": "uint16", "versions": "2+",
+ "about": "The port" }
+ ]}
+ ]}
+ ],
"commonStructs": [
{ "name": "ReplicaState", "versions": "0+", "fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType":
"brokerId" },
+ { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "2+" },
{ "name": "LogEndOffset", "type": "int64", "versions": "0+",
"about": "The last known log end offset of the follower or -1 if it is
unknown"},
{ "name": "LastFetchTimestamp", "type": "int64", "versions": "1+",
"ignorable": true, "default": -1,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index bd7c7d0b1ab..f7b27dcb868 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -269,6 +269,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
import static
org.apache.kafka.clients.admin.KafkaAdminClient.DEFAULT_LEAVE_GROUP_REASON;
import static
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
import static
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
@@ -294,6 +295,7 @@ public class KafkaAdminClientTest {
private static final Logger log =
LoggerFactory.getLogger(KafkaAdminClientTest.class);
private static final String GROUP_ID = "group-0";
private static final int THROTTLE = 10;
+ public static final Uuid REPLICA_DIRECTORY_ID = Uuid.randomUuid();
@Test
public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
@@ -732,12 +734,17 @@ public class KafkaAdminClientTest {
private static QuorumInfo defaultQuorumInfo(boolean emptyOptionals) {
return new QuorumInfo(1, 1, 1L,
- singletonList(new QuorumInfo.ReplicaState(1, 100,
+ singletonList(new QuorumInfo.ReplicaState(1,
+ emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID,
+ 100,
emptyOptionals ? OptionalLong.empty() :
OptionalLong.of(1000),
emptyOptionals ? OptionalLong.empty() :
OptionalLong.of(1000))),
- singletonList(new QuorumInfo.ReplicaState(1, 100,
+ singletonList(new QuorumInfo.ReplicaState(1,
+ emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID,
+ 100,
emptyOptionals ? OptionalLong.empty() :
OptionalLong.of(1000),
- emptyOptionals ? OptionalLong.empty() :
OptionalLong.of(1000))));
+ emptyOptionals ? OptionalLong.empty() :
OptionalLong.of(1000))),
+ singletonMap(1, new QuorumInfo.Node(1,
Collections.emptyList())));
}
private static DescribeQuorumResponse prepareDescribeQuorumResponse(
@@ -755,6 +762,7 @@ public class KafkaAdminClientTest {
for (int i = 0; i < (partitionCountError ? 2 : 1); i++) {
DescribeQuorumResponseData.ReplicaState replica = new
DescribeQuorumResponseData.ReplicaState()
.setReplicaId(1)
+ .setReplicaDirectoryId(emptyOptionals ? Uuid.ZERO_UUID :
REPLICA_DIRECTORY_ID)
.setLogEndOffset(100);
replica.setLastFetchTimestamp(emptyOptionals ? -1 : 1000);
replica.setLastCaughtUpTimestamp(emptyOptionals ? -1 : 1000);
@@ -764,12 +772,17 @@ public class KafkaAdminClientTest {
.setHighWatermark(1)
.setCurrentVoters(singletonList(replica))
.setObservers(singletonList(replica))
- .setErrorCode(partitionLevelError.code()));
+ .setErrorCode(partitionLevelError.code())
+ .setErrorMessage(partitionLevelError.message()));
}
for (int i = 0; i < (topicCountError ? 2 : 1); i++) {
topics.add(new
DescribeQuorumResponseData.TopicData().setTopicName(topicName).setPartitions(partitions));
}
- return new DescribeQuorumResponse(new
DescribeQuorumResponseData().setTopics(topics).setErrorCode(topLevelError.code()));
+ return new DescribeQuorumResponse(new DescribeQuorumResponseData()
+ .setTopics(topics)
+ .setErrorCode(topLevelError.code())
+ .setErrorMessage(topLevelError.message())
+ .setNodes(new
DescribeQuorumResponseData.NodeCollection(Collections.singleton(new
DescribeQuorumResponseData.Node().setNodeId(1)).iterator())));
}
/**
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 98a66eccd5a..b71ded0cb2a 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -115,7 +115,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance)
extends AbstractApiVersio
def testApiVersionsRequestThroughControllerListener(): Unit = {
val request = new ApiVersionsRequest.Builder().build()
val apiVersionsResponse = sendApiVersionsRequest(request,
cluster.controllerListenerName.get())
- validateApiVersionsResponse(apiVersionsResponse,
cluster.controllerListenerName.get())
+ validateApiVersionsResponse(apiVersionsResponse,
cluster.controllerListenerName.get(), enableUnstableLastVersion = true)
}
@ClusterTemplate("zkApiVersionsRequest")
@@ -153,7 +153,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance)
extends AbstractApiVersio
def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
val apiVersionsRequest = new
ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest,
cluster.controllerListenerName.get())
- validateApiVersionsResponse(apiVersionsResponse,
cluster.controllerListenerName.get(), apiVersion = 0)
+ validateApiVersionsResponse(apiVersionsResponse,
cluster.controllerListenerName.get(), apiVersion = 0, enableUnstableLastVersion
= true)
}
@ClusterTemplate("zkApiVersionsRequest")
diff --git
a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index e8d94e3bb65..622dcf366a1 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -60,6 +60,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
val response = connectAndReceive[DescribeQuorumResponse](request)
assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+ assertEquals("", response.data.errorMessage)
assertEquals(1, response.data.topics.size)
val topicData = response.data.topics.get(0)
@@ -69,6 +70,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
val partitionData = topicData.partitions.get(0)
assertEquals(KafkaRaftServer.MetadataPartition.partition,
partitionData.partitionIndex)
assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+ assertEquals("", partitionData.errorMessage())
assertTrue(partitionData.leaderEpoch > 0)
val leaderId = partitionData.leaderId
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 208f9d59e1b..041d04c21f6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -615,7 +615,7 @@ class KafkaApisTest extends Logging {
def testDescribeQuorumNotAllowedForZkClusters(): Unit = {
val requestData =
DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
- val request = buildRequest(requestBuilder.build())
+ val request =
buildRequest(requestBuilder.build(DescribeQuorumRequestData.HIGHEST_SUPPORTED_VERSION))
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
@@ -624,6 +624,7 @@ class KafkaApisTest extends Logging {
val response = verifyNoThrottling[DescribeQuorumResponse](request)
assertEquals(Errors.UNKNOWN_SERVER_ERROR,
Errors.forCode(response.data.errorCode))
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR.message(),
response.data.errorMessage)
}
@Test
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index aa07ce89203..eb29f7321ff 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -314,7 +314,7 @@ final public class KafkaRaftClient<T> implements
RaftClient<T> {
) {
final LogOffsetMetadata endOffsetMetadata = log.endOffset();
- if (state.updateLocalState(endOffsetMetadata,
partitionState.lastVoterSet().voterIds())) {
+ if (state.updateLocalState(endOffsetMetadata,
partitionState.lastVoterSet().voters())) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}
@@ -1156,7 +1156,7 @@ final public class KafkaRaftClient<T> implements
RaftClient<T> {
if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
LogFetchInfo info = log.read(fetchOffset,
Isolation.UNCOMMITTED);
- if (state.updateReplicaState(replicaId, currentTimeMs,
info.startOffsetMetadata)) {
+ if (state.updateReplicaState(replicaId, Uuid.ZERO_UUID,
currentTimeMs, info.startOffsetMetadata)) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}
@@ -1353,7 +1353,10 @@ final public class KafkaRaftClient<T> implements
RaftClient<T> {
LeaderState<T> leaderState = quorum.leaderStateOrThrow();
return DescribeQuorumResponse.singletonResponse(
log.topicPartition(),
- leaderState.describeQuorum(currentTimeMs)
+ leaderState.describeQuorum(currentTimeMs),
+ requestMetadata.apiVersion() <
DescribeQuorumResponseData.Node.LOWEST_SUPPORTED_VERSION
+ ? null
+ : leaderState.nodes(currentTimeMs)
);
}
@@ -1894,6 +1897,7 @@ final public class KafkaRaftClient<T> implements
RaftClient<T> {
RaftRequest.Outbound requestMessage = new RaftRequest.Outbound(
correlationId,
+ request.highestSupportedVersion(),
request,
destination,
currentTimeMs
diff --git
a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java
index 01edf845c60..67fa3eff930 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java
@@ -106,6 +106,7 @@ public class KafkaRaftClientDriver<T> extends
ShutdownableThread {
) {
RaftRequest.Inbound inboundRequest = new RaftRequest.Inbound(
header.correlationId(),
+ header.apiVersion(),
request,
createdTimeMs
);
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 13a6d106345..fcb4a5950bc 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -16,9 +16,10 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
-import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordUtils;
import org.apache.kafka.common.utils.LogContext;
@@ -26,9 +27,11 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.raft.internals.VoterSet;
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -51,6 +54,7 @@ public class LeaderState<T> implements EpochState {
static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;
private final int localId;
+ private final Uuid localDirectoryId;
private final int epoch;
private final long epochStartOffset;
private final Set<Integer> grantingVoters;
@@ -71,21 +75,23 @@ public class LeaderState<T> implements EpochState {
protected LeaderState(
Time time,
int localId,
+ Uuid localDirectoryId,
int epoch,
long epochStartOffset,
- Set<Integer> voters,
+ Map<Integer, VoterSet.VoterNode> voters,
Set<Integer> grantingVoters,
BatchAccumulator<T> accumulator,
int fetchTimeoutMs,
LogContext logContext
) {
this.localId = localId;
+ this.localDirectoryId = localDirectoryId;
this.epoch = epoch;
this.epochStartOffset = epochStartOffset;
- for (int voterId : voters) {
- boolean hasAcknowledgedLeader = voterId == localId;
- this.voterStates.put(voterId, new ReplicaState(voterId,
hasAcknowledgedLeader));
+ for (Map.Entry<Integer, VoterSet.VoterNode> voter : voters.entrySet())
{
+ boolean hasAcknowledgedLeader = voter.getKey() == localId;
+ this.voterStates.put(voter.getKey(), new
ReplicaState(voter.getKey(), voter.getValue().voterKey().directoryId(),
hasAcknowledgedLeader));
}
this.grantingVoters = Collections.unmodifiableSet(new
HashSet<>(grantingVoters));
this.log = logContext.logger(LeaderState.class);
@@ -200,6 +206,10 @@ public class LeaderState<T> implements EpochState {
return localId;
}
+ public Uuid localDirectoryId() {
+ return localDirectoryId;
+ }
+
public Set<Integer> nonAcknowledgingVoters() {
Set<Integer> nonAcknowledging = new HashSet<>();
for (ReplicaState state : voterStates.values()) {
@@ -298,14 +308,14 @@ public class LeaderState<T> implements EpochState {
* Update the local replica state.
*
* @param endOffsetMetadata updated log end offset of local replica
- * @param lastVoterSet the up-to-date voter set
+ * @param lastVoters the up-to-date voter set
* @return true if the high watermark is updated as a result of this call
*/
public boolean updateLocalState(
LogOffsetMetadata endOffsetMetadata,
- Set<Integer> lastVoterSet
+ Map<Integer, VoterSet.VoterNode> lastVoters
) {
- ReplicaState state = getOrCreateReplicaState(localId);
+ ReplicaState state = getOrCreateReplicaState(localId,
localDirectoryId);
state.endOffset.ifPresent(currentEndOffset -> {
if (currentEndOffset.offset > endOffsetMetadata.offset) {
throw new IllegalStateException("Detected non-monotonic update
of local " +
@@ -313,7 +323,7 @@ public class LeaderState<T> implements EpochState {
}
});
state.updateLeaderEndOffset(endOffsetMetadata);
- updateVoterAndObserverStates(lastVoterSet);
+ updateVoterAndObserverStates(lastVoters);
return maybeUpdateHighWatermark();
}
@@ -321,12 +331,14 @@ public class LeaderState<T> implements EpochState {
* Update the replica state in terms of fetch time and log end offsets.
*
* @param replicaId replica id
+ * @param replicaDirectoryId replica directory id
* @param currentTimeMs current time in milliseconds
* @param fetchOffsetMetadata new log offset and metadata
* @return true if the high watermark is updated as a result of this call
*/
public boolean updateReplicaState(
int replicaId,
+ Uuid replicaDirectoryId,
long currentTimeMs,
LogOffsetMetadata fetchOffsetMetadata
) {
@@ -338,7 +350,7 @@ public class LeaderState<T> implements EpochState {
throw new IllegalStateException("Remote replica ID " + replicaId +
" matches the local leader ID");
}
- ReplicaState state = getOrCreateReplicaState(replicaId);
+ ReplicaState state = getOrCreateReplicaState(replicaId,
replicaDirectoryId);
state.endOffset.ifPresent(currentEndOffset -> {
if (currentEndOffset.offset > fetchOffsetMetadata.offset) {
@@ -346,7 +358,7 @@ public class LeaderState<T> implements EpochState {
state.nodeId, currentEndOffset.offset,
fetchOffsetMetadata.offset);
}
});
- Optional<LogOffsetMetadata> leaderEndOffsetOpt =
getOrCreateReplicaState(localId).endOffset;
+ Optional<LogOffsetMetadata> leaderEndOffsetOpt =
getOrCreateReplicaState(localId, localDirectoryId).endOffset;
state.updateFollowerState(
currentTimeMs,
@@ -387,10 +399,10 @@ public class LeaderState<T> implements EpochState {
return epochStartOffset;
}
- private ReplicaState getOrCreateReplicaState(int remoteNodeId) {
+ private ReplicaState getOrCreateReplicaState(int remoteNodeId, Uuid
remoteNodeDirectory) {
ReplicaState state = voterStates.get(remoteNodeId);
if (state == null) {
- observerStates.putIfAbsent(remoteNodeId, new
ReplicaState(remoteNodeId, false));
+ observerStates.putIfAbsent(remoteNodeId, new
ReplicaState(remoteNodeId, Optional.of(remoteNodeDirectory), false));
return observerStates.get(remoteNodeId);
}
return state;
@@ -408,6 +420,29 @@ public class LeaderState<T> implements EpochState {
.setObservers(describeReplicaStates(observerStates,
currentTimeMs));
}
+ public DescribeQuorumResponseData.NodeCollection nodes(long currentTimeMs)
{
+ clearInactiveObservers(currentTimeMs);
+
+ return nodes(voterStates.values(), observerStates.values());
+ }
+
+ private static DescribeQuorumResponseData.NodeCollection
nodes(Collection<ReplicaState> voters, Collection<ReplicaState> observers) {
+ DescribeQuorumResponseData.NodeCollection res = new
DescribeQuorumResponseData.NodeCollection();
+
+ voters.forEach(replicaState -> node(res, replicaState));
+ observers.forEach(replicaState -> node(res, replicaState));
+
+ return res;
+ }
+
+ private static void node(DescribeQuorumResponseData.NodeCollection res,
ReplicaState replicaState) {
+ if (res.find(replicaState.nodeId) != null) {
+ return;
+ }
+
+ res.add(new
DescribeQuorumResponseData.Node().setNodeId(replicaState.nodeId));
+ }
+
private List<DescribeQuorumResponseData.ReplicaState>
describeReplicaStates(
Map<Integer, ReplicaState> state,
long currentTimeMs
@@ -432,6 +467,7 @@ public class LeaderState<T> implements EpochState {
}
return new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(replicaState.nodeId)
+
.setReplicaDirectoryId(replicaState.nodeDirectory.orElse(Uuid.ZERO_UUID))
.setLogEndOffset(replicaState.endOffset.map(md ->
md.offset).orElse(-1L))
.setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
.setLastFetchTimestamp(lastFetchTimestamp);
@@ -452,11 +488,11 @@ public class LeaderState<T> implements EpochState {
return voterStates.containsKey(remoteNodeId);
}
- private void updateVoterAndObserverStates(Set<Integer> lastVoterSet) {
+ private void updateVoterAndObserverStates(Map<Integer, VoterSet.VoterNode>
lastVoters) {
// Move any replica that is not in the last voter set from voterStates
to observerStates
for (Iterator<Map.Entry<Integer, ReplicaState>> iter =
voterStates.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<Integer, ReplicaState> replica = iter.next();
- if (!lastVoterSet.contains(replica.getKey())) {
+ if (!lastVoters.containsKey(replica.getKey())) {
observerStates.put(replica.getKey(), replica.getValue());
iter.remove();
}
@@ -464,24 +500,26 @@ public class LeaderState<T> implements EpochState {
// Add replicas that are in the last voter set and not in voterStates
to voterStates (from observerStates
// if they exist)
- for (int voterId : lastVoterSet) {
- if (!voterStates.containsKey(voterId)) {
- Optional<ReplicaState> existingObserverState =
Optional.ofNullable(observerStates.remove(voterId));
- voterStates.put(voterId, existingObserverState.orElse(new
ReplicaState(voterId, false)));
+ for (Map.Entry<Integer, VoterSet.VoterNode> voter :
lastVoters.entrySet()) {
+ if (!voterStates.containsKey(voter.getKey())) {
+ Optional<ReplicaState> existingObserverState =
Optional.ofNullable(observerStates.remove(voter.getKey()));
+ voterStates.put(voter.getKey(),
existingObserverState.orElse(new ReplicaState(voter.getKey(),
voter.getValue().voterKey().directoryId(), false)));
}
}
}
private static class ReplicaState implements Comparable<ReplicaState> {
final int nodeId;
+ final Optional<Uuid> nodeDirectory;
Optional<LogOffsetMetadata> endOffset;
long lastFetchTimestamp;
long lastFetchLeaderLogEndOffset;
long lastCaughtUpTimestamp;
boolean hasAcknowledgedLeader;
- public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
+ public ReplicaState(int nodeId, Optional<Uuid> nodeDirectory, boolean
hasAcknowledgedLeader) {
this.nodeId = nodeId;
+ this.nodeDirectory = nodeDirectory;
this.endOffset = Optional.empty();
this.lastFetchTimestamp = -1;
this.lastFetchLeaderLogEndOffset = -1;
@@ -535,9 +573,10 @@ public class LeaderState<T> implements EpochState {
@Override
public String toString() {
return String.format(
- "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s,
" +
+ "ReplicaState(nodeId=%d, nodeDirectoryId=%s, endOffset=%s,
lastFetchTimestamp=%s, " +
"lastCaughtUpTimestamp=%s, hasAcknowledgedLeader=%s)",
nodeId,
+ nodeDirectory,
endOffset,
lastFetchTimestamp,
lastCaughtUpTimestamp,
@@ -559,8 +598,9 @@ public class LeaderState<T> implements EpochState {
@Override
public String toString() {
return String.format(
- "Leader(localId=%d, epoch=%d, epochStartOffset=%d,
highWatermark=%s, voterStates=%s)",
+ "Leader(localId=%d, localDirectoryId=%s, epoch=%d,
epochStartOffset=%d, highWatermark=%s, voterStates=%s)",
localId,
+ localDirectoryId,
epoch,
epochStartOffset,
highWatermark,
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index b9b17c5f99b..b46d520a620 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -518,9 +518,10 @@ public class QuorumState {
LeaderState<T> state = new LeaderState<>(
time,
localIdOrThrow(),
+ localDirectoryId(),
epoch(),
epochStartOffset,
- latestVoterSet.get().voterIds(),
+ latestVoterSet.get().voters(),
candidateState.grantingVoters(),
accumulator,
fetchTimeoutMs,
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftMessage.java
b/raft/src/main/java/org/apache/kafka/raft/RaftMessage.java
index f50ec902e0e..a3e71ed1cc1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftMessage.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftMessage.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
public interface RaftMessage {
int correlationId();
- ApiMessage data();
+ short apiVersion();
+ ApiMessage data();
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftRequest.java
b/raft/src/main/java/org/apache/kafka/raft/RaftRequest.java
index bf590f56ab1..ba5bfefe211 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftRequest.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftRequest.java
@@ -23,11 +23,13 @@ import java.util.concurrent.CompletableFuture;
public abstract class RaftRequest implements RaftMessage {
private final int correlationId;
+ private final short apiVersion;
private final ApiMessage data;
private final long createdTimeMs;
- public RaftRequest(int correlationId, ApiMessage data, long createdTimeMs)
{
+ public RaftRequest(int correlationId, short apiVersion, ApiMessage data,
long createdTimeMs) {
this.correlationId = correlationId;
+ this.apiVersion = apiVersion;
this.data = data;
this.createdTimeMs = createdTimeMs;
}
@@ -37,6 +39,11 @@ public abstract class RaftRequest implements RaftMessage {
return correlationId;
}
+ @Override
+ public short apiVersion() {
+ return apiVersion;
+ }
+
@Override
public ApiMessage data() {
return data;
@@ -49,8 +56,8 @@ public abstract class RaftRequest implements RaftMessage {
public final static class Inbound extends RaftRequest {
public final CompletableFuture<RaftResponse.Outbound> completion = new
CompletableFuture<>();
- public Inbound(int correlationId, ApiMessage data, long createdTimeMs)
{
- super(correlationId, data, createdTimeMs);
+ public Inbound(int correlationId, short apiVersion, ApiMessage data,
long createdTimeMs) {
+ super(correlationId, apiVersion, data, createdTimeMs);
}
@Override
@@ -68,8 +75,8 @@ public abstract class RaftRequest implements RaftMessage {
private final Node destination;
public final CompletableFuture<RaftResponse.Inbound> completion = new
CompletableFuture<>();
- public Outbound(int correlationId, ApiMessage data, Node destination,
long createdTimeMs) {
- super(correlationId, data, createdTimeMs);
+ public Outbound(int correlationId, short apiVersion, ApiMessage data,
Node destination, long createdTimeMs) {
+ super(correlationId, apiVersion, data, createdTimeMs);
this.destination = destination;
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftResponse.java
b/raft/src/main/java/org/apache/kafka/raft/RaftResponse.java
index 9c5047ca92d..fb0f22f2f4b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftResponse.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftResponse.java
@@ -33,6 +33,11 @@ public abstract class RaftResponse implements RaftMessage {
return correlationId;
}
+ @Override
+ public short apiVersion() {
+ return data().highestSupportedVersion();
+ }
+
@Override
public ApiMessage data() {
return data;
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
b/raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
index 9343cca8d47..216a67640c9 100644
---
a/raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
+++
b/raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
@@ -33,6 +33,11 @@ public class BlockingMessageQueue implements
RaftMessageQueue {
return 0;
}
+ @Override
+ public short apiVersion() {
+ return 0;
+ }
+
@Override
public ApiMessage data() {
return null;
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index 393cb373b31..840d7b89931 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -138,6 +138,10 @@ final public class VoterSet {
return voters.keySet();
}
+ public Map<Integer, VoterNode> voters() {
+ return voters;
+ }
+
/**
* Adds a voter to the voter set.
*
@@ -276,7 +280,7 @@ final public class VoterSet {
private final Map<ListenerName, InetSocketAddress> listeners;
private final SupportedVersionRange supportedKRaftVersion;
- VoterNode(
+ public VoterNode(
ReplicaKey voterKey,
Map<ListenerName, InetSocketAddress> listeners,
SupportedVersionRange supportedKRaftVersion
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index 2455990e770..6b013bac462 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -232,6 +232,7 @@ public class KafkaNetworkChannelTest {
ApiMessage apiRequest = buildTestRequest(apiKey);
RaftRequest.Outbound request = new RaftRequest.Outbound(
correlationId,
+ apiRequest.highestSupportedVersion(),
apiRequest,
destination,
createdTimeMs
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 00588a42305..3844725ec12 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
@@ -2313,6 +2314,7 @@ public class KafkaRaftClientTest {
DescribeQuorumResponseData responseData =
context.collectDescribeQuorumResponse();
assertEquals(Errors.NONE, Errors.forCode(responseData.errorCode()));
+ assertEquals("", responseData.errorMessage());
assertEquals(1, responseData.topics().size());
DescribeQuorumResponseData.TopicData topicData =
responseData.topics().get(0);
@@ -2322,6 +2324,7 @@ public class KafkaRaftClientTest {
DescribeQuorumResponseData.PartitionData partitionData =
topicData.partitions().get(0);
assertEquals(context.metadataPartition.partition(),
partitionData.partitionIndex());
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER,
Errors.forCode(partitionData.errorCode()));
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(),
partitionData.errorMessage());
}
@Test
@@ -2364,6 +2367,7 @@ public class KafkaRaftClientTest {
Arrays.asList(
new ReplicaState()
.setReplicaId(localId)
+ .setReplicaDirectoryId(Uuid.ZERO_UUID)
// As we are appending the records directly to the log,
// the leader end offset hasn't been updated yet.
.setLogEndOffset(3L)
@@ -2371,17 +2375,20 @@ public class KafkaRaftClientTest {
.setLastCaughtUpTimestamp(context.time.milliseconds()),
new ReplicaState()
.setReplicaId(laggingFollower)
+ .setReplicaDirectoryId(Uuid.ZERO_UUID)
.setLogEndOffset(1L)
.setLastFetchTimestamp(laggingFollowerFetchTime)
.setLastCaughtUpTimestamp(laggingFollowerFetchTime),
new ReplicaState()
.setReplicaId(closeFollower)
+ .setReplicaDirectoryId(Uuid.ZERO_UUID)
.setLogEndOffset(3L)
.setLastFetchTimestamp(closeFollowerFetchTime)
.setLastCaughtUpTimestamp(closeFollowerFetchTime)),
singletonList(
new ReplicaState()
.setReplicaId(observerId)
+ .setReplicaDirectoryId(Uuid.ZERO_UUID)
.setLogEndOffset(0L)
.setLastFetchTimestamp(observerFetchTime)
.setLastCaughtUpTimestamp(-1L)));
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 6d31154ccf2..20b9e0b3f29 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -16,12 +16,13 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.raft.internals.VoterSet;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -29,10 +30,14 @@ import org.mockito.Mockito;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
@@ -45,6 +50,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class LeaderStateTest {
private final int localId = 0;
+ private final Uuid localDirectoryId = Uuid.randomUuid();
private final int epoch = 5;
private final LogContext logContext = new LogContext();
private final BatchAccumulator<?> accumulator =
Mockito.mock(BatchAccumulator.class);
@@ -59,9 +65,10 @@ public class LeaderStateTest {
return new LeaderState<>(
time,
localId,
+ Uuid.randomUuid(),
epoch,
epochStartOffset,
- voters,
+ toMap(voters),
voters,
accumulator,
fetchTimeoutMs,
@@ -74,9 +81,10 @@ public class LeaderStateTest {
assertThrows(NullPointerException.class, () -> new LeaderState<>(
new MockTime(),
localId,
+ Uuid.randomUuid(),
epoch,
0,
- Collections.emptySet(),
+ Collections.emptyMap(),
Collections.emptySet(),
null,
fetchTimeoutMs,
@@ -108,12 +116,12 @@ public class LeaderStateTest {
Set<Integer> voterSet = singleton(localId);
LeaderState<?> state = newLeaderState(voterSet, 15L);
assertEquals(Optional.empty(), state.highWatermark());
- assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
voterSet));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
toMap(voterSet)));
assertEquals(emptySet(), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateLocalState(new LogOffsetMetadata(16L),
voterSet));
+ assertTrue(state.updateLocalState(new LogOffsetMetadata(16L),
toMap(voterSet)));
assertEquals(Optional.of(new LogOffsetMetadata(16L)),
state.highWatermark());
- assertTrue(state.updateLocalState(new LogOffsetMetadata(20),
voterSet));
+ assertTrue(state.updateLocalState(new LogOffsetMetadata(20),
toMap(voterSet)));
assertEquals(Optional.of(new LogOffsetMetadata(20L)),
state.highWatermark());
}
@@ -122,10 +130,10 @@ public class LeaderStateTest {
Set<Integer> voterSet = singleton(localId);
LeaderState<?> state = newLeaderState(voterSet, 15L);
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateLocalState(new LogOffsetMetadata(16L),
voterSet));
+ assertTrue(state.updateLocalState(new LogOffsetMetadata(16L),
toMap(voterSet)));
assertEquals(Optional.of(new LogOffsetMetadata(16L)),
state.highWatermark());
assertThrows(IllegalStateException.class,
- () -> state.updateLocalState(new LogOffsetMetadata(15L),
voterSet));
+ () -> state.updateLocalState(new LogOffsetMetadata(15L),
toMap(voterSet)));
}
@Test
@@ -138,42 +146,42 @@ public class LeaderStateTest {
Set<Integer> voterSet = mkSet(localId, node1, node2);
LeaderState<?> state = newLeaderState(voterSet, 10L);
assertEquals(Optional.empty(), state.highWatermark());
- assertFalse(state.updateLocalState(new LogOffsetMetadata(10L),
voterSet));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(10L),
toMap(voterSet)));
assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
// Node 1 falls behind
- assertFalse(state.updateLocalState(new LogOffsetMetadata(11L),
voterSet));
- assertFalse(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(11L),
toMap(voterSet)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(10L)));
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeVoterState(state, node1,
currentTime).lastCaughtUpTimestamp());
// Node 1 catches up to leader
- assertTrue(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(11L)));
+ assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(11L)));
caughtUpTime = fetchTime;
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeVoterState(state, node1,
currentTime).lastCaughtUpTimestamp());
// Node 1 falls behind
- assertFalse(state.updateLocalState(new LogOffsetMetadata(100L),
voterSet));
- assertTrue(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(50L)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(100L),
toMap(voterSet)));
+ assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(50L)));
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeVoterState(state, node1,
currentTime).lastCaughtUpTimestamp());
// Node 1 catches up to the last fetch offset
int prevFetchTime = fetchTime;
- assertFalse(state.updateLocalState(new LogOffsetMetadata(200L),
voterSet));
- assertTrue(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(100L)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(200L),
toMap(voterSet)));
+ assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(100L)));
caughtUpTime = prevFetchTime;
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeVoterState(state, node1,
currentTime).lastCaughtUpTimestamp());
// Node2 has never caught up to leader
assertEquals(-1L, describeVoterState(state, node2,
currentTime).lastCaughtUpTimestamp());
- assertFalse(state.updateLocalState(new LogOffsetMetadata(300L),
voterSet));
- assertTrue(state.updateReplicaState(node2, ++fetchTime, new
LogOffsetMetadata(200L)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(300L),
toMap(voterSet)));
+ assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(200L)));
assertEquals(-1L, describeVoterState(state, node2,
currentTime).lastCaughtUpTimestamp());
- assertTrue(state.updateReplicaState(node2, ++fetchTime, new
LogOffsetMetadata(250L)));
+ assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(250L)));
assertEquals(-1L, describeVoterState(state, node2,
currentTime).lastCaughtUpTimestamp());
}
@@ -189,33 +197,33 @@ public class LeaderStateTest {
assertEquals(emptySet(), state.nonAcknowledgingVoters());
// Node 1 falls behind
- assertTrue(state.updateLocalState(new LogOffsetMetadata(11L),
voterSet));
- assertFalse(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(10L)));
+ assertTrue(state.updateLocalState(new LogOffsetMetadata(11L),
toMap(voterSet)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(10L)));
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeObserverState(state, node1,
currentTime).lastCaughtUpTimestamp());
// Node 1 catches up to leader
- assertFalse(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(11L)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(11L)));
caughtUpTime = fetchTime;
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeObserverState(state, node1,
currentTime).lastCaughtUpTimestamp());
// Node 1 falls behind
- assertTrue(state.updateLocalState(new LogOffsetMetadata(100L),
voterSet));
- assertFalse(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(50L)));
+ assertTrue(state.updateLocalState(new LogOffsetMetadata(100L),
toMap(voterSet)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(50L)));
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeObserverState(state, node1,
currentTime).lastCaughtUpTimestamp());
// Node 1 catches up to the last fetch offset
int prevFetchTime = fetchTime;
- assertTrue(state.updateLocalState(new LogOffsetMetadata(200L),
voterSet));
- assertFalse(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(102L)));
+ assertTrue(state.updateLocalState(new LogOffsetMetadata(200L),
toMap(voterSet)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(102L)));
caughtUpTime = prevFetchTime;
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeObserverState(state, node1,
currentTime).lastCaughtUpTimestamp());
// Node 1 catches up to leader
- assertFalse(state.updateReplicaState(node1, ++fetchTime, new
LogOffsetMetadata(200L)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
++fetchTime, new LogOffsetMetadata(200L)));
caughtUpTime = fetchTime;
assertEquals(currentTime, describeVoterState(state, localId,
currentTime).lastCaughtUpTimestamp());
assertEquals(caughtUpTime, describeObserverState(state, node1,
currentTime).lastCaughtUpTimestamp());
@@ -226,8 +234,8 @@ public class LeaderStateTest {
Set<Integer> voterSet = singleton(localId);
LeaderState<?> state = newLeaderState(voterSet, 15L);
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateLocalState(new LogOffsetMetadata(16L),
voterSet));
- assertFalse(state.updateLocalState(new LogOffsetMetadata(16L),
voterSet));
+ assertTrue(state.updateLocalState(new LogOffsetMetadata(16L),
toMap(voterSet)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(16L),
toMap(voterSet)));
assertEquals(Optional.of(new LogOffsetMetadata(16L)),
state.highWatermark());
}
@@ -238,11 +246,11 @@ public class LeaderStateTest {
assertEquals(Optional.empty(), state.highWatermark());
LogOffsetMetadata initialHw = new LogOffsetMetadata(16L,
Optional.of(new MockOffsetMetadata("bar")));
- assertTrue(state.updateLocalState(initialHw, voterSet));
+ assertTrue(state.updateLocalState(initialHw, toMap(voterSet)));
assertEquals(Optional.of(initialHw), state.highWatermark());
LogOffsetMetadata updateHw = new LogOffsetMetadata(16L,
Optional.of(new MockOffsetMetadata("baz")));
- assertTrue(state.updateLocalState(updateHw, voterSet));
+ assertTrue(state.updateLocalState(updateHw, toMap(voterSet)));
assertEquals(Optional.of(updateHw), state.highWatermark());
}
@@ -251,15 +259,15 @@ public class LeaderStateTest {
int otherNodeId = 1;
Set<Integer> voterSet = mkSet(localId, otherNodeId);
LeaderState<?> state = newLeaderState(voterSet, 10L);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(13L),
voterSet));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(13L),
toMap(voterSet)));
assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
- assertFalse(state.updateReplicaState(otherNodeId, 0, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateReplicaState(otherNodeId, Uuid.randomUuid(),
0, new LogOffsetMetadata(10L)));
assertEquals(emptySet(), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateReplicaState(otherNodeId, 0, new
LogOffsetMetadata(11L)));
+ assertTrue(state.updateReplicaState(otherNodeId, Uuid.randomUuid(), 0,
new LogOffsetMetadata(11L)));
assertEquals(Optional.of(new LogOffsetMetadata(11L)),
state.highWatermark());
- assertTrue(state.updateReplicaState(otherNodeId, 0, new
LogOffsetMetadata(13L)));
+ assertTrue(state.updateReplicaState(otherNodeId, Uuid.randomUuid(), 0,
new LogOffsetMetadata(13L)));
assertEquals(Optional.of(new LogOffsetMetadata(13L)),
state.highWatermark());
}
@@ -269,22 +277,22 @@ public class LeaderStateTest {
int node2 = 2;
Set<Integer> voterSet = mkSet(localId, node1, node2);
LeaderState<?> state = newLeaderState(voterSet, 10L);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
voterSet));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
toMap(voterSet)));
assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
- assertFalse(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(10L)));
assertEquals(singleton(node2), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(10L)));
assertEquals(emptySet(), state.nonAcknowledgingVoters());
assertEquals(Optional.empty(), state.highWatermark());
- assertTrue(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(15L)));
+ assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateLocalState(new LogOffsetMetadata(20L),
voterSet));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(20L),
toMap(voterSet)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertTrue(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(20L)));
+ assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(20L)));
assertEquals(Optional.of(new LogOffsetMetadata(20L)),
state.highWatermark());
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(20L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(20L)));
assertEquals(Optional.of(new LogOffsetMetadata(20L)),
state.highWatermark());
}
@@ -294,23 +302,23 @@ public class LeaderStateTest {
int node2 = 2;
Set<Integer> originalVoterSet = mkSet(localId, node1);
LeaderState<?> state = newLeaderState(originalVoterSet, 5L);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
originalVoterSet));
- assertTrue(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
toMap(originalVoterSet)));
+ assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(10L)));
assertEquals(Optional.of(new LogOffsetMetadata(10L)),
state.highWatermark());
// updating replica state of node2 before it joins voterSet should not
increase HW to 15L
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(15L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(10L)),
state.highWatermark());
// adding node2 to voterSet will cause HW to increase to 15L
Set<Integer> voterSetWithNode2 = mkSet(localId, node1, node2);
- assertTrue(state.updateLocalState(new LogOffsetMetadata(15L),
voterSetWithNode2));
+ assertTrue(state.updateLocalState(new LogOffsetMetadata(15L),
toMap(voterSetWithNode2)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// HW will not update to 16L until a majority reaches it
- assertFalse(state.updateLocalState(new LogOffsetMetadata(16L),
voterSetWithNode2));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(16L),
toMap(voterSetWithNode2)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertTrue(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(16L)));
+ assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(16L)));
assertEquals(Optional.of(new LogOffsetMetadata(16L)),
state.highWatermark());
}
@@ -322,29 +330,29 @@ public class LeaderStateTest {
// start with three voters with HW at 15L
Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
LeaderState<?> state = newLeaderState(originalVoterSet, 5L);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
originalVoterSet));
- assertTrue(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(15L)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
toMap(originalVoterSet)));
+ assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(10L)));
// updating replica state of node3 before it joins voterSet
- assertFalse(state.updateReplicaState(node3, 0, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateReplicaState(node3, Uuid.randomUuid(), 0, new
LogOffsetMetadata(10L)));
// adding node3 to voterSet should not cause HW to decrease even if
majority is < HW
Set<Integer> voterSetWithNode3 = mkSet(localId, node1, node2, node3);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(16L),
voterSetWithNode3));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(16L),
toMap(voterSetWithNode3)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// HW will not decrease if calculated HW is anything lower than the
last HW
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(13L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(13L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateReplicaState(node3, 0, new
LogOffsetMetadata(13L)));
+ assertFalse(state.updateReplicaState(node3, Uuid.randomUuid(), 0, new
LogOffsetMetadata(13L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(16L)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(16L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// HW will update to 16L once a majority of the voterSet is at least
16L
- assertTrue(state.updateReplicaState(node3, 0, new
LogOffsetMetadata(16L)));
+ assertTrue(state.updateReplicaState(node3, Uuid.randomUuid(), 0, new
LogOffsetMetadata(16L)));
assertEquals(Optional.of(new LogOffsetMetadata(16L)),
state.highWatermark());
}
@@ -354,28 +362,28 @@ public class LeaderStateTest {
int node2 = 2;
Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
originalVoterSet));
- assertTrue(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(15L)));
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
toMap(originalVoterSet)));
+ assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(15L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(10L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// removing node1 should not decrement HW to 10L
Set<Integer> voterSetWithoutNode1 = mkSet(localId, node2);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(17L),
voterSetWithoutNode1));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(17L),
toMap(voterSetWithoutNode1)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// HW cannot change until after node2 catches up to last HW
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(14L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(14L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateLocalState(new LogOffsetMetadata(18L),
voterSetWithoutNode1));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(18L),
toMap(voterSetWithoutNode1)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(18L)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(18L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(15L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// HW should update to 16L
- assertTrue(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(16L)));
+ assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(16L)));
assertEquals(Optional.of(new LogOffsetMetadata(16L)),
state.highWatermark());
}
@@ -385,28 +393,28 @@ public class LeaderStateTest {
int node2 = 2;
Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
originalVoterSet));
- assertTrue(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(15L)));
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(10L)));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(15L),
toMap(originalVoterSet)));
+ assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(15L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(10L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// removing leader should not decrement HW to 10L
Set<Integer> voterSetWithoutLeader = mkSet(node1, node2);
- assertFalse(state.updateLocalState(new LogOffsetMetadata(17L),
voterSetWithoutLeader));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(17L),
toMap(voterSetWithoutLeader)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// HW cannot change until node2 catches up to last HW
- assertFalse(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(16L)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(16L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateLocalState(new LogOffsetMetadata(18L),
voterSetWithoutLeader));
+ assertFalse(state.updateLocalState(new LogOffsetMetadata(18L),
toMap(voterSetWithoutLeader)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(14L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(14L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- assertFalse(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(15L)));
+ assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
// HW will not update to 16L until majority of remaining voterSet
(node1, node2) are at least 16L
- assertTrue(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(16L)));
+ assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(16L)));
assertEquals(Optional.of(new LogOffsetMetadata(16L)),
state.highWatermark());
}
@@ -416,13 +424,13 @@ public class LeaderStateTest {
int node1 = 1;
Set<Integer> voterSet = mkSet(localId, node1);
LeaderState<?> state = newLeaderState(voterSet, 0L);
- state.updateLocalState(new LogOffsetMetadata(10L), voterSet);
- state.updateReplicaState(node1, time.milliseconds(), new
LogOffsetMetadata(10L));
+ state.updateLocalState(new LogOffsetMetadata(10L), toMap(voterSet));
+ state.updateReplicaState(node1, Uuid.randomUuid(),
time.milliseconds(), new LogOffsetMetadata(10L));
assertEquals(Optional.of(new LogOffsetMetadata(10L)),
state.highWatermark());
// Follower crashes and disk is lost. It fetches an earlier offset to
rebuild state.
// The leader will report an error in the logs, but will not let the
high watermark rewind
- assertFalse(state.updateReplicaState(node1, time.milliseconds(), new
LogOffsetMetadata(5L)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
time.milliseconds(), new LogOffsetMetadata(5L)));
assertEquals(5L, describeVoterState(state, node1,
time.milliseconds()).logEndOffset());
assertEquals(Optional.of(new LogOffsetMetadata(10L)),
state.highWatermark());
}
@@ -459,6 +467,7 @@ public class LeaderStateTest {
assertEquals(1, partitionData.currentVoters().size());
assertEquals(new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(localId)
+ .setReplicaDirectoryId(localDirectoryId)
.setLogEndOffset(-1)
.setLastFetchTimestamp(time.milliseconds())
.setLastCaughtUpTimestamp(time.milliseconds()),
@@ -466,7 +475,7 @@ public class LeaderStateTest {
// Now update the high watermark and verify the describe output
- assertTrue(state.updateLocalState(new
LogOffsetMetadata(leaderEndOffset), voterSet));
+ assertTrue(state.updateLocalState(new
LogOffsetMetadata(leaderEndOffset), toMap(voterSet)));
assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)),
state.highWatermark());
time.sleep(500);
@@ -479,6 +488,7 @@ public class LeaderStateTest {
assertEquals(1, partitionData.currentVoters().size());
assertEquals(new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(localId)
+ .setReplicaDirectoryId(localDirectoryId)
.setLogEndOffset(leaderEndOffset)
.setLastFetchTimestamp(time.milliseconds())
.setLastCaughtUpTimestamp(time.milliseconds()),
@@ -489,17 +499,34 @@ public class LeaderStateTest {
public void testDescribeQuorumWithMultipleVoters() {
MockTime time = new MockTime();
int activeFollowerId = 1;
+ Uuid activeFollowerDirectoryId = Uuid.randomUuid();
int inactiveFollowerId = 2;
+ Uuid inactiveFollowerDirectoryId = Uuid.randomUuid();
long leaderStartOffset = 10L;
long leaderEndOffset = 15L;
- Set<Integer> voterSet = mkSet(localId, activeFollowerId,
inactiveFollowerId);
- LeaderState<?> state = newLeaderState(voterSet, leaderStartOffset);
- assertFalse(state.updateLocalState(new
LogOffsetMetadata(leaderEndOffset), voterSet));
+ Map<Integer, VoterSet.VoterNode> voters = new HashMap<>();
+ voters.put(localId, voterNode(localId, localDirectoryId));
+ voters.put(activeFollowerId, voterNode(activeFollowerId,
activeFollowerDirectoryId));
+ voters.put(inactiveFollowerId, voterNode(inactiveFollowerId,
inactiveFollowerDirectoryId));
+
+ LeaderState<?> state = new LeaderState<>(
+ time,
+ localId,
+ Uuid.randomUuid(),
+ epoch,
+ leaderStartOffset,
+ voters,
+ voters.keySet(),
+ accumulator,
+ fetchTimeoutMs,
+ logContext
+ );
+ assertFalse(state.updateLocalState(new
LogOffsetMetadata(leaderEndOffset), voters));
assertEquals(Optional.empty(), state.highWatermark());
long activeFollowerFetchTimeMs = time.milliseconds();
- assertTrue(state.updateReplicaState(activeFollowerId,
activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+ assertTrue(state.updateReplicaState(activeFollowerId,
activeFollowerDirectoryId, activeFollowerFetchTimeMs, new
LogOffsetMetadata(leaderEndOffset)));
assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)),
state.highWatermark());
time.sleep(500);
@@ -517,6 +544,7 @@ public class LeaderStateTest {
findReplicaOrFail(localId, partitionData.currentVoters());
assertEquals(new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(localId)
+ .setReplicaDirectoryId(localDirectoryId)
.setLogEndOffset(leaderEndOffset)
.setLastFetchTimestamp(time.milliseconds())
.setLastCaughtUpTimestamp(time.milliseconds()),
@@ -526,6 +554,7 @@ public class LeaderStateTest {
findReplicaOrFail(activeFollowerId, partitionData.currentVoters());
assertEquals(new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(activeFollowerId)
+ .setReplicaDirectoryId(activeFollowerDirectoryId)
.setLogEndOffset(leaderEndOffset)
.setLastFetchTimestamp(activeFollowerFetchTimeMs)
.setLastCaughtUpTimestamp(activeFollowerFetchTimeMs),
@@ -535,6 +564,7 @@ public class LeaderStateTest {
findReplicaOrFail(inactiveFollowerId,
partitionData.currentVoters());
assertEquals(new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(inactiveFollowerId)
+ .setReplicaDirectoryId(inactiveFollowerDirectoryId)
.setLogEndOffset(-1)
.setLastFetchTimestamp(-1)
.setLastCaughtUpTimestamp(-1),
@@ -547,10 +577,10 @@ public class LeaderStateTest {
long leaderEndOffset) {
Set<Integer> voterSet = mkSet(localId, follower1, follower2);
LeaderState<?> state = newLeaderState(voterSet, leaderStartOffset);
- state.updateLocalState(new LogOffsetMetadata(leaderEndOffset),
voterSet);
+ state.updateLocalState(new LogOffsetMetadata(leaderEndOffset),
toMap(voterSet));
assertEquals(Optional.empty(), state.highWatermark());
- state.updateReplicaState(follower1, 0, new
LogOffsetMetadata(leaderStartOffset));
- state.updateReplicaState(follower2, 0, new
LogOffsetMetadata(leaderEndOffset));
+ state.updateReplicaState(follower1, Uuid.randomUuid(), 0, new
LogOffsetMetadata(leaderStartOffset));
+ state.updateReplicaState(follower2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(leaderEndOffset));
return state;
}
@@ -558,16 +588,17 @@ public class LeaderStateTest {
public void testDescribeQuorumWithObservers() {
MockTime time = new MockTime();
int observerId = 10;
+ Uuid observerDirectoryId = Uuid.randomUuid();
long epochStartOffset = 10L;
Set<Integer> voterSet = singleton(localId);
LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
- assertTrue(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), voterSet));
+ assertTrue(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), toMap(voterSet)));
assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)),
state.highWatermark());
time.sleep(500);
long observerFetchTimeMs = time.milliseconds();
- assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs,
new LogOffsetMetadata(epochStartOffset + 1)));
+ assertFalse(state.updateReplicaState(observerId, observerDirectoryId,
observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
time.sleep(500);
DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
@@ -584,6 +615,7 @@ public class LeaderStateTest {
DescribeQuorumResponseData.ReplicaState observerState =
observerStates.get(0);
assertEquals(new DescribeQuorumResponseData.ReplicaState()
.setReplicaId(observerId)
+ .setReplicaDirectoryId(observerDirectoryId)
.setLogEndOffset(epochStartOffset + 1)
.setLastFetchTimestamp(observerFetchTimeMs)
.setLastCaughtUpTimestamp(observerFetchTimeMs),
@@ -600,15 +632,15 @@ public class LeaderStateTest {
Set<Integer> voterSet = mkSet(leader, node1, node2);
LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
- assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), voterSet));
- assertTrue(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(epochStartOffset + 1)));
+ assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), toMap(voterSet)));
+ assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new
LogOffsetMetadata(epochStartOffset + 1)));
assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)),
state.highWatermark());
// node1 becomes an observer
long fetchTimeMs = time.milliseconds();
- assertFalse(state.updateReplicaState(node1, fetchTimeMs, new
LogOffsetMetadata(epochStartOffset + 1)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
Set<Integer> voterSetWithoutNode1 = mkSet(leader, node2);
- state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5),
voterSetWithoutNode1);
+ state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5),
toMap(voterSetWithoutNode1));
time.sleep(500);
@@ -624,11 +656,11 @@ public class LeaderStateTest {
// node1 catches up with leader, HW should not change
time.sleep(500);
fetchTimeMs = time.milliseconds();
- assertFalse(state.updateReplicaState(node1, fetchTimeMs, new
LogOffsetMetadata(epochStartOffset + 5)));
+ assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(),
fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 5)));
assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)),
state.highWatermark());
// node1 becomes a voter again, HW should change
- assertTrue(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 10), voterSet));
+ assertTrue(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 10), toMap(voterSet)));
time.sleep(500);
partitionData = state.describeQuorum(time.milliseconds());
@@ -653,13 +685,13 @@ public class LeaderStateTest {
Set<Integer> voterSet = mkSet(localId, followerId);
LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
- assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), voterSet));
- assertTrue(state.updateReplicaState(followerId, time.milliseconds(),
new LogOffsetMetadata(epochStartOffset + 1)));
+ assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), toMap(voterSet)));
+ assertTrue(state.updateReplicaState(followerId, Uuid.randomUuid(),
time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1)));
// observer is returned since its lastFetchTimestamp is within
OBSERVER_SESSION_TIMEOUT_MS
time.sleep(500);
long observerFetchTimeMs = time.milliseconds();
- assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs,
new LogOffsetMetadata(epochStartOffset + 1)));
+ assertFalse(state.updateReplicaState(observerId, Uuid.randomUuid(),
observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
time.sleep(500);
DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
@@ -678,8 +710,8 @@ public class LeaderStateTest {
assertEquals(0, partitionData.observers().size());
// leader becomes observer
- Set<Integer> voterSetWithoutLeader = singleton(followerId);
- assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 10), voterSetWithoutLeader));
+ Set<Integer> voterSetWithoutLeader = mkSet(followerId);
+ assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 10), toMap(voterSetWithoutLeader)));
// leader should be returned in describe quorum output
time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
@@ -752,7 +784,7 @@ public class LeaderStateTest {
long epochStartOffset = 10L;
LeaderState<?> state = newLeaderState(mkSet(localId),
epochStartOffset);
- assertFalse(state.updateReplicaState(replicaId, 0, new
LogOffsetMetadata(epochStartOffset)));
+ assertFalse(state.updateReplicaState(replicaId, Uuid.randomUuid(), 0,
new LogOffsetMetadata(epochStartOffset)));
DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
List<DescribeQuorumResponseData.ReplicaState> observerStates =
partitionData.observers();
@@ -766,7 +798,7 @@ public class LeaderStateTest {
long epochStartOffset = 10L;
LeaderState<?> state = newLeaderState(mkSet(localId),
epochStartOffset);
- state.updateReplicaState(observerId, time.milliseconds(), new
LogOffsetMetadata(epochStartOffset));
+ state.updateReplicaState(observerId, Uuid.randomUuid(),
time.milliseconds(), new LogOffsetMetadata(epochStartOffset));
DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
List<DescribeQuorumResponseData.ReplicaState> observerStates =
partitionData.observers();
assertEquals(1, observerStates.size());
@@ -782,7 +814,7 @@ public class LeaderStateTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testGrantVote(boolean isLogUpToDate) {
- LeaderState<?> state = newLeaderState(Utils.mkSet(1, 2, 3), 1);
+ LeaderState<?> state = newLeaderState(mkSet(1, 2, 3), 1);
assertFalse(state.canGrantVote(ReplicaKey.of(1, Optional.empty()),
isLogUpToDate));
assertFalse(state.canGrantVote(ReplicaKey.of(2, Optional.empty()),
isLogUpToDate));
@@ -840,4 +872,11 @@ public class LeaderStateTest {
));
}
+ private Map<Integer, VoterSet.VoterNode> toMap(Set<Integer> data) {
+ return data.stream().collect(Collectors.toMap(Function.identity(), id
-> voterNode(id, id == localId ? localDirectoryId : Uuid.randomUuid())));
+ }
+
+ private VoterSet.VoterNode voterNode(int id, Uuid directoryId) {
+ return new VoterSet.VoterNode(ReplicaKey.of(id,
Optional.of(directoryId)), null, null);
+ }
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 19f2fb61ec6..1ebc875d1d4 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -528,9 +528,24 @@ public final class RaftClientTestContext {
.setHighWatermark(highWatermark)
.setCurrentVoters(voterStates)
.setObservers(observerStates);
+
+ DescribeQuorumResponseData.NodeCollection nodes = new
DescribeQuorumResponseData.NodeCollection();
+
+ Consumer<DescribeQuorumResponseData.ReplicaState> addToNodes =
replicaState -> {
+ if (nodes.find(replicaState.replicaId()) != null)
+ return;
+
+ nodes.add(new DescribeQuorumResponseData.Node()
+ .setNodeId(replicaState.replicaId()));
+ };
+
+ voterStates.forEach(addToNodes);
+ observerStates.forEach(addToNodes);
+
DescribeQuorumResponseData expectedResponse =
DescribeQuorumResponse.singletonResponse(
metadataPartition,
- partitionData
+ partitionData,
+ nodes
);
assertEquals(expectedResponse, response);
}
@@ -595,7 +610,7 @@ public final class RaftClientTestContext {
void deliverRequest(ApiMessage request) {
RaftRequest.Inbound inboundRequest = new RaftRequest.Inbound(
- channel.newCorrelationId(), request, time.milliseconds());
+ channel.newCorrelationId(), request.highestSupportedVersion(),
request, time.milliseconds());
inboundRequest.completion.whenComplete((response, exception) -> {
if (exception != null) {
throw new RuntimeException(exception);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 4896571c22e..2c0fd1eff2b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -1235,7 +1235,7 @@ public class RaftEventSimulationTest {
int correlationId = outbound.correlationId();
Node destination = outbound.destination();
- RaftRequest.Inbound inbound = new
RaftRequest.Inbound(correlationId, outbound.data(),
+ RaftRequest.Inbound inbound = new
RaftRequest.Inbound(correlationId, outbound.apiVersion(), outbound.data(),
cluster.time.milliseconds());
if (!filters.get(destination.id()).acceptInbound(inbound))
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index ce59d587b9d..f84e81b18dd 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -147,8 +147,8 @@ public class KafkaRaftMetricsTest {
assertEquals((double) 1, getMetric(metrics,
"current-epoch").metricValue());
assertEquals((double) -1L, getMetric(metrics,
"high-watermark").metricValue());
- state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L),
voters.voterIds());
- state.leaderStateOrThrow().updateReplicaState(1, 0, new
LogOffsetMetadata(5L));
+ state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L),
voters.voters());
+ state.leaderStateOrThrow().updateReplicaState(1, Uuid.randomUuid(), 0,
new LogOffsetMetadata(5L));
assertEquals((double) 5L, getMetric(metrics,
"high-watermark").metricValue());
state.transitionToFollower(2, voters.voterNode(1,
VoterSetTest.DEFAULT_LISTENER_NAME).get());