This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b187e9ff93 KAFKA-16926: Optimize BeginQuorumEpoch heartbeat (#20318)
6b187e9ff93 is described below
commit 6b187e9ff9374711a10452cc3aaa903837d937ba
Author: TaiJuWu <[email protected]>
AuthorDate: Mon Oct 27 21:39:25 2025 +0800
KAFKA-16926: Optimize BeginQuorumEpoch heartbeat (#20318)
Instead of sending BeginQuorumEpoch requests to every voter on a fixed
cadence, the leader can save requests by sending them only to voters that
have not fetched within the BeginQuorumEpoch timeout.
Reviewers: José Armando García Sancio <[email protected]>, Chia-Ping
Tsai <[email protected]>, Kuan-Po Tseng <[email protected]>, Alyssa
Huang <[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 15 +++----
.../java/org/apache/kafka/raft/LeaderState.java | 23 ++++++++++
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 50 +++++++++++++++++++++-
3 files changed, 78 insertions(+), 10 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index b1c45b4a55b..d6623bca8ab 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -358,7 +358,6 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
long currentTimeMs
) {
final LogOffsetMetadata endOffsetMetadata = log.endOffset();
-
if (state.updateLocalState(endOffsetMetadata,
partitionState.lastVoterSet())) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}
@@ -1512,6 +1511,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
FetchRequest.replicaId(request),
fetchPartition.replicaDirectoryId()
);
+
FetchResponseData response = tryCompleteFetchRequest(
requestMetadata.listenerName(),
requestMetadata.apiVersion(),
@@ -2933,7 +2933,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
return minBackoffMs;
}
- private long maybeSendRequest(
+ private long maybeSendRequests(
long currentTimeMs,
Set<ReplicaKey> remoteVoters,
Function<Integer, Node> destinationSupplier,
@@ -3120,13 +3120,10 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
)
);
- timeUntilNextBeginQuorumSend = maybeSendRequest(
+ Set<ReplicaKey> needToSendBeginQuorumRequests =
state.needToSendBeginQuorumRequests(currentTimeMs);
+ timeUntilNextBeginQuorumSend = maybeSendRequests(
currentTimeMs,
- voters
- .voterKeys()
- .stream()
- .filter(key -> key.id() != quorum.localIdOrThrow())
- .collect(Collectors.toSet()),
+ needToSendBeginQuorumRequests,
nodeSupplier,
this::buildBeginQuorumEpochRequest
);
@@ -3208,7 +3205,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
if (!state.epochElection().isVoteRejected()) {
VoterSet voters = partitionState.lastVoterSet();
boolean preVote = quorum.isProspective();
- return maybeSendRequest(
+ return maybeSendRequests(
currentTimeMs,
state.epochElection().unrecordedVoters(),
voterId -> voters
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 7c379275a0e..23a74f33ae4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -188,6 +188,29 @@ public class LeaderState<T> implements EpochState {
beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
}
+ /**
+ * Determines the set of replicas that should receive a {@code BeginQuorumEpoch} request
+ * based on the elapsed time since their last fetch.
+ * <p>
+ * For each remote voter (excluding the local node), if the time since the last
+ * fetch is greater than or equal to the configured {@code beginQuorumEpochTimeoutMs},
+ * the replica is considered to need a new {@code BeginQuorumEpoch} request.
+ *
+ * @param currentTimeMs the current system time in milliseconds
+ * @return an unmodifiable set of {@link ReplicaKey} objects representing replicas
+ * that need to receive a {@code BeginQuorumEpoch} request
+ */
+ public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
+ return voterStates.values()
+ .stream()
+ .filter(
+ state -> state.replicaKey.id() !=
localVoterNode.voterKey().id() &&
+ currentTimeMs - state.lastFetchTimestamp >=
beginQuorumEpochTimeoutMs
+ )
+ .map(ReplicaState::replicaKey)
+ .collect(Collectors.toUnmodifiableSet());
+ }
+
/**
* Get the remaining time in milliseconds until the checkQuorumTimer
expires.
*
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 1efd3247ebd..ecc83aa0ad2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -618,7 +619,7 @@ class KafkaRaftClientTest {
int epoch = context.currentEpoch();
assertEquals(OptionalInt.of(localId), context.currentLeader());
- // begin epoch requests should be sent out every
beginQuorumEpochTimeoutMs
+ // begin epoch requests are sent out every beginQuorumEpochTimeoutMs to replicas that have not fetched
context.time.sleep(context.beginQuorumEpochTimeoutMs);
context.client.poll();
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1,
remoteId2));
@@ -633,6 +634,53 @@ class KafkaRaftClientTest {
context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1,
remoteId2));
}
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean
withKip853Rpc) throws Exception {
+ ReplicaKey localKey = replicaKey(randomReplicaId(), true);
+ int remoteId1 = localKey.id() + 1;
+ int remoteId2 = localKey.id() + 2;
+ ReplicaKey replicaKey1 = replicaKey(remoteId1, withKip853Rpc);
+ ReplicaKey replicaKey2 = replicaKey(remoteId2, withKip853Rpc);
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localKey.id(), localKey.directoryId().get())
+ .withKip853Rpc(withKip853Rpc)
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, replicaKey1,
replicaKey2)),
+ KRaftVersion.KRAFT_VERSION_1
+ )
+ .build();
+
+ context.unattachedToLeader();
+ int epoch = context.currentEpoch();
+ assertEquals(OptionalInt.of(localKey.id()), context.currentLeader());
+
+ // begin epoch requests are sent out every beginQuorumEpochTimeoutMs to replicas that have not fetched
+ context.time.sleep(context.beginQuorumEpochTimeoutMs);
+ context.client.poll();
+ context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1,
remoteId2));
+
+ long partialDelay = context.beginQuorumEpochTimeoutMs / 3;
+ context.time.sleep(context.beginQuorumEpochTimeoutMs / 3);
+ context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0,
0));
+ context.pollUntilResponse();
+
+ context.time.sleep(context.beginQuorumEpochTimeoutMs - partialDelay);
+ context.client.poll();
+ // leader will not send a BeginQuorumEpoch request to replica 1 again because its fetch request
+ // was received before beginQuorumEpochTimeoutMs had elapsed
+ context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId2));
+
+ context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0,
0));
+ context.pollUntilResponse();
+ context.time.sleep(context.beginQuorumEpochTimeoutMs);
+ context.client.poll();
+ // leader should send a BeginQuorumEpoch request to a node once beginQuorumEpochTimeoutMs has
+ // elapsed without receiving a fetch request from that node
+ context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1,
remoteId2));
+ }
+
+
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void
testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean
withKip853Rpc) throws Exception {