This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7495e70365e KAFKA-16532; Support for first leader bootstrapping the voter set (#16518)
7495e70365e is described below
commit 7495e70365ebeba4806fc4da23c2ca75fa191649
Author: Alyssa Huang <[email protected]>
AuthorDate: Fri Jul 12 13:44:21 2024 -0700
KAFKA-16532; Support for first leader bootstrapping the voter set (#16518)
The first leader of a KRaft topic partition must rewrite the content of the
bootstrap checkpoint (0-0.checkpoint) to the log so that it is replicated,
because bootstrap checkpoints themselves are never replicated to the followers.
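On the fetch path this means the leader never points a follower at the bootstrap
snapshot id. A minimal, self-contained sketch of that decision (SnapshotId,
BOOTSTRAP_SNAPSHOT_ID and snapshotForFetch below are illustrative stand-ins for
the OffsetAndEpoch and Snapshots types the patch actually uses):

    import java.util.Objects;
    import java.util.Optional;

    // Stand-alone model of the check added to KafkaRaftClient: a Fetch at
    // offset 0 is answered with a snapshot id only if that snapshot is not
    // the bootstrap checkpoint, whose content is served through the log.
    final class BootstrapSnapshotGuard {
        // Stand-in for the patch's OffsetAndEpoch type
        record SnapshotId(long offset, int epoch) { }

        static final SnapshotId BOOTSTRAP_SNAPSHOT_ID = new SnapshotId(0, 0);

        static Optional<SnapshotId> snapshotForFetch(long fetchOffset, Optional<SnapshotId> latest) {
            if (fetchOffset == 0
                    && latest.isPresent()
                    && !Objects.equals(latest.get(), BOOTSTRAP_SNAPSHOT_ID)) {
                return latest;           // reply with a snapshot id (FETCH_SNAPSHOT)
            }
            return Optional.empty();     // otherwise serve records from the log
        }

        public static void main(String[] args) {
            System.out.println(snapshotForFetch(0, Optional.of(BOOTSTRAP_SNAPSHOT_ID)));  // Optional.empty
            System.out.println(snapshotForFetch(0, Optional.of(new SnapshotId(100, 3)))); // a real snapshot
        }
    }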
The control records for KRaftVersionRecord and VotersRecord in the
bootstrap checkpoint will be written in one batch along with the
LeaderChangeMessage. The leader will write these control records before
accepting data records from the state machine (Controller).
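A simplified sketch of how such a batch can be assembled, using only the
MemoryRecordsBuilder calls that appear in this patch (the helper class and
method names are illustrative, not part of the change):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.compress.Compression;
    import org.apache.kafka.common.message.KRaftVersionRecord;
    import org.apache.kafka.common.message.LeaderChangeMessage;
    import org.apache.kafka.common.record.ControlRecordUtils;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;
    import org.apache.kafka.raft.internals.VoterSet;

    final class BootstrapBatchSketch {
        // One control batch: the LeaderChangeMessage followed by the
        // KRaftVersionRecord and VotersRecord taken from the bootstrap checkpoint.
        static MemoryRecords epochStartBatch(
            ByteBuffer buffer,
            long baseOffset,
            int epoch,
            long nowMs,
            LeaderChangeMessage leaderChange,
            short kraftVersion,
            VoterSet voters
        ) {
            try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                buffer,
                RecordBatch.CURRENT_MAGIC_VALUE,
                Compression.NONE,
                TimestampType.CREATE_TIME,
                baseOffset,
                nowMs,
                RecordBatch.NO_PRODUCER_ID,
                RecordBatch.NO_PRODUCER_EPOCH,
                RecordBatch.NO_SEQUENCE,
                false, // isTransactional
                true,  // isControlBatch
                epoch,
                buffer.capacity()
            )) {
                builder.appendLeaderChangeMessage(nowMs, leaderChange);
                builder.appendKRaftVersionMessage(
                    nowMs,
                    new KRaftVersionRecord()
                        .setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
                        .setKRaftVersion(kraftVersion)
                );
                builder.appendVotersMessage(
                    nowMs,
                    voters.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
                );
                return builder.build();
            }
        }
    }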
The leader determines that the bootstrap checkpoint has not been written to
the log when the latest set of voters is located at offset -1, which is the
last contained offset of the bootstrap checkpoint (0-0.checkpoint).
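In code, that test reduces to inspecting the offset recorded for the latest
voter set; a small sketch assuming an OptionalLong like the one returned by the
new lastVoterSetOffset() accessor (empty when the voters came only from the
static configuration):

    import java.util.OptionalLong;

    final class BootstrapRewriteCheck {
        // Offset -1 means the latest voters came from 0-0.checkpoint
        // (end offset 0, hence last contained offset -1) and still need
        // to be rewritten into the partition log by the first leader.
        static boolean mustRewriteBootstrapRecords(OptionalLong lastVoterSetOffset) {
            return lastVoterSetOffset.isPresent() && lastVoterSetOffset.getAsLong() == -1;
        }

        public static void main(String[] args) {
            System.out.println(mustRewriteBootstrapRecords(OptionalLong.of(-1)));  // true
            System.out.println(mustRewriteBootstrapRecords(OptionalLong.of(42)));  // false
            System.out.println(mustRewriteBootstrapRecords(OptionalLong.empty())); // false: static voters
        }
    }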
This change also improves the RaftClientTestContext to allow for better
testing of the reconfiguration functionality. This is mainly done by allowing
the voter set to be configured statically or through the bootstrap checkpoint.
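Roughly, the two configuration paths look like this in the new tests (a sketch
modeled on the added test code; the wrapper class and method are illustrative,
and the builder methods are assumed to behave as used in the tests):

    package org.apache.kafka.raft;

    import java.util.Optional;
    import java.util.stream.Stream;

    import org.apache.kafka.raft.internals.ReplicaKey;
    import org.apache.kafka.raft.internals.VoterSet;
    import org.apache.kafka.raft.internals.VoterSetTest;

    import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;

    public class ReconfigBuilderUsageSketch {
        void bootstrapVersusStaticVoters() throws Exception {
            ReplicaKey local = replicaKey(0, true);
            ReplicaKey follower = replicaKey(1, true);
            VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));

            // Voters seeded through the bootstrap checkpoint (kraft.version 1).
            RaftClientTestContext bootstrapped = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
                .withKip853Rpc(true)
                .withBootstrapSnapshot(Optional.of(voters))
                .withUnknownLeader(0)
                .build();

            // Voters configured statically (kraft.version 0); the bootstrap checkpoint stays empty.
            RaftClientTestContext staticallyConfigured = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
                .withStaticVoters(voters.voterIds())
                .withBootstrapSnapshot(Optional.empty())
                .withUnknownLeader(0)
                .build();
        }
    }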
Reviewers: José Armando García Sancio <[email protected]>, Colin P. McCabe
<[email protected]>
Co-authors: José Armando García Sancio <[email protected]>
---
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 1 -
.../org/apache/kafka/raft/KafkaRaftClient.java | 41 ++-
.../java/org/apache/kafka/raft/LeaderState.java | 81 ++++-
.../java/org/apache/kafka/raft/QuorumState.java | 59 ++--
.../kafka/raft/internals/BatchAccumulator.java | 21 +-
.../apache/kafka/raft/internals/BatchBuilder.java | 5 +-
.../internals/KRaftControlRecordStateMachine.java | 11 +-
.../kafka/raft/internals/TreeMapLogHistory.java | 5 +-
.../kafka/raft/internals/VoterSetHistory.java | 18 ++
.../kafka/snapshot/RecordsSnapshotWriter.java | 2 +-
.../java/org/apache/kafka/snapshot/Snapshots.java | 2 +
.../kafka/raft/KafkaRaftClientReconfigTest.java | 349 +++++++++++++++++++++
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 75 +++--
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 2 +-
.../org/apache/kafka/raft/LeaderStateTest.java | 9 +-
.../org/apache/kafka/raft/MockNetworkChannel.java | 6 +-
.../org/apache/kafka/raft/QuorumStateTest.java | 16 +-
.../apache/kafka/raft/RaftClientTestContext.java | 218 +++++++++----
.../kafka/raft/internals/BatchAccumulatorTest.java | 47 ++-
.../kafka/raft/internals/BatchBuilderTest.java | 6 +-
.../kafka/raft/internals/KafkaRaftMetricsTest.java | 15 +-
.../kafka/raft/internals/RecordsIteratorTest.java | 1 -
.../raft/internals/TreeMapLogHistoryTest.java | 8 +-
.../kafka/raft/internals/VoterSetHistoryTest.java | 33 +-
.../apache/kafka/raft/internals/VoterSetTest.java | 8 +-
.../kafka/snapshot/SnapshotWriterReaderTest.java | 27 +-
26 files changed, 888 insertions(+), 178 deletions(-)
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 4f2daf3bfa6..cca56f7aa96 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -669,7 +669,6 @@ final class KafkaMetadataLogTest {
Compression.NONE,
0L,
mockTime.milliseconds(),
- false,
leaderEpoch,
maxBatchSizeInBytes
)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index b9f9f9bef5d..0f632a29021 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -105,6 +105,7 @@ import java.util.stream.Collectors;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
+import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
/**
* This class implements a Kafkaesque version of the Raft protocol. Leader
election
@@ -413,8 +414,12 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
QuorumStateStore quorumStateStore,
Metrics metrics
) {
+ Optional<VoterSet> staticVoters = voterAddresses.isEmpty() ?
+ Optional.empty() :
+
Optional.of(VoterSet.fromInetSocketAddresses(channel.listenerName(),
voterAddresses));
+
partitionState = new KRaftControlRecordStateMachine(
-
Optional.of(VoterSet.fromInetSocketAddresses(channel.listenerName(),
voterAddresses)),
+ staticVoters,
log,
serde,
BufferSupplier.create(),
@@ -453,8 +458,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
quorum = new QuorumState(
nodeId,
nodeDirectoryId,
- partitionState::lastVoterSet,
- partitionState::lastKraftVersion,
+ partitionState,
localListeners,
quorumConfig.electionTimeoutMs(),
quorumConfig.fetchTimeoutMs(),
@@ -542,7 +546,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
// The high watermark can only be advanced once we have written a
record
// from the new leader's epoch. Hence we write a control message
immediately
// to ensure there is no delay committing pending data.
- state.appendLeaderChangeMessage(currentTimeMs);
+ state.appendLeaderChangeMessageAndBootstrapRecords(currentTimeMs);
resetConnections();
kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
@@ -1398,8 +1402,8 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
Optional<OffsetAndEpoch> latestSnapshotId = log.latestSnapshotId();
final ValidOffsetAndEpoch validOffsetAndEpoch;
- if (fetchOffset == 0 && latestSnapshotId.isPresent()) {
- // If the follower has an empty log and a snapshot exist, it
is always more efficient
+ if (fetchOffset == 0 && latestSnapshotId.isPresent() &&
!latestSnapshotId.get().equals(BOOTSTRAP_SNAPSHOT_ID)) {
+ // If the follower has an empty log and a non-bootstrap
snapshot exists, it is always more efficient
// to reply with a snapshot id (FETCH_SNAPSHOT) instead of
fetching from the log segments.
validOffsetAndEpoch =
ValidOffsetAndEpoch.snapshot(latestSnapshotId.get());
} else {
@@ -1559,11 +1563,20 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
// snapshot is expected to reference offsets and epochs
greater than the log
// end offset and high-watermark.
state.setFetchingSnapshot(log.createNewSnapshotUnchecked(snapshotId));
- logger.info(
- "Fetching snapshot {} from Fetch response from leader
{}",
- snapshotId,
- quorum.leaderIdOrSentinel()
- );
+ if (state.fetchingSnapshot().isPresent()) {
+ logger.info(
+ "Fetching snapshot {} from Fetch response from
leader {}",
+ snapshotId,
+ quorum.leaderIdOrSentinel()
+ );
+ } else {
+ logger.info(
+ "Leader {} returned a snapshot {} in the FETCH
response which is " +
+ "already stored",
+ quorum.leaderIdOrSentinel(),
+ snapshotId
+ );
+ }
}
} else {
Records records =
FetchResponse.recordsOrFail(partitionResponse);
@@ -1712,8 +1725,12 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
partitionSnapshot.snapshotId().endOffset(),
partitionSnapshot.snapshotId().epoch()
);
+
Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
- if (!snapshotOpt.isPresent()) {
+ if (!snapshotOpt.isPresent() ||
snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID)) {
+ // The bootstrap checkpoint should not be replicated. The first
leader will
+ // make sure that the content of the bootstrap checkpoint is
included in the
+ // partition log
return RaftUtil.singletonFetchSnapshotResponse(
requestMetadata.listenerName(),
requestMetadata.apiVersion(),
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 5e6dd8de6b5..59c17cbcc9e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -17,10 +17,14 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
+import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -39,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -58,6 +63,10 @@ public class LeaderState<T> implements EpochState {
private final long epochStartOffset;
private final Set<Integer> grantingVoters;
private final Endpoints endpoints;
+ private final VoterSet voterSetAtEpochStart;
+ // This field is non-empty if the voter set at epoch start came from a
snapshot or log segment
+ private final OptionalLong offsetOfVotersAtEpochStart;
+ private final short kraftVersionAtEpochStart;
private Optional<LogOffsetMetadata> highWatermark = Optional.empty();
private Map<Integer, ReplicaState> voterStates = new HashMap<>();
@@ -79,7 +88,9 @@ public class LeaderState<T> implements EpochState {
ReplicaKey localReplicaKey,
int epoch,
long epochStartOffset,
- VoterSet voters,
+ VoterSet voterSetAtEpochStart,
+ OptionalLong offsetOfVotersAtEpochStart,
+ short kraftVersionAtEpochStart,
Set<Integer> grantingVoters,
BatchAccumulator<T> accumulator,
Endpoints endpoints,
@@ -91,7 +102,7 @@ public class LeaderState<T> implements EpochState {
this.epochStartOffset = epochStartOffset;
this.endpoints = endpoints;
- for (VoterSet.VoterNode voterNode: voters.voterNodes()) {
+ for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) {
boolean hasAcknowledgedLeader = voterNode.isVoter(localReplicaKey);
this.voterStates.put(
voterNode.voterKey().id(),
@@ -106,6 +117,9 @@ public class LeaderState<T> implements EpochState {
this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs);
this.beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
this.beginQuorumEpochTimer = time.timer(0);
+ this.voterSetAtEpochStart = voterSetAtEpochStart;
+ this.offsetOfVotersAtEpochStart = offsetOfVotersAtEpochStart;
+ this.kraftVersionAtEpochStart = kraftVersionAtEpochStart;
}
public long timeUntilBeginQuorumEpochTimerExpires(long currentTimeMs) {
@@ -149,7 +163,7 @@ public class LeaderState<T> implements EpochState {
/**
* Reset the checkQuorumTimer if we've received fetch/fetchSnapshot
request from the majority of the voter
*
- * @param id the node id
+ * @param replicaKey the replica key of the voter
* @param currentTimeMs the current timestamp in millisecond
*/
public void updateCheckQuorumForFollowingVoter(ReplicaKey replicaKey, long
currentTimeMs) {
@@ -192,7 +206,7 @@ public class LeaderState<T> implements EpochState {
.collect(Collectors.toList());
}
- public void appendLeaderChangeMessage(long currentTimeMs) {
+ public void appendLeaderChangeMessageAndBootstrapRecords(long
currentTimeMs) {
List<Voter> voters = convertToVoters(voterStates.keySet());
List<Voter> grantingVoters = convertToVoters(this.grantingVoters());
@@ -202,7 +216,59 @@ public class LeaderState<T> implements EpochState {
.setVoters(voters)
.setGrantingVoters(grantingVoters);
- accumulator.appendLeaderChangeMessage(leaderChangeMessage,
currentTimeMs);
+ accumulator.appendControlMessages((baseOffset, epoch, compression,
buffer) -> {
+ try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+ buffer,
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ compression,
+ TimestampType.CREATE_TIME,
+ baseOffset,
+ currentTimeMs,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ false, // isTransactional
+ true, // isControlBatch
+ epoch,
+ buffer.capacity()
+ )
+ ) {
+ builder.appendLeaderChangeMessage(currentTimeMs,
leaderChangeMessage);
+
+ offsetOfVotersAtEpochStart.ifPresent(offset -> {
+ if (offset == -1) {
+ // Latest voter set came from the bootstrap checkpoint (0-0.checkpoint);
+ // rewrite the voter set to the log so that it is replicated to the replicas.
+ if (kraftVersionAtEpochStart < 1) {
+ throw new IllegalStateException(
+ String.format(
+ "The bootstrap checkpoint contains a set
of voters %s at %s " +
+ "and the KRaft version is %s",
+ voterSetAtEpochStart,
+ offset,
+ kraftVersionAtEpochStart
+ )
+ );
+ } else {
+ builder.appendKRaftVersionMessage(
+ currentTimeMs,
+ new KRaftVersionRecord()
+
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
+ .setKRaftVersion(kraftVersionAtEpochStart)
+ );
+ builder.appendVotersMessage(
+ currentTimeMs,
+ voterSetAtEpochStart.toVotersRecord(
+
ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION
+ )
+ );
+ }
+ }
+ });
+
+ return builder.build();
+ }
+ });
accumulator.forceDrain();
}
@@ -338,7 +404,7 @@ public class LeaderState<T> implements EpochState {
* Update the local replica state.
*
* @param endOffsetMetadata updated log end offset of local replica
- * @param lastVoters the up-to-date voter set
+ * @param lastVoterSet the up-to-date voter set
* @return true if the high watermark is updated as a result of this call
*/
public boolean updateLocalState(
@@ -362,8 +428,7 @@ public class LeaderState<T> implements EpochState {
/**
* Update the replica state in terms of fetch time and log end offsets.
*
- * @param replicaId replica id
- * @param replicaDirectoryId replica directory id
+ * @param replicaKey replica key
* @param currentTimeMs current time in milliseconds
* @param fetchOffsetMetadata new log offset and metadata
* @return true if the high watermark is updated as a result of this call
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index f2734891e58..816d9210bcc 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.VoterSet;
@@ -32,7 +33,6 @@ import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
-import java.util.function.Supplier;
/**
* This class is responsible for managing the current state of this node and
ensuring
@@ -82,8 +82,7 @@ public class QuorumState {
private final Time time;
private final Logger log;
private final QuorumStateStore store;
- private final Supplier<VoterSet> latestVoterSet;
- private final Supplier<Short> latestKraftVersion;
+ private final KRaftControlRecordStateMachine partitionState;
private final Endpoints localListeners;
private final Random random;
private final int electionTimeoutMs;
@@ -95,8 +94,7 @@ public class QuorumState {
public QuorumState(
OptionalInt localId,
Uuid localDirectoryId,
- Supplier<VoterSet> latestVoterSet,
- Supplier<Short> latestKraftVersion,
+ KRaftControlRecordStateMachine partitionState,
Endpoints localListeners,
int electionTimeoutMs,
int fetchTimeoutMs,
@@ -107,8 +105,7 @@ public class QuorumState {
) {
this.localId = localId;
this.localDirectoryId = localDirectoryId;
- this.latestVoterSet = latestVoterSet;
- this.latestKraftVersion = latestKraftVersion;
+ this.partitionState = partitionState;
this.localListeners = localListeners;
this.electionTimeoutMs = electionTimeoutMs;
this.fetchTimeoutMs = fetchTimeoutMs;
@@ -123,7 +120,9 @@ public class QuorumState {
ElectionState election;
election = store
.readElectionState()
- .orElseGet(() -> ElectionState.withUnknownLeader(0,
latestVoterSet.get().voterIds()));
+ .orElseGet(
+ () -> ElectionState.withUnknownLeader(0,
partitionState.lastVoterSet().voterIds())
+ );
return election;
}
@@ -154,7 +153,7 @@ public class QuorumState {
time,
logEndOffsetAndEpoch.epoch(),
OptionalInt.empty(),
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
Optional.empty(),
randomElectionTimeoutMs(),
logContext
@@ -170,7 +169,7 @@ public class QuorumState {
time,
localId.getAsInt(),
election.epoch(),
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
randomElectionTimeoutMs(),
Collections.emptyList(),
localListeners,
@@ -185,7 +184,7 @@ public class QuorumState {
localId.getAsInt(),
localDirectoryId,
election.epoch(),
- latestVoterSet.get(),
+ partitionState.lastVoterSet(),
Optional.empty(),
1,
randomElectionTimeoutMs(),
@@ -196,13 +195,13 @@ public class QuorumState {
time,
election.epoch(),
election.votedKey(),
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
Optional.empty(),
randomElectionTimeoutMs(),
logContext
);
} else if (election.hasLeader()) {
- VoterSet voters = latestVoterSet.get();
+ VoterSet voters = partitionState.lastVoterSet();
Endpoints leaderEndpoints = voters.listeners(election.leaderId());
if (leaderEndpoints.isEmpty()) {
// Since the leader's endpoints are not known, it cannot send
Fetch or
@@ -223,7 +222,7 @@ public class QuorumState {
time,
election.epoch(),
OptionalInt.of(election.leaderId()),
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
Optional.empty(),
randomElectionTimeoutMs(),
logContext
@@ -245,7 +244,7 @@ public class QuorumState {
time,
election.epoch(),
OptionalInt.empty(),
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
Optional.empty(),
randomElectionTimeoutMs(),
logContext
@@ -257,7 +256,9 @@ public class QuorumState {
public boolean isOnlyVoter() {
return localId.isPresent() &&
- latestVoterSet.get().isOnlyVoter(ReplicaKey.of(localId.getAsInt(),
localDirectoryId));
+ partitionState
+ .lastVoterSet()
+ .isOnlyVoter(ReplicaKey.of(localId.getAsInt(),
localDirectoryId));
}
public int localIdOrSentinel() {
@@ -317,13 +318,13 @@ public class QuorumState {
return false;
}
- return latestVoterSet
- .get()
+ return partitionState
+ .lastVoterSet()
.isVoter(ReplicaKey.of(localId.getAsInt(), localDirectoryId));
}
public boolean isVoter(ReplicaKey nodeKey) {
- return latestVoterSet.get().isVoter(nodeKey);
+ return partitionState.lastVoterSet().isVoter(nodeKey);
}
public boolean isObserver() {
@@ -343,7 +344,7 @@ public class QuorumState {
time,
localIdOrThrow(),
epoch,
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
randomElectionTimeoutMs(),
preferredSuccessors,
localListeners,
@@ -380,7 +381,7 @@ public class QuorumState {
time,
epoch,
OptionalInt.empty(),
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
state.highWatermark(),
electionTimeoutMs,
logContext
@@ -437,7 +438,7 @@ public class QuorumState {
time,
epoch,
candidateKey,
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
state.highWatermark(),
randomElectionTimeoutMs(),
logContext
@@ -467,7 +468,7 @@ public class QuorumState {
epoch,
leaderId,
endpoints,
- latestVoterSet.get().voterIds(),
+ partitionState.lastVoterSet().voterIds(),
state.highWatermark(),
fetchTimeoutMs,
logContext
@@ -483,7 +484,7 @@ public class QuorumState {
"is not one of the voters %s",
localId,
localDirectoryId,
- latestVoterSet.get()
+ partitionState.lastVoterSet()
)
);
} else if (isLeader()) {
@@ -500,7 +501,7 @@ public class QuorumState {
localIdOrThrow(),
localDirectoryId,
newEpoch,
- latestVoterSet.get(),
+ partitionState.lastVoterSet(),
state.highWatermark(),
retries,
electionTimeoutMs,
@@ -516,7 +517,7 @@ public class QuorumState {
"is not one of the voters %s",
localId,
localDirectoryId,
- latestVoterSet.get()
+ partitionState.lastVoterSet()
)
);
} else if (!isCandidate()) {
@@ -543,7 +544,9 @@ public class QuorumState {
ReplicaKey.of(localIdOrThrow(), localDirectoryId),
epoch(),
epochStartOffset,
- latestVoterSet.get(),
+ partitionState.lastVoterSet(),
+ partitionState.lastVoterSetOffset(),
+ partitionState.lastKraftVersion(),
candidateState.grantingVoters(),
accumulator,
localListeners,
@@ -564,7 +567,7 @@ public class QuorumState {
}
}
- store.writeElectionState(newState.election(),
latestKraftVersion.get());
+ store.writeElectionState(newState.election(),
partitionState.lastKraftVersion());
memoryTransitionTo(newState);
}
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index ce629cc44ef..09204812ae6 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -49,7 +49,12 @@ import java.util.concurrent.locks.ReentrantLock;
public class BatchAccumulator<T> implements Closeable {
@FunctionalInterface
public interface MemoryRecordsCreator {
- MemoryRecords create(long baseOffset, int epoch, ByteBuffer
byteBuffer);
+ MemoryRecords create(
+ long baseOffset,
+ int epoch,
+ Compression compression,
+ ByteBuffer byteBuffer
+ );
}
private final int epoch;
@@ -228,7 +233,12 @@ public class BatchAccumulator<T> implements Closeable {
if (buffer != null) {
try {
forceDrain();
- MemoryRecords memoryRecords =
valueCreator.create(nextOffset, epoch, buffer);
+ MemoryRecords memoryRecords = valueCreator.create(
+ nextOffset,
+ epoch,
+ compression,
+ buffer
+ );
int numberOfRecords =
validateMemoryRecordsAndReturnCount(memoryRecords);
@@ -304,7 +314,7 @@ public class BatchAccumulator<T> implements Closeable {
LeaderChangeMessage leaderChangeMessage,
long currentTimestamp
) {
- appendControlMessages((baseOffset, epoch, buffer) ->
+ appendControlMessages((baseOffset, epoch, compression, buffer) ->
MemoryRecords.withLeaderChangeMessage(
baseOffset,
currentTimestamp,
@@ -327,7 +337,7 @@ public class BatchAccumulator<T> implements Closeable {
SnapshotHeaderRecord snapshotHeaderRecord,
long currentTimestamp
) {
- appendControlMessages((baseOffset, epoch, buffer) ->
+ appendControlMessages((baseOffset, epoch, compression, buffer) ->
MemoryRecords.withSnapshotHeaderRecord(
baseOffset,
currentTimestamp,
@@ -349,7 +359,7 @@ public class BatchAccumulator<T> implements Closeable {
SnapshotFooterRecord snapshotFooterRecord,
long currentTimestamp
) {
- appendControlMessages((baseOffset, epoch, buffer) ->
+ appendControlMessages((baseOffset, epoch, compression, buffer) ->
MemoryRecords.withSnapshotFooterRecord(
baseOffset,
currentTimestamp,
@@ -391,7 +401,6 @@ public class BatchAccumulator<T> implements Closeable {
compression,
nextOffset,
time.milliseconds(),
- false,
epoch,
maxBatchSize
);
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
index 1133fe7fd4c..f0ac2de19bf 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchBuilder.java
@@ -53,7 +53,6 @@ public class BatchBuilder<T> {
private final DataOutputStreamWritable recordOutput;
private final long baseOffset;
private final long appendTime;
- private final boolean isControlBatch;
private final int leaderEpoch;
private final int initialPosition;
private final int maxBytes;
@@ -70,7 +69,6 @@ public class BatchBuilder<T> {
Compression compression,
long baseOffset,
long appendTime,
- boolean isControlBatch,
int leaderEpoch,
int maxBytes
) {
@@ -81,7 +79,6 @@ public class BatchBuilder<T> {
this.baseOffset = baseOffset;
this.nextOffset = baseOffset;
this.appendTime = appendTime;
- this.isControlBatch = isControlBatch;
this.initialPosition = batchOutput.position();
this.leaderEpoch = leaderEpoch;
this.maxBytes = maxBytes;
@@ -255,7 +252,7 @@ public class BatchBuilder<T> {
RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE,
false,
- isControlBatch,
+ false,
false,
leaderEpoch,
numRecords()
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index b57f5e104f9..b088a63c381 100644
---
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -39,7 +39,7 @@ import java.util.OptionalLong;
* The KRaft state machine for tracking control records in the topic partition.
*
* This type keeps track of changes to the finalized kraft.version and the
sets of voters between
- * the latest snasphot and the log end offset.
+ * the latest snapshot and the log end offset.
*
* There are two type of actors/threads accessing this type. One is the KRaft
driver which indirectly call a lot of
* the public methods. The other actors/threads are the callers of {@code
RaftClient.createSnapshot} which
@@ -137,6 +137,15 @@ public final class KRaftControlRecordStateMachine {
}
}
+ /**
+ * Returns the offset of the last voter set.
+ */
+ public OptionalLong lastVoterSetOffset() {
+ synchronized (voterSetHistory) {
+ return voterSetHistory.lastVoterSetOffset();
+ }
+ }
+
/**
* Returns the last kraft version.
*/
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java
b/raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java
index 5625eeb5c17..8cf8fc7c843 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/TreeMapLogHistory.java
@@ -29,9 +29,10 @@ public final class TreeMapLogHistory<T> implements
LogHistory<T> {
@Override
public void addAt(long offset, T value) {
- if (offset < 0) {
+ // we consider -1 a legal offset to account for loading values from
the 0-0.checkpoint
+ if (offset < -1) {
throw new IllegalArgumentException(
- String.format("Next offset %d must be greater than or equal to
0", offset)
+ String.format("Next offset %d must be greater than or equal to
-1", offset)
);
}
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
index 26580795b6b..1b630c3e22c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
@@ -17,6 +17,7 @@
package org.apache.kafka.raft.internals;
import java.util.Optional;
+import java.util.OptionalLong;
/**
* A type for storing the historical value of the set of voters.
@@ -91,6 +92,23 @@ public final class VoterSetHistory {
.orElseThrow(() -> new IllegalStateException("No voter set
found"));
}
+ /**
+ * Returns the offset of the last voter set stored in the partition
history.
+ *
+ * Returns {@code OptionalLong.empty} if the last voter set is from the
static voters
+ * configuration.
+ *
+ * @return the offset storing the last voter set
+ */
+ public OptionalLong lastVoterSetOffset() {
+ Optional<LogHistory.Entry<VoterSet>> lastEntry =
votersHistory.lastEntry();
+ if (lastEntry.isPresent()) {
+ return OptionalLong.of(lastEntry.get().offset());
+ } else {
+ return OptionalLong.empty();
+ }
+ }
+
/**
* Removes all entries with an offset greater than or equal to {@code
endOffset}.
*
diff --git
a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
index 30176bdfc70..fde70c11c44 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -212,7 +212,7 @@ public final class RecordsSnapshotWriter<T> implements
SnapshotWriter<T> {
serde
);
- writer.accumulator.appendControlMessages((baseOffset, epoch,
buffer) -> {
+ writer.accumulator.appendControlMessages((baseOffset, epoch,
compression, buffer) -> {
long now = time.milliseconds();
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
index c3c008a1165..716c6e00f16 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
@@ -45,6 +45,8 @@ public final class Snapshots {
private static final int OFFSET_WIDTH = 20;
private static final int EPOCH_WIDTH = 10;
+ public static final OffsetAndEpoch BOOTSTRAP_SNAPSHOT_ID = new
OffsetAndEpoch(0, 0);
+
static {
OFFSET_FORMATTER.setMinimumIntegerDigits(OFFSET_WIDTH);
OFFSET_FORMATTER.setGroupingUsed(false);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
new file mode 100644
index 00000000000..84a9af918b6
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.message.KRaftVersionRecord;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.raft.internals.VoterSetTest;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientReconfigTest {
+
+ @Test
+ public void testLeaderWritesBootstrapRecords() throws Exception {
+ ReplicaKey local = replicaKey(0, true);
+ ReplicaKey follower = replicaKey(1, true);
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withKip853Rpc(true)
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withUnknownLeader(0)
+ .build();
+
+ List<List<ControlRecord>> expectedBootstrapRecords = Arrays.asList(
+ Arrays.asList(
+ new ControlRecord(
+ ControlRecordType.SNAPSHOT_HEADER,
+ new SnapshotHeaderRecord()
+ .setVersion((short) 0)
+ .setLastContainedLogTimestamp(0)
+ ),
+ new ControlRecord(
+ ControlRecordType.KRAFT_VERSION,
+ new KRaftVersionRecord()
+
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
+ .setKRaftVersion((short) 1)
+ ),
+ new ControlRecord(
+ ControlRecordType.KRAFT_VOTERS,
+
voters.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
+ )
+ ),
+ Arrays.asList(
+ new ControlRecord(
+ ControlRecordType.SNAPSHOT_FOOTER,
+ new SnapshotFooterRecord()
+ .setVersion((short) 0)
+ )
+ )
+ );
+
+ // check the bootstrap snapshot exists and contains the expected
records
+ assertEquals(BOOTSTRAP_SNAPSHOT_ID,
context.log.latestSnapshotId().get());
+ try (SnapshotReader<?> reader = RecordsSnapshotReader.of(
+ context.log.latestSnapshot().get(),
+ context.serde,
+ BufferSupplier.NO_CACHING,
+ KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ false
+ )
+ ) {
+
SnapshotWriterReaderTest.assertControlSnapshot(expectedBootstrapRecords,
reader);
+ }
+
+ context.becomeLeader();
+
+ // check if leader writes 3 bootstrap records to the log
+ Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+ RecordBatch batch = records.batches().iterator().next();
+ assertTrue(batch.isControlBatch());
+ Iterator<Record> recordIterator = batch.iterator();
+ Record record = recordIterator.next();
+ RaftClientTestContext.verifyLeaderChangeMessage(
+ local.id(),
+ Arrays.asList(local.id(), follower.id()),
+ Arrays.asList(local.id(), follower.id()),
+ record.key(),
+ record.value()
+ );
+ record = recordIterator.next();
+ verifyKRaftVersionRecord((short) 1, record.key(), record.value());
+ record = recordIterator.next();
+ verifyVotersRecord(voters, record.key(), record.value());
+ }
+
+ @Test
+ public void testBootstrapCheckpointIsNotReturnedOnFetch() throws Exception
{
+ ReplicaKey local = replicaKey(0, true);
+ ReplicaKey follower = replicaKey(1, true);
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withKip853Rpc(true)
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withUnknownLeader(0)
+ .build();
+
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+
+ // check that leader does not respond with bootstrap snapshot id when
follower fetches offset 0
+ context.deliverRequest(
+ context.fetchRequest(
+ epoch,
+ follower,
+ 0,
+ 0,
+ 0
+ )
+ );
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(local.id()));
+ }
+
+ @Test
+ public void testLeaderDoesNotBootstrapRecordsWithKraftVersion0() throws
Exception {
+ ReplicaKey local = replicaKey(0, true);
+ ReplicaKey follower = replicaKey(1, true);
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withStaticVoters(voters.voterIds())
+ .withBootstrapSnapshot(Optional.empty())
+ .withUnknownLeader(0)
+ .build();
+
+ List<List<ControlRecord>> expectedBootstrapRecords = Arrays.asList(
+ Arrays.asList(
+ new ControlRecord(
+ ControlRecordType.SNAPSHOT_HEADER,
+ new SnapshotHeaderRecord()
+ .setVersion((short) 0)
+ .setLastContainedLogTimestamp(0)
+ )
+ ),
+ Arrays.asList(
+ new ControlRecord(
+ ControlRecordType.SNAPSHOT_FOOTER,
+ new SnapshotFooterRecord()
+ .setVersion((short) 0)
+ )
+ )
+ );
+
+ // check the bootstrap snapshot exists but is empty
+ assertEquals(BOOTSTRAP_SNAPSHOT_ID,
context.log.latestSnapshotId().get());
+ try (SnapshotReader<?> reader = RecordsSnapshotReader.of(
+ context.log.latestSnapshot().get(),
+ context.serde,
+ BufferSupplier.NO_CACHING,
+ KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ false
+ )
+ ) {
+
SnapshotWriterReaderTest.assertControlSnapshot(expectedBootstrapRecords,
reader);
+ }
+
+ // check leader does not write bootstrap records to log
+ context.becomeLeader();
+
+ Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+ RecordBatch batch = records.batches().iterator().next();
+ assertTrue(batch.isControlBatch());
+ Iterator<Record> recordIterator = batch.iterator();
+ Record record = recordIterator.next();
+ RaftClientTestContext.verifyLeaderChangeMessage(
+ local.id(),
+ Arrays.asList(local.id(), follower.id()),
+ Arrays.asList(local.id(), follower.id()),
+ record.key(),
+ record.value()
+ );
+ assertFalse(recordIterator.hasNext());
+ }
+
+ @Test
+ public void testFollowerDoesNotRequestLeaderBootstrapSnapshot() throws
Exception {
+ ReplicaKey local = replicaKey(0, true);
+ ReplicaKey leader = replicaKey(1, true);
+ int epoch = 1;
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withKip853Rpc(true)
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withElectedLeader(epoch, leader.id())
+ .build();
+
+ // check that follower will send fetch request to leader
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+ // check that when the leader response contains the bootstrap snapshot id, the follower does not send a fetch snapshot request
+ context.deliverResponse(
+ fetchRequest.correlationId(),
+ fetchRequest.destination(),
+ context.snapshotFetchResponse(epoch, leader.id(),
BOOTSTRAP_SNAPSHOT_ID, 0)
+ );
+ context.pollUntilRequest();
+ fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ }
+
+ @Test
+ public void testFollowerReadsKRaftBootstrapRecords() throws Exception {
+ ReplicaKey local = replicaKey(0, true);
+ ReplicaKey leader = replicaKey(1, true);
+ ReplicaKey follower = replicaKey(2, true);
+ VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, leader));
+ int epoch = 5;
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withKip853Rpc(true)
+ .withBootstrapSnapshot(Optional.of(voterSet))
+ .withElectedLeader(epoch, leader.id())
+ .build();
+
+ // check that follower will send fetch request to leader
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+ // check that before receiving bootstrap records from leader, follower
is not in the voter set
+ assertFalse(context.client.quorum().isVoter(follower));
+
+ // leader sends batch with bootstrap records
+ VoterSet leadersVoterSet = VoterSetTest.voterSet(
+ Stream.concat(voterSet.voterKeys().stream(), Stream.of(follower))
+ );
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
+ buffer,
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ Compression.NONE,
+ TimestampType.CREATE_TIME,
+ 0, // baseOffset
+ 0, // logAppendTime
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ false, // isTransactional
+ true, // isControlBatch
+ epoch,
+ buffer.capacity()
+ )
+ ) {
+ builder.appendLeaderChangeMessage(
+ 0,
+ new LeaderChangeMessage()
+ );
+ builder.appendKRaftVersionMessage(
+ 0, // timestamp
+ new KRaftVersionRecord()
+
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
+ .setKRaftVersion((short) 1)
+ );
+ builder.appendVotersMessage(
+ 0, // timestamp
+
leadersVoterSet.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
+ );
+ MemoryRecords leaderRecords = builder.build();
+ context.deliverResponse(
+ fetchRequest.correlationId(),
+ fetchRequest.destination(),
+ context.fetchResponse(epoch, leader.id(), leaderRecords, 0,
Errors.NONE)
+ );
+ }
+
+ // follower applies the bootstrap records, registering follower2 as a
new voter
+ context.client.poll();
+ assertTrue(context.client.quorum().isVoter(follower));
+ }
+
+ private static void verifyVotersRecord(
+ VoterSet expectedVoterSet,
+ ByteBuffer recordKey,
+ ByteBuffer recordValue
+ ) {
+ assertEquals(ControlRecordType.KRAFT_VOTERS,
ControlRecordType.parse(recordKey));
+ VotersRecord votersRecord =
ControlRecordUtils.deserializeVotersRecord(recordValue);
+ assertEquals(
+ expectedVoterSet,
+ VoterSet.fromVotersRecord(votersRecord)
+ );
+ }
+
+ private static void verifyKRaftVersionRecord(
+ short expectedKRaftVersion,
+ ByteBuffer recordKey,
+ ByteBuffer recordValue
+ ) {
+ assertEquals(ControlRecordType.KRAFT_VERSION,
ControlRecordType.parse(recordKey));
+ KRaftVersionRecord kRaftVersionRecord =
ControlRecordUtils.deserializeKRaftVersionRecord(recordValue);
+ assertEquals(expectedKRaftVersion, kRaftVersionRecord.kRaftVersion());
+ }
+}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 02a92e8303e..11fefd347ca 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -28,12 +28,15 @@ import
org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.StringSerde;
+import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.raft.internals.VoterSetTest;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
+import org.apache.kafka.snapshot.Snapshots;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -50,6 +53,7 @@ import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -126,7 +130,7 @@ public final class KafkaRaftClientSnapshotTest {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot =
context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
- SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(),
snapshot);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.emptyList(), snapshot);
}
}
@@ -168,7 +172,7 @@ public final class KafkaRaftClientSnapshotTest {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot =
context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
- SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(),
snapshot);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.emptyList(), snapshot);
}
}
@@ -214,7 +218,7 @@ public final class KafkaRaftClientSnapshotTest {
// Check that the second listener was notified of the new snapshot
try (SnapshotReader<String> snapshot =
secondListener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
- SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(),
snapshot);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.emptyList(), snapshot);
}
}
@@ -251,7 +255,7 @@ public final class KafkaRaftClientSnapshotTest {
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> snapshot =
context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, snapshot.snapshotId());
- SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(),
snapshot);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.emptyList(), snapshot);
}
// Generate a new snapshot
@@ -270,7 +274,7 @@ public final class KafkaRaftClientSnapshotTest {
// Check that listener was notified of the second snapshot
try (SnapshotReader<String> snapshot =
context.listener.drainHandledSnapshot().get()) {
assertEquals(secondSnapshotId, snapshot.snapshotId());
- SnapshotWriterReaderTest.assertSnapshot(Collections.emptyList(),
snapshot);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.emptyList(), snapshot);
}
}
@@ -663,7 +667,40 @@ public final class KafkaRaftClientSnapshotTest {
fetchSnapshotRequest(
context.metadataPartition,
epoch,
- new OffsetAndEpoch(0, 0),
+ Snapshots.BOOTSTRAP_SNAPSHOT_ID,
+ Integer.MAX_VALUE,
+ 0
+ )
+ );
+
+ context.client.poll();
+
+ FetchSnapshotResponseData.PartitionSnapshot response =
context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+ assertEquals(Errors.SNAPSHOT_NOT_FOUND,
Errors.forCode(response.errorCode()));
+ }
+
+ @Test
+ public void testFetchSnapshotRequestBootstrapSnapshot() throws Exception {
+ ReplicaKey localKey = replicaKey(0, true);
+ VoterSet voters = VoterSetTest.voterSet(
+ Stream.of(localKey, replicaKey(localKey.id() + 1, true))
+ );
+
+ RaftClientTestContext context = new RaftClientTestContext
+ .Builder(localKey.id(), localKey.directoryId().get())
+ .withKip853Rpc(true)
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withUnknownLeader(3)
+ .build();
+
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+
+ context.deliverRequest(
+ fetchSnapshotRequest(
+ context.metadataPartition,
+ epoch,
+ Snapshots.BOOTSTRAP_SNAPSHOT_ID,
Integer.MAX_VALUE,
0
)
@@ -694,7 +731,7 @@ public final class KafkaRaftClientSnapshotTest {
fetchSnapshotRequest(
topicPartition,
epoch,
- new OffsetAndEpoch(0, 0),
+ Snapshots.BOOTSTRAP_SNAPSHOT_ID,
Integer.MAX_VALUE,
0
)
@@ -940,7 +977,7 @@ public final class KafkaRaftClientSnapshotTest {
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
- OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+ OffsetAndEpoch snapshotId = Snapshots.BOOTSTRAP_SNAPSHOT_ID;
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, leaderId)
@@ -1030,7 +1067,7 @@ public final class KafkaRaftClientSnapshotTest {
public void testFetchSnapshotRequestWithOlderEpoch(boolean withKip853Rpc)
throws Exception {
int localId = 0;
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
- OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+ OffsetAndEpoch snapshotId = Snapshots.BOOTSTRAP_SNAPSHOT_ID;
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withUnknownLeader(1)
@@ -1063,7 +1100,7 @@ public final class KafkaRaftClientSnapshotTest {
public void testFetchSnapshotRequestWithNewerEpoch(boolean withKip853Rpc)
throws Exception {
int localId = 0;
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
- OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+ OffsetAndEpoch snapshotId = Snapshots.BOOTSTRAP_SNAPSHOT_ID;
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withUnknownLeader(1)
@@ -1218,12 +1255,12 @@ public final class KafkaRaftClientSnapshotTest {
// Check that the snapshot was written to the log
RawSnapshotReader snapshot =
context.log.readSnapshot(snapshotId).get();
assertEquals(memorySnapshot.buffer().remaining(),
snapshot.sizeInBytes());
-
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records),
snapshot);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.singletonList(records),
snapshot);
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> reader =
context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, reader.snapshotId());
-
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records),
reader);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.singletonList(records),
reader);
}
}
@@ -1323,12 +1360,12 @@ public final class KafkaRaftClientSnapshotTest {
// Check that the snapshot was written to the log
RawSnapshotReader snapshot =
context.log.readSnapshot(snapshotId).get();
assertEquals(memorySnapshot.buffer().remaining(),
snapshot.sizeInBytes());
-
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records),
snapshot);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.singletonList(records),
snapshot);
// Check that listener was notified of the new snapshot
try (SnapshotReader<String> reader =
context.listener.drainHandledSnapshot().get()) {
assertEquals(snapshotId, reader.snapshotId());
-
SnapshotWriterReaderTest.assertSnapshot(Collections.singletonList(records),
reader);
+
SnapshotWriterReaderTest.assertDataSnapshot(Collections.singletonList(records),
reader);
}
}
@@ -1801,7 +1838,7 @@ public final class KafkaRaftClientSnapshotTest {
otherNode,
context.metadataPartition,
epoch,
- new OffsetAndEpoch(0, 0),
+ Snapshots.BOOTSTRAP_SNAPSHOT_ID,
Integer.MAX_VALUE,
0
)
@@ -1816,7 +1853,7 @@ public final class KafkaRaftClientSnapshotTest {
otherNode,
context.metadataPartition,
epoch,
- new OffsetAndEpoch(0, 0),
+ Snapshots.BOOTSTRAP_SNAPSHOT_ID,
Integer.MAX_VALUE,
0
)
@@ -1831,7 +1868,7 @@ public final class KafkaRaftClientSnapshotTest {
otherNode,
context.metadataPartition,
epoch,
- new OffsetAndEpoch(0, 0),
+ Snapshots.BOOTSTRAP_SNAPSHOT_ID,
Integer.MAX_VALUE,
0
)
@@ -1846,7 +1883,7 @@ public final class KafkaRaftClientSnapshotTest {
otherNode,
context.metadataPartition,
epoch,
- new OffsetAndEpoch(0, 0),
+ Snapshots.BOOTSTRAP_SNAPSHOT_ID,
Integer.MAX_VALUE,
0
)
@@ -1977,7 +2014,7 @@ public final class KafkaRaftClientSnapshotTest {
return ReplicaKey.of(id, directoryId);
}
- private static FetchSnapshotRequestData fetchSnapshotRequest(
+ public static FetchSnapshotRequestData fetchSnapshotRequest(
TopicPartition topicPartition,
int epoch,
OffsetAndEpoch offsetAndEpoch,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index a8c108bb2d9..9e1258ba666 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -3850,7 +3850,7 @@ public class KafkaRaftClientTest {
}
}
- private static ReplicaKey replicaKey(int id, boolean withDirectoryId) {
+ static ReplicaKey replicaKey(int id, boolean withDirectoryId) {
Uuid directoryId = withDirectoryId ? Uuid.randomUuid() :
ReplicaKey.NO_DIRECTORY_ID;
return ReplicaKey.of(id, directoryId);
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 22050505dfc..0d0849d1aa0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -58,6 +59,7 @@ public class LeaderStateTest {
private final int fetchTimeoutMs = 2000;
private final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs *
CHECK_QUORUM_TIMEOUT_FACTOR);
private final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
+ private final short kraftVersion = 1;
private LeaderState<?> newLeaderState(
VoterSet voters,
@@ -69,6 +71,8 @@ public class LeaderStateTest {
epoch,
epochStartOffset,
voters,
+ OptionalLong.of(0L),
+ kraftVersion,
voters.voterIds(),
accumulator,
voters.listeners(localReplicaKey.id()),
@@ -103,6 +107,7 @@ public class LeaderStateTest {
@Test
public void testRequireNonNullAccumulator() {
+ VoterSet voterSet = VoterSetTest.voterSet(Stream.of(localReplicaKey));
assertThrows(
NullPointerException.class,
() -> new LeaderState<>(
@@ -110,7 +115,9 @@ public class LeaderStateTest {
localReplicaKey,
epoch,
0,
- VoterSetTest.voterSet(Stream.of(localReplicaKey)),
+ voterSet,
+ OptionalLong.of(0),
+ kraftVersion,
Collections.emptySet(),
null,
Endpoints.empty(),
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
index f9c3efee02a..b5698545bdf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
@@ -18,6 +18,7 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.raft.internals.VoterSetTest;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,10 +29,11 @@ import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
public class MockNetworkChannel implements NetworkChannel {
+ public static final ListenerName LISTENER_NAME =
VoterSetTest.DEFAULT_LISTENER_NAME;
+
private final AtomicInteger correlationIdCounter;
private final List<RaftRequest.Outbound> sendQueue = new ArrayList<>();
private final Map<Integer, RaftRequest.Outbound> awaitingResponse = new
HashMap<>();
- private final ListenerName listenerName =
ListenerName.normalised("CONTROLLER");
public MockNetworkChannel(AtomicInteger correlationIdCounter) {
this.correlationIdCounter = correlationIdCounter;
@@ -53,7 +55,7 @@ public class MockNetworkChannel implements NetworkChannel {
@Override
public ListenerName listenerName() {
- return listenerName;
+ return LISTENER_NAME;
}
public List<RaftRequest.Outbound> drainSendQueue() {
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
index a5e13988c3d..ba1b179d3b5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.internals.BatchAccumulator;
+import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.VoterSet;
import org.apache.kafka.raft.internals.VoterSetTest;
@@ -61,11 +62,22 @@ public class QuorumStateTest {
VoterSet voterSet,
short kraftVersion
) {
+ KRaftControlRecordStateMachine mockPartitionState =
Mockito.mock(KRaftControlRecordStateMachine.class);
+
+ Mockito
+ .when(mockPartitionState.lastVoterSet())
+ .thenReturn(voterSet);
+ Mockito
+ .when(mockPartitionState.lastVoterSetOffset())
+ .thenReturn(kraftVersion == 0 ? OptionalLong.empty() :
OptionalLong.of(0));
+ Mockito
+ .when(mockPartitionState.lastKraftVersion())
+ .thenReturn(kraftVersion);
+
return new QuorumState(
localId,
localDirectoryId,
- () -> voterSet,
- () -> kraftVersion,
+ mockPartitionState,
localId.isPresent() ? voterSet.listeners(localId.getAsInt()) :
Endpoints.empty(),
electionTimeoutMs,
fetchTimeoutMs,
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 573c92b1120..13f12034fb2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -58,8 +58,9 @@ import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.raft.internals.VoterSet;
import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -113,6 +114,7 @@ public final class RaftClientTestContext {
final Uuid clusterId;
private final OptionalInt localId;
public final Uuid localDirectoryId;
+ public final short kraftVersion;
public final KafkaRaftClient<String> client;
final Metrics metrics;
public final MockLog log;
@@ -120,7 +122,7 @@ public final class RaftClientTestContext {
final MockMessageQueue messageQueue;
final MockTime time;
final MockListener listener;
- final VoterSet voters;
+ final VoterSet startingVoters;
final Set<Integer> bootstrapIds;
// Used to determine which RPC request and response to construct
final boolean kip853Rpc;
@@ -147,9 +149,8 @@ public final class RaftClientTestContext {
private final LogContext logContext = new LogContext();
private final MockLog log = new MockLog(METADATA_PARTITION, Uuid.METADATA_TOPIC_ID, logContext);
private final Uuid clusterId = Uuid.randomUuid();
- private final Set<Integer> voters;
private final OptionalInt localId;
- private final short kraftVersion = 0;
+ private final Uuid localDirectoryId;
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS;
@@ -157,35 +158,58 @@ public final class RaftClientTestContext {
private MemoryPool memoryPool = MemoryPool.NONE;
private List<InetSocketAddress> bootstrapServers = Collections.emptyList();
private boolean kip853Rpc = false;
+ private short kraftVersion = 0;
+ private Optional<VoterSet> startingVoters = Optional.empty();
+ private boolean isStartingVotersStatic = false;
- public Builder(int localId, Set<Integer> voters) {
- this(OptionalInt.of(localId), voters);
+ public Builder(int localId, Set<Integer> staticVoters) {
+ this(OptionalInt.of(localId), staticVoters);
}
- public Builder(OptionalInt localId, Set<Integer> voters) {
- this.voters = voters;
+ public Builder(OptionalInt localId, Set<Integer> staticVoters) {
+ this(localId, Uuid.randomUuid());
+
+ withStaticVoters(staticVoters);
+ }
+
+ public Builder(int localId, Uuid localDirectoryId) {
+ this(OptionalInt.of(localId), localDirectoryId);
+ }
+
+ public Builder(OptionalInt localId, Uuid localDirectoryId) {
this.localId = localId;
+ this.localDirectoryId = localDirectoryId;
+ }
+
+ private static IllegalStateException missingStartingVoterException() {
+ return new IllegalStateException(
+ "The starting voter set must be set with withStaticVoters or
withBootstrapSnapshot"
+ );
}
+
Builder withElectedLeader(int epoch, int leaderId) {
+ VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
quorumStateStore.writeElectionState(
- ElectionState.withElectedLeader(epoch, leaderId, voters),
+ ElectionState.withElectedLeader(epoch, leaderId, startingVoters.voterIds()),
kraftVersion
);
return this;
}
Builder withUnknownLeader(int epoch) {
+ VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
quorumStateStore.writeElectionState(
- ElectionState.withUnknownLeader(epoch, voters),
+ ElectionState.withUnknownLeader(epoch, startingVoters.voterIds()),
kraftVersion
);
return this;
}
Builder withVotedCandidate(int epoch, ReplicaKey votedKey) {
+ VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
quorumStateStore.writeElectionState(
- ElectionState.withVotedCandidate(epoch, votedKey, voters),
+ ElectionState.withVotedCandidate(epoch, votedKey, startingVoters.voterIds()),
kraftVersion
);
return this;
@@ -226,9 +250,15 @@ public final class RaftClientTestContext {
}
Builder withEmptySnapshot(OffsetAndEpoch snapshotId) {
- try (RawSnapshotWriter snapshot = log.createNewSnapshotUnchecked(snapshotId).get()) {
+ try (RecordsSnapshotWriter<?> snapshot = new RecordsSnapshotWriter.Builder()
+ .setTime(time)
+ .setKraftVersion((short) 0)
+ .setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
+ .build(SERDE)
+ ) {
snapshot.freeze();
}
+
return this;
}
@@ -261,27 +291,75 @@ public final class RaftClientTestContext {
return this;
}
+ Builder withStaticVoters(Set<Integer> staticVoters) {
+ Map<Integer, InetSocketAddress> staticVoterAddressMap = staticVoters
+ .stream()
+ .collect(
+ Collectors.toMap(Function.identity(), RaftClientTestContext::mockAddress)
+ );
+
+ this.startingVoters = Optional.of(
+ VoterSet.fromInetSocketAddresses(
+ MockNetworkChannel.LISTENER_NAME,
+ staticVoterAddressMap
+ )
+ );
+ this.isStartingVotersStatic = true;
+
+ return this;
+ }
+
+ Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
+ if (voters.isPresent()) {
+ kraftVersion = 1;
+
+ startingVoters = voters;
+ isStartingVotersStatic = false;
+
+ RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
+ .setRawSnapshotWriter(
+ log.createNewSnapshotUnchecked(Snapshots.BOOTSTRAP_SNAPSHOT_ID).get()
+ )
+ .setKraftVersion(kraftVersion)
+ .setVoterSet(voters);
+
+ try (RecordsSnapshotWriter<String> writer = builder.build(SERDE)) {
+ writer.freeze();
+ }
+ } else {
+ // Create an empty bootstrap snapshot if there is no voter set
+ kraftVersion = 0;
+ withEmptySnapshot(Snapshots.BOOTSTRAP_SNAPSHOT_ID);
+ }
+
+ return this;
+ }
+
public RaftClientTestContext build() throws IOException {
+ VoterSet startingVoters = this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
+
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel();
MockListener listener = new MockListener(localId);
- Map<Integer, InetSocketAddress> voterAddressMap = voters
- .stream()
- .collect(Collectors.toMap(Function.identity(), RaftClientTestContext::mockAddress));
-
- VoterSet voterSet = VoterSet.fromInetSocketAddresses(
- channel.listenerName(),
- voterAddressMap
- );
+ Map<Integer, InetSocketAddress> staticVoterAddressMap = Collections.emptyMap();
+ if (isStartingVotersStatic) {
+ staticVoterAddressMap = startingVoters
+ .voterNodes(startingVoters.voterIds().stream(), channel.listenerName())
+ .stream()
+ .collect(
+ Collectors.toMap(
+ Node::id,
+ node -> InetSocketAddress.createUnresolved(node.host(), node.port())
+ )
+ );
+ }
// Compute the local listeners. Only potential voters/leader need to provide the local listeners
// If the local id is not set (must be observer), the local listener can be empty.
Endpoints localListeners = localId.isPresent() ?
- voterSet.listeners(localId.getAsInt()) :
+ startingVoters.listeners(localId.getAsInt()) :
Endpoints.empty();
- Uuid localDirectoryId = Uuid.randomUuid();
-
QuorumConfig quorumConfig = new QuorumConfig(
requestTimeoutMs,
RETRY_BACKOFF_MS,
@@ -312,7 +390,7 @@ public final class RaftClientTestContext {
client.register(listener);
client.initialize(
- voterAddressMap,
+ staticVoterAddressMap,
quorumStateStore,
metrics
);
@@ -321,13 +399,14 @@ public final class RaftClientTestContext {
clusterId,
localId,
localDirectoryId,
+ kraftVersion,
client,
log,
channel,
messageQueue,
time,
quorumStateStore,
- voterSet,
+ startingVoters,
IntStream
.iterate(-2, id -> id - 1)
.limit(bootstrapServers.size())
@@ -351,13 +430,14 @@ public final class RaftClientTestContext {
Uuid clusterId,
OptionalInt localId,
Uuid localDirectoryId,
+ short kraftVersion,
KafkaRaftClient<String> client,
MockLog log,
MockNetworkChannel channel,
MockMessageQueue messageQueue,
MockTime time,
QuorumStateStore quorumStateStore,
- VoterSet voters,
+ VoterSet startingVoters,
Set<Integer> bootstrapIds,
boolean kip853Rpc,
Metrics metrics,
@@ -366,13 +446,14 @@ public final class RaftClientTestContext {
this.clusterId = clusterId;
this.localId = localId;
this.localDirectoryId = localDirectoryId;
+ this.kraftVersion = kraftVersion;
this.client = client;
this.log = log;
this.channel = channel;
this.messageQueue = messageQueue;
this.time = time;
this.quorumStateStore = quorumStateStore;
- this.voters = voters;
+ this.startingVoters = startingVoters;
this.bootstrapIds = bootstrapIds;
this.kip853Rpc = kip853Rpc;
this.metrics = metrics;
@@ -412,7 +493,6 @@ public final class RaftClientTestContext {
Compression.NONE,
baseOffset,
timestamp,
- false,
epoch,
512
);
@@ -512,22 +592,23 @@ public final class RaftClientTestContext {
ElectionState.withVotedCandidate(
epoch,
ReplicaKey.of(candidateId, ReplicaKey.NO_DIRECTORY_ID),
- voters.voterIds()
+ startingVoters.voterIds()
),
quorumStateStore.readElectionState().get()
);
}
public void assertElectedLeader(int epoch, int leaderId) {
+ Set<Integer> voters = kraftVersion == 0 ? startingVoters.voterIds() : Collections.emptySet();
assertEquals(
- ElectionState.withElectedLeader(epoch, leaderId, voters.voterIds()),
+ ElectionState.withElectedLeader(epoch, leaderId, voters),
quorumStateStore.readElectionState().get()
);
}
void assertUnknownLeader(int epoch) {
assertEquals(
- ElectionState.withUnknownLeader(epoch, voters.voterIds()),
+ ElectionState.withUnknownLeader(epoch, startingVoters.voterIds()),
quorumStateStore.readElectionState().get()
);
}
@@ -535,7 +616,7 @@ public final class RaftClientTestContext {
void assertResignedLeader(int epoch, int leaderId) {
assertTrue(client.quorum().isResigned());
assertEquals(
- ElectionState.withElectedLeader(epoch, leaderId, voters.voterIds()),
+ ElectionState.withElectedLeader(epoch, leaderId, startingVoters.voterIds()),
quorumStateStore.readElectionState().get()
);
}
@@ -616,7 +697,7 @@ public final class RaftClientTestContext {
assertEquals(leaderId.orElse(-1), partitionResponse.leaderId());
if (kip853Rpc && leaderId.isPresent()) {
- Endpoints expectedLeaderEndpoints = voters.listeners(leaderId.getAsInt());
+ Endpoints expectedLeaderEndpoints = startingVoters.listeners(leaderId.getAsInt());
Endpoints responseEndpoints = Endpoints.fromVoteResponse(
channel.listenerName(),
leaderId.getAsInt(),
@@ -731,7 +812,7 @@ public final class RaftClientTestContext {
.get(0);
if (kip853Rpc && partitionResponse.leaderId() >= 0) {
int leaderId = partitionResponse.leaderId();
- Endpoints expectedLeaderEndpoints = voters.listeners(leaderId);
+ Endpoints expectedLeaderEndpoints = startingVoters.listeners(leaderId);
Endpoints responseEndpoints = Endpoints.fromBeginQuorumEpochResponse(
channel.listenerName(),
leaderId,
@@ -762,7 +843,7 @@ public final class RaftClientTestContext {
assertEquals(partitionError, Errors.forCode(partitionResponse.errorCode()));
if (kip853Rpc && leaderId.isPresent()) {
- Endpoints expectedLeaderEndpoints = voters.listeners(leaderId.getAsInt());
+ Endpoints expectedLeaderEndpoints = startingVoters.listeners(leaderId.getAsInt());
Endpoints responseEndpoints = Endpoints.fromBeginQuorumEpochResponse(
channel.listenerName(),
leaderId.getAsInt(),
@@ -800,7 +881,7 @@ public final class RaftClientTestContext {
.get(0);
if (kip853Rpc && partitionResponse.leaderId() >= 0) {
int leaderId = partitionResponse.leaderId();
- Endpoints expectedLeaderEndpoints = voters.listeners(leaderId);
+ Endpoints expectedLeaderEndpoints = startingVoters.listeners(leaderId);
Endpoints responseEndpoints = Endpoints.fromEndQuorumEpochResponse(
channel.listenerName(),
leaderId,
@@ -831,7 +912,7 @@ public final class RaftClientTestContext {
assertEquals(partitionError, Errors.forCode(partitionResponse.errorCode()));
if (kip853Rpc && leaderId.isPresent()) {
- Endpoints expectedLeaderEndpoints = voters.listeners(leaderId.getAsInt());
+ Endpoints expectedLeaderEndpoints = startingVoters.listeners(leaderId.getAsInt());
Endpoints responseEndpoints = Endpoints.fromEndQuorumEpochResponse(
channel.listenerName(),
leaderId.getAsInt(),
@@ -876,7 +957,7 @@ public final class RaftClientTestContext {
FetchResponseData.PartitionData partitionResponse = response.responses().get(0).partitions().get(0);
if (kip853Rpc && partitionResponse.currentLeader().leaderId() >= 0) {
int leaderId = partitionResponse.currentLeader().leaderId();
- Endpoints expectedLeaderEndpoints = voters.listeners(leaderId);
+ Endpoints expectedLeaderEndpoints = startingVoters.listeners(leaderId);
Endpoints responseEndpoints = Endpoints.fromFetchResponse(
channel.listenerName(),
leaderId,
@@ -966,7 +1047,7 @@ public final class RaftClientTestContext {
if (result.isPresent() && kip853Rpc && result.get().currentLeader().leaderId() >= 0) {
int leaderId = result.get().currentLeader().leaderId();
- Endpoints expectedLeaderEndpoints = voters.listeners(leaderId);
+ Endpoints expectedLeaderEndpoints = startingVoters.listeners(leaderId);
Endpoints responseEndpoints = Endpoints.fromFetchSnapshotResponse(
channel.listenerName(),
leaderId,
@@ -1014,8 +1095,8 @@ public final class RaftClientTestContext {
RaftRequest.Outbound fetchRequest = assertSentFetchRequest();
int destinationId = fetchRequest.destination().id();
assertTrue(
- voters.voterIds().contains(destinationId) || bootstrapIds.contains(destinationId),
- String.format("id %d is not in sets %s or %s", destinationId, voters, bootstrapIds)
+ startingVoters.voterIds().contains(destinationId) || bootstrapIds.contains(destinationId),
+ String.format("id %d is not in sets %s or %s", destinationId, startingVoters, bootstrapIds)
);
assertFetchRequestData(fetchRequest, 0, 0L, 0);
@@ -1061,7 +1142,7 @@ public final class RaftClientTestContext {
Errors.NONE,
epoch,
leaderId.orElse(-1),
- leaderId.isPresent() ? voters.listeners(leaderId.getAsInt()) : Endpoints.empty()
+ leaderId.isPresent() ? startingVoters.listeners(leaderId.getAsInt()) : Endpoints.empty()
);
}
@@ -1116,7 +1197,7 @@ public final class RaftClientTestContext {
clusterId,
epoch,
leaderId,
- voters.listeners(leaderId),
+ startingVoters.listeners(leaderId),
voterKey
);
}
@@ -1130,7 +1211,7 @@ public final class RaftClientTestContext {
Errors.NONE,
epoch,
leaderId,
- voters.listeners(leaderId)
+ startingVoters.listeners(leaderId)
);
}
@@ -1199,7 +1280,7 @@ public final class RaftClientTestContext {
epoch,
leaderId.orElse(-1),
voteGranted,
- leaderId.isPresent() ? voters.listeners(leaderId.getAsInt()) : Endpoints.empty()
+ leaderId.isPresent() ? startingVoters.listeners(leaderId.getAsInt()) : Endpoints.empty()
);
}
@@ -1232,10 +1313,20 @@ public final class RaftClientTestContext {
LeaderChangeMessage leaderChangeMessage = ControlRecordUtils.deserializeLeaderChangeMessage(recordValue);
assertEquals(leaderId, leaderChangeMessage.leaderId());
- assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList()),
- leaderChangeMessage.voters());
- assertEquals(grantingVoters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toSet()),
- new HashSet<>(leaderChangeMessage.grantingVoters()));
+ assertEquals(
+ voters
+ .stream()
+ .map(voterId -> new Voter().setVoterId(voterId))
+ .collect(Collectors.toList()),
+ leaderChangeMessage.voters()
+ );
+ assertEquals(
+ grantingVoters
+ .stream()
+ .map(voterId -> new Voter().setVoterId(voterId))
+ .collect(Collectors.toSet()),
+ new HashSet<>(leaderChangeMessage.grantingVoters())
+ );
}
void assertFetchRequestData(
@@ -1263,7 +1354,7 @@ public final class RaftClientTestContext {
assertEquals(localId.orElse(-1), request.replicaState().replicaId());
// Assert that voters have flushed up to the fetch offset
- if (localId.isPresent() && voters.voterIds().contains(localId.getAsInt())) {
+ if (localId.isPresent() && startingVoters.voterIds().contains(localId.getAsInt())) {
assertEquals(
log.firstUnflushedOffset(),
fetchOffset,
@@ -1306,13 +1397,16 @@ public final class RaftClientTestContext {
FetchRequestData request = RaftUtil.singletonFetchRequest(
metadataPartition,
metadataTopicId,
- fetchPartition -> fetchPartition
- .setCurrentLeaderEpoch(epoch)
- .setLastFetchedEpoch(lastFetchedEpoch)
- .setFetchOffset(fetchOffset)
- .setReplicaDirectoryId(
- replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)
- )
+ fetchPartition -> {
+ fetchPartition
+ .setCurrentLeaderEpoch(epoch)
+ .setLastFetchedEpoch(lastFetchedEpoch)
+ .setFetchOffset(fetchOffset);
+ if (kip853Rpc) {
+ fetchPartition
+ .setReplicaDirectoryId(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID));
+ }
+ }
);
return request
.setMaxWaitMs(maxWaitTimeMs)
@@ -1336,7 +1430,7 @@ public final class RaftClientTestContext {
metadataTopicId,
Errors.NONE,
leaderId,
- voters.listeners(leaderId),
+ startingVoters.listeners(leaderId),
partitionData -> {
partitionData
.setRecords(records)
@@ -1364,7 +1458,7 @@ public final class RaftClientTestContext {
metadataTopicId,
Errors.NONE,
leaderId,
- voters.listeners(leaderId),
+ startingVoters.listeners(leaderId),
partitionData -> {
partitionData.setHighWatermark(highWatermark);
@@ -1392,7 +1486,7 @@ public final class RaftClientTestContext {
metadataTopicId,
Errors.NONE,
leaderId,
- voters.listeners(leaderId),
+ startingVoters.listeners(leaderId),
partitionData -> {
partitionData.setHighWatermark(highWatermark);
@@ -1416,7 +1510,7 @@ public final class RaftClientTestContext {
fetchSnapshotRpcVersion(),
metadataPartition,
leaderId,
- voters.listeners(leaderId),
+ startingVoters.listeners(leaderId),
operator
);
}
@@ -1513,7 +1607,7 @@ public final class RaftClientTestContext {
assertEquals(localId, currentLeader());
long localLogEndOffset = log.endOffset().offset();
- Iterable<ReplicaKey> followers = () -> voters
+ Iterable<ReplicaKey> followers = () -> startingVoters
.voterKeys()
.stream()
.filter(voterKey -> voterKey.id() != localId.getAsInt())
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
index bb52b38e8e6..8a4b774bae7 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
@@ -580,9 +580,16 @@ class BatchAccumulatorTest {
maxBatchSize
)
) {
- acc.appendControlMessages((offset, epoch, buf) -> {
+ acc.appendControlMessages((offset, epoch, compression, buf) -> {
long now = 1234;
- try (MemoryRecordsBuilder builder = controlRecordsBuilder(offset, epoch, now, buf)) {
+ try (MemoryRecordsBuilder builder = controlRecordsBuilder(
+ offset,
+ epoch,
+ compression,
+ now,
+ buf
+ )
+ ) {
builder.appendSnapshotHeaderMessage(
now,
new SnapshotHeaderRecord()
@@ -624,9 +631,16 @@ class BatchAccumulatorTest {
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);
- BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, buf) -> {
+ BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, compression, buf) -> {
long now = 1234;
- try (MemoryRecordsBuilder builder = controlRecordsBuilder(offset + 1, epoch, now, buf)) {
+ try (MemoryRecordsBuilder builder = controlRecordsBuilder(
+ offset + 1,
+ epoch,
+ compression,
+ now,
+ buf
+ )
+ ) {
builder.appendSnapshotHeaderMessage(
now,
new SnapshotHeaderRecord()
@@ -660,9 +674,16 @@ class BatchAccumulatorTest {
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);
- BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, buf) -> {
+ BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, compression, buf) -> {
long now = 1234;
- try (MemoryRecordsBuilder builder = controlRecordsBuilder(offset, epoch + 1, now, buf)) {
+ try (MemoryRecordsBuilder builder = controlRecordsBuilder(
+ offset,
+ epoch + 1,
+ compression,
+ now,
+ buf
+ )
+ ) {
builder.appendSnapshotHeaderMessage(
now,
new SnapshotHeaderRecord()
@@ -696,9 +717,16 @@ class BatchAccumulatorTest {
Mockito.when(memoryPool.tryAllocate(maxBatchSize))
.thenReturn(buffer);
- BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, buf) -> {
+ BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, compression, buf) -> {
long now = 1234;
- try (MemoryRecordsBuilder builder = controlRecordsBuilder(offset, epoch, now, buf)) {
+ try (MemoryRecordsBuilder builder = controlRecordsBuilder(
+ offset,
+ epoch,
+ compression,
+ now,
+ buf
+ )
+ ) {
// Create a control batch without any records
return builder.build();
}
@@ -718,13 +746,14 @@ class BatchAccumulatorTest {
private static MemoryRecordsBuilder controlRecordsBuilder(
long baseOffset,
int epoch,
+ Compression compression,
long now,
ByteBuffer buffer
) {
return new MemoryRecordsBuilder(
buffer,
RecordBatch.CURRENT_MAGIC_VALUE,
- Compression.NONE,
+ compression,
TimestampType.CREATE_TIME,
baseOffset,
now,
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
index 5e82c0be82c..ef0a648beaf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
@@ -47,7 +47,6 @@ class BatchBuilderTest {
ByteBuffer buffer = ByteBuffer.allocate(1024);
long baseOffset = 57;
long logAppendTime = time.milliseconds();
- boolean isControlBatch = false;
int leaderEpoch = 15;
Compression compression = Compression.of(compressionType).build();
BatchBuilder<String> builder = new BatchBuilder<>(
@@ -56,7 +55,6 @@ class BatchBuilderTest {
compression,
baseOffset,
logAppendTime,
- isControlBatch,
leaderEpoch,
buffer.limit()
);
@@ -83,7 +81,7 @@ class BatchBuilderTest {
assertEquals(compressionType, batch.compressionType());
assertEquals(baseOffset, batch.baseOffset());
assertEquals(logAppendTime, batch.maxTimestamp());
- assertEquals(isControlBatch, batch.isControlBatch());
+ assertEquals(false, batch.isControlBatch());
assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
List<String> builtRecords = Utils.toList(batch).stream()
@@ -99,7 +97,6 @@ class BatchBuilderTest {
ByteBuffer buffer = ByteBuffer.allocate(batchSize);
long baseOffset = 57;
long logAppendTime = time.milliseconds();
- boolean isControlBatch = false;
int leaderEpoch = 15;
BatchBuilder<String> builder = new BatchBuilder<>(
@@ -108,7 +105,6 @@ class BatchBuilderTest {
Compression.NONE,
baseOffset,
logAppendTime,
- isControlBatch,
leaderEpoch,
buffer.limit()
);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index 1b45f8c0a6f..4d9ef1c2574 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -64,11 +64,22 @@ public class KafkaRaftMetricsTest {
}
private QuorumState buildQuorumState(VoterSet voterSet, short kraftVersion) {
+ KRaftControlRecordStateMachine mockPartitionState = Mockito.mock(KRaftControlRecordStateMachine.class);
+
+ Mockito
+ .when(mockPartitionState.lastVoterSet())
+ .thenReturn(voterSet);
+ Mockito
+ .when(mockPartitionState.lastVoterSetOffset())
+ .thenReturn(kraftVersion == 0 ? OptionalLong.empty() : OptionalLong.of(0));
+ Mockito
+ .when(mockPartitionState.lastKraftVersion())
+ .thenReturn(kraftVersion);
+
return new QuorumState(
OptionalInt.of(localId),
localDirectoryId,
- () -> voterSet,
- () -> kraftVersion,
+ mockPartitionState,
voterSet.listeners(localId),
electionTimeoutMs,
fetchTimeoutMs,
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 9acf142bce8..cee5d68bc99 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -411,7 +411,6 @@ public final class RecordsIteratorTest {
compression,
batch.baseOffset,
batch.appendTimestamp,
- false,
batch.epoch,
1024
);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/TreeMapLogHistoryTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/TreeMapLogHistoryTest.java
index b428eda25a2..a1e5a4d1a32 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/TreeMapLogHistoryTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/TreeMapLogHistoryTest.java
@@ -34,9 +34,12 @@ public final class TreeMapLogHistoryTest {
@Test
void testAddAt() {
TreeMapLogHistory<String> history = new TreeMapLogHistory<>();
- assertThrows(IllegalArgumentException.class, () -> history.addAt(-1, ""));
+ assertThrows(IllegalArgumentException.class, () -> history.addAt(-2, ""));
assertEquals(Optional.empty(), history.lastEntry());
+ history.addAt(-1, "-1");
+ assertEquals(Optional.of("-1"), history.valueAtOrBefore(-1));
+ assertEquals(Optional.of("-1"), history.valueAtOrBefore(0));
history.addAt(100, "100");
assertThrows(IllegalArgumentException.class, () -> history.addAt(99, ""));
assertThrows(IllegalArgumentException.class, () -> history.addAt(100, ""));
@@ -44,7 +47,8 @@ public final class TreeMapLogHistoryTest {
assertEquals(Optional.of("100"), history.valueAtOrBefore(201));
history.addAt(200, "200");
- assertEquals(Optional.empty(), history.valueAtOrBefore(99));
+ assertEquals(Optional.empty(), history.valueAtOrBefore(-2));
+ assertEquals(Optional.of("-1"), history.valueAtOrBefore(-1));
assertEquals(Optional.of("100"), history.valueAtOrBefore(100));
assertEquals(Optional.of("100"), history.valueAtOrBefore(101));
assertEquals(Optional.of("100"), history.valueAtOrBefore(199));
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java
index 4931614fed0..4cefd448773 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetHistoryTest.java
@@ -66,7 +66,7 @@ public final class VoterSetHistoryTest {
assertThrows(
IllegalArgumentException.class,
- () -> votersHistory.addAt(-1, new VoterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)))
+ () -> votersHistory.addAt(-2, new VoterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)))
);
assertEquals(staticVoterSet, votersHistory.lastValue());
@@ -88,6 +88,37 @@ public final class VoterSetHistoryTest {
assertEquals(Optional.of(removedVoterSet), votersHistory.valueAtOrBefore(200));
}
+ @Test
+ void testBootstrapAddAt() {
+ Map<Integer, VoterSet.VoterNode> voterMap = VoterSetTest.voterMap(IntStream.of(1, 2, 3), true);
+ VoterSet bootstrapVoterSet = new VoterSet(new HashMap<>(voterMap));
+ VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty());
+
+ votersHistory.addAt(-1, bootstrapVoterSet);
+ assertEquals(bootstrapVoterSet, votersHistory.lastValue());
+ assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(-2));
+ assertEquals(Optional.of(bootstrapVoterSet), votersHistory.valueAtOrBefore(-1));
+
+ voterMap.put(4, VoterSetTest.voterNode(4, true));
+ VoterSet addedVoterSet = new VoterSet(new HashMap<>(voterMap));
+ votersHistory.addAt(100, addedVoterSet);
+
+ assertEquals(addedVoterSet, votersHistory.lastValue());
+ assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(-2));
+ assertEquals(Optional.of(bootstrapVoterSet), votersHistory.valueAtOrBefore(99));
+ assertEquals(Optional.of(addedVoterSet), votersHistory.valueAtOrBefore(100));
+
+ voterMap.remove(4);
+ VoterSet removedVoterSet = new VoterSet(new HashMap<>(voterMap));
+ votersHistory.addAt(200, removedVoterSet);
+
+ assertEquals(removedVoterSet, votersHistory.lastValue());
+ assertEquals(Optional.empty(), votersHistory.valueAtOrBefore(-2));
+ assertEquals(Optional.of(bootstrapVoterSet), votersHistory.valueAtOrBefore(99));
+ assertEquals(Optional.of(addedVoterSet), votersHistory.valueAtOrBefore(199));
+ assertEquals(Optional.of(removedVoterSet), votersHistory.valueAtOrBefore(200));
+ }
+
@Test
void testAddAtNonOverlapping() {
VoterSetHistory votersHistory = new VoterSetHistory(Optional.empty());
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java
index ded0ac0137a..614aeaf29ef 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java
@@ -55,7 +55,7 @@ public final class VoterSetTest {
void testVoterNode() {
VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true));
assertEquals(
- Optional.of(new Node(1, "replica-1", 1234)),
+ Optional.of(new Node(1, "localhost", 9991)),
voterSet.voterNode(1, DEFAULT_LISTENER_NAME)
);
assertEquals(Optional.empty(), voterSet.voterNode(1, ListenerName.normalised("MISSING")));
@@ -67,7 +67,7 @@ public final class VoterSetTest {
VoterSet voterSet = new VoterSet(voterMap(IntStream.of(1, 2, 3), true));
assertEquals(
- Utils.mkSet(new Node(1, "replica-1", 1234), new Node(2,
"replica-2", 1234)),
+ Utils.mkSet(new Node(1, "localhost", 9991), new Node(2,
"localhost", 9992)),
voterSet.voterNodes(IntStream.of(1, 2).boxed(), DEFAULT_LISTENER_NAME)
);
@@ -324,8 +324,8 @@ public final class VoterSetTest {
Collections.singletonMap(
DEFAULT_LISTENER_NAME,
InetSocketAddress.createUnresolved(
- String.format("replica-%d", replicaKey.id()),
- 1234
+ "localhost",
+ 9990 + replicaKey.id()
)
)
),
diff --git
a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index 0bc823a2728..e0243740c95 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClientTestContext;
import org.apache.kafka.raft.internals.StringSerde;
@@ -110,7 +111,7 @@ public final class SnapshotWriterReaderTest {
RawSnapshotReader snapshot = context.log.readSnapshot(id).get();
int recordCount = validateDelimiters(snapshot, magicTimestamp);
assertEquals((recordsPerBatch * batches) + delimiterCount, recordCount);
- assertSnapshot(expected, reader);
+ assertDataSnapshot(expected, reader);
assertEquals(magicTimestamp, Snapshots.lastContainedLogTimestamp(snapshot));
}
@@ -247,14 +248,17 @@ public final class SnapshotWriterReaderTest {
return countRecords;
}
- public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
- assertSnapshot(
+ public static void assertDataSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
+ assertDataSnapshot(
batches,
RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE, true)
);
}
- public static void assertSnapshot(List<List<String>> batches, SnapshotReader<String> reader) {
+ public static void assertDataSnapshot(
+ List<List<String>> batches,
+ SnapshotReader<String> reader
+ ) {
List<String> expected = new ArrayList<>();
batches.forEach(expected::addAll);
@@ -268,4 +272,19 @@ public final class SnapshotWriterReaderTest {
assertEquals(expected, actual);
}
+
+ public static void assertControlSnapshot(
+ List<List<ControlRecord>> expectedBatches,
+ SnapshotReader<?> reader
+ ) {
+ List<List<ControlRecord>> actualBatches = new ArrayList<>(expectedBatches.size());
+ while (reader.hasNext()) {
+ Batch<?> batch = reader.next();
+ if (!batch.controlRecords().isEmpty()) {
+ actualBatches.add(batch.controlRecords());
+ }
+ }
+
+ assertEquals(expectedBatches, actualBatches);
+ }
}