This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3a246b1abab KAFKA-15078; KRaft leader replies with snapshot for offset 0 (#13845)
3a246b1abab is described below

commit 3a246b1abab0cfa8050546f54c987af2ec6cdd4e
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Jun 28 14:21:11 2023 -0700

    KAFKA-15078; KRaft leader replies with snapshot for offset 0 (#13845)
    
    If a follower with an empty log fetches at offset 0 and the leader has
    a snapshot, it is more efficient for the leader to reply with the
    snapshot id (redirecting the follower to FETCH_SNAPSHOT) than for the
    follower to continue fetching from the log segments.
    
    Reviewers: David Arthur <[email protected]>, dengziming <[email protected]>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 11 ++++-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 48 +++++++++++++++++++++-
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index fbb1117da25..40c6a695908 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1017,7 +1017,16 @@ public class KafkaRaftClient<T> implements RaftClient<T> 
{
             long fetchOffset = request.fetchOffset();
             int lastFetchedEpoch = request.lastFetchedEpoch();
             LeaderState<T> state = quorum.leaderStateOrThrow();
-            ValidOffsetAndEpoch validOffsetAndEpoch = 
log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+            Optional<OffsetAndEpoch> latestSnapshotId = log.latestSnapshotId();
+            final ValidOffsetAndEpoch validOffsetAndEpoch;
+            if (fetchOffset == 0 && latestSnapshotId.isPresent()) {
+                // If the follower has an empty log and a snapshot exist, it 
is always more efficient
+                // to reply with a snapshot id (FETCH_SNAPSHOT) instead of 
fetching from the log segments.
+                validOffsetAndEpoch = 
ValidOffsetAndEpoch.snapshot(latestSnapshotId.get());
+            } else {
+                validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, 
lastFetchedEpoch);
+            }
 
             final Records records;
             if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 2de6853d2e1..bc1d0ca21fc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -303,7 +303,53 @@ final public class KafkaRaftClientSnapshotTest {
         context.client.poll();
 
         // Send Fetch request less than start offset
-        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, 
epoch, 0));
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 
snapshotId.offset() - 2, snapshotId.epoch(), 0));
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionResponse = 
context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE, 
Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
+        assertEquals(localId, partitionResponse.currentLeader().leaderId());
+        assertEquals(snapshotId.epoch(), 
partitionResponse.snapshotId().epoch());
+        assertEquals(snapshotId.offset(), 
partitionResponse.snapshotId().endOffset());
+    }
+
+    @Test
+    public void testFetchRequestOffsetAtZero() throws Exception {
+        // When the follower sends a FETCH request at offset 0, reply with 
snapshot id if it exists
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withAppendLingerMs(1)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        List<String> appendRecords = Arrays.asList("a", "b", "c");
+        context.client.scheduleAppend(epoch, appendRecords);
+        context.time.sleep(context.appendLingerMs());
+        context.client.poll();
+
+        long localLogEndOffset = context.log.endOffset().offset;
+        assertTrue(
+            appendRecords.size() <= localLogEndOffset,
+            String.format("Record length = %s, log end offset = %s", 
appendRecords.size(), localLogEndOffset)
+        );
+
+        // Advance the highWatermark
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        // Generate a snapshot at the LEO
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, 
epoch);
+        try (SnapshotWriter<String> snapshot = 
context.client.createSnapshot(snapshotId, 0).get()) {
+            assertEquals(snapshotId, snapshot.snapshotId());
+            snapshot.freeze();
+        }
+
+        // Send Fetch request for offset 0
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, 0, 
0));
         context.pollUntilResponse();
         FetchResponseData.PartitionData partitionResponse = 
context.assertSentFetchPartitionResponse();
         assertEquals(Errors.NONE, 
Errors.forCode(partitionResponse.errorCode()));

Reply via email to