This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b187e9ff93 KAFKA-16926: Optimize BeginQuorumEpoch heartbeat (#20318)
6b187e9ff93 is described below

commit 6b187e9ff9374711a10452cc3aaa903837d937ba
Author: TaiJuWu <[email protected]>
AuthorDate: Mon Oct 27 21:39:25 2025 +0800

    KAFKA-16926: Optimize BeginQuorumEpoch heartbeat (#20318)
    
    Instead of sending BeginQuorumEpoch requests to every voter on a fixed
    cadence, the leader now sends them only to voters that have not fetched
    within the BeginQuorumEpoch timeout, avoiding unnecessary requests.
    
    Reviewers: José Armando García Sancio <[email protected]>, Chia-Ping
     Tsai <[email protected]>, Kuan-Po Tseng <[email protected]>, Alyssa
     Huang <[email protected]>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 15 +++----
 .../java/org/apache/kafka/raft/LeaderState.java    | 23 ++++++++++
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 50 +++++++++++++++++++++-
 3 files changed, 78 insertions(+), 10 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index b1c45b4a55b..d6623bca8ab 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -358,7 +358,6 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         long currentTimeMs
     ) {
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
-
         if (state.updateLocalState(endOffsetMetadata, 
partitionState.lastVoterSet())) {
             onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
@@ -1512,6 +1511,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
             FetchRequest.replicaId(request),
             fetchPartition.replicaDirectoryId()
         );
+
         FetchResponseData response = tryCompleteFetchRequest(
             requestMetadata.listenerName(),
             requestMetadata.apiVersion(),
@@ -2933,7 +2933,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         return minBackoffMs;
     }
 
-    private long maybeSendRequest(
+    private long maybeSendRequests(
         long currentTimeMs,
         Set<ReplicaKey> remoteVoters,
         Function<Integer, Node> destinationSupplier,
@@ -3120,13 +3120,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
                         )
                     );
 
-            timeUntilNextBeginQuorumSend = maybeSendRequest(
+            Set<ReplicaKey> needToSendBeginQuorumRequests = 
state.needToSendBeginQuorumRequests(currentTimeMs);
+            timeUntilNextBeginQuorumSend = maybeSendRequests(
                 currentTimeMs,
-                voters
-                    .voterKeys()
-                    .stream()
-                    .filter(key -> key.id() != quorum.localIdOrThrow())
-                    .collect(Collectors.toSet()),
+                needToSendBeginQuorumRequests,
                 nodeSupplier,
                 this::buildBeginQuorumEpochRequest
             );
@@ -3208,7 +3205,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         if (!state.epochElection().isVoteRejected()) {
             VoterSet voters = partitionState.lastVoterSet();
             boolean preVote = quorum.isProspective();
-            return maybeSendRequest(
+            return maybeSendRequests(
                 currentTimeMs,
                 state.epochElection().unrecordedVoters(),
                 voterId -> voters
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 7c379275a0e..23a74f33ae4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -188,6 +188,29 @@ public class LeaderState<T> implements EpochState {
         beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
     }
 
+    /**
+     * Returns the remote voters that should be sent a {@code BeginQuorumEpoch} request.
+     * <p>
+     * A voter other than the local node is included when at least {@code beginQuorumEpochTimeoutMs}
+     * milliseconds have elapsed since its last recorded fetch, i.e. when
+     * {@code currentTimeMs - lastFetchTimestamp >= beginQuorumEpochTimeoutMs}; the boundary is
+     * inclusive. Voters that fetched more recently are skipped, which avoids sending them
+     * redundant {@code BeginQuorumEpoch} requests.
+     *
+     * @param currentTimeMs the current time in milliseconds
+     * @return an unmodifiable, possibly empty, set of {@link ReplicaKey}s for the voters that need
+     *         to receive a {@code BeginQuorumEpoch} request
+     */
+    public Set<ReplicaKey> needToSendBeginQuorumRequests(long currentTimeMs) {
+        final int localId = localVoterNode.voterKey().id();
+        return voterStates.values()
+            .stream()
+            .filter(state -> state.replicaKey().id() != localId)
+            .filter(state -> currentTimeMs - state.lastFetchTimestamp >= beginQuorumEpochTimeoutMs)
+            .map(ReplicaState::replicaKey)
+            .collect(Collectors.toUnmodifiableSet());
+    }
+
     /**
      * Get the remaining time in milliseconds until the checkQuorumTimer 
expires.
      *
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 1efd3247ebd..ecc83aa0ad2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.errors.BufferAllocationException;
 import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
@@ -618,7 +619,7 @@ class KafkaRaftClientTest {
         int epoch = context.currentEpoch();
         assertEquals(OptionalInt.of(localId), context.currentLeader());
 
-        // begin epoch requests should be sent out every 
beginQuorumEpochTimeoutMs
+        // begin epoch requests sent out every beginQuorumEpochTimeoutMs if 
replicas have not fetched
         context.time.sleep(context.beginQuorumEpochTimeoutMs);
         context.client.poll();
         context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, 
remoteId2));
@@ -633,6 +634,53 @@ class KafkaRaftClientTest {
         context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, 
remoteId2));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testBeginQuorumShouldNotSendAfterFetchRequest(boolean withKip853Rpc) throws Exception {
+        ReplicaKey localKey = replicaKey(randomReplicaId(), true);
+        int remoteId1 = localKey.id() + 1;
+        int remoteId2 = localKey.id() + 2;
+        ReplicaKey replicaKey1 = replicaKey(remoteId1, withKip853Rpc);
+        ReplicaKey replicaKey2 = replicaKey(remoteId2, withKip853Rpc);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localKey.id(), localKey.directoryId().get())
+            .withKip853Rpc(withKip853Rpc)
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(localKey, replicaKey1, replicaKey2)),
+                KRaftVersion.KRAFT_VERSION_1
+            )
+            .build();
+
+        context.unattachedToLeader();
+        int epoch = context.currentEpoch();
+        assertEquals(OptionalInt.of(localKey.id()), context.currentLeader());
+
+        // no replica has fetched yet, so the first timer expiration sends BeginQuorumEpoch to both
+        context.time.sleep(context.beginQuorumEpochTimeoutMs);
+        context.client.poll();
+        context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
+
+        long partialDelay = context.beginQuorumEpochTimeoutMs / 3;
+        context.time.sleep(partialDelay);
+        context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0));
+        context.pollUntilResponse();
+
+        context.time.sleep(context.beginQuorumEpochTimeoutMs - partialDelay);
+        context.client.poll();
+        // replica 1 fetched only (beginQuorumEpochTimeoutMs - partialDelay) ago, which is below the
+        // timeout, so the leader skips it and only re-sends BeginQuorumEpoch to replica 2
+        context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId2));
+
+        context.deliverRequest(context.fetchRequest(epoch, replicaKey1, 0, 0, 0));
+        context.pollUntilResponse();
+        context.time.sleep(context.beginQuorumEpochTimeoutMs);
+        context.client.poll();
+        // a full beginQuorumEpochTimeoutMs has now passed since replica 1's last fetch (the check is
+        // inclusive), so the leader sends BeginQuorumEpoch to both replicas again
+        context.assertSentBeginQuorumEpochRequest(epoch, Set.of(remoteId1, remoteId2));
+    }
+
+
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void 
testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean 
withKip853Rpc) throws Exception {

Reply via email to