This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3a246b1abab KAFKA-15078; KRaft leader replies with snapshot for offset
0 (#13845)
3a246b1abab is described below
commit 3a246b1abab0cfa8050546f54c987af2ec6cdd4e
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Jun 28 14:21:11 2023 -0700
KAFKA-15078; KRaft leader replies with snapshot for offset 0 (#13845)
If the follower has an empty log and fetches with offset 0, it is more
efficient for the leader to reply with a snapshot id (redirect to
FETCH_SNAPSHOT) than for the follower to continue fetching from the log
segments.
Reviewers: David Arthur <[email protected]>, dengziming
<[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 11 ++++-
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 48 +++++++++++++++++++++-
2 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index fbb1117da25..40c6a695908 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1017,7 +1017,16 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
long fetchOffset = request.fetchOffset();
int lastFetchedEpoch = request.lastFetchedEpoch();
LeaderState<T> state = quorum.leaderStateOrThrow();
- ValidOffsetAndEpoch validOffsetAndEpoch =
log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+ Optional<OffsetAndEpoch> latestSnapshotId = log.latestSnapshotId();
+ final ValidOffsetAndEpoch validOffsetAndEpoch;
+ if (fetchOffset == 0 && latestSnapshotId.isPresent()) {
+            // If the follower has an empty log and a snapshot exists, it
is always more efficient
+ // to reply with a snapshot id (FETCH_SNAPSHOT) instead of
fetching from the log segments.
+ validOffsetAndEpoch =
ValidOffsetAndEpoch.snapshot(latestSnapshotId.get());
+ } else {
+ validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset,
lastFetchedEpoch);
+ }
final Records records;
if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 2de6853d2e1..bc1d0ca21fc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -303,7 +303,53 @@ final public class KafkaRaftClientSnapshotTest {
context.client.poll();
// Send Fetch request less than start offset
- context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0,
epoch, 0));
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId,
snapshotId.offset() - 2, snapshotId.epoch(), 0));
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionResponse =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE,
Errors.forCode(partitionResponse.errorCode()));
+ assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
+ assertEquals(localId, partitionResponse.currentLeader().leaderId());
+ assertEquals(snapshotId.epoch(),
partitionResponse.snapshotId().epoch());
+ assertEquals(snapshotId.offset(),
partitionResponse.snapshotId().endOffset());
+ }
+
+ @Test
+ public void testFetchRequestOffsetAtZero() throws Exception {
+ // When the follower sends a FETCH request at offset 0, reply with
snapshot id if it exists
+ int localId = 0;
+ int otherNodeId = localId + 1;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withAppendLingerMs(1)
+ .build();
+
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+
+ List<String> appendRecords = Arrays.asList("a", "b", "c");
+ context.client.scheduleAppend(epoch, appendRecords);
+ context.time.sleep(context.appendLingerMs());
+ context.client.poll();
+
+ long localLogEndOffset = context.log.endOffset().offset;
+ assertTrue(
+ appendRecords.size() <= localLogEndOffset,
+ String.format("Record length = %s, log end offset = %s",
appendRecords.size(), localLogEndOffset)
+ );
+
+ // Advance the highWatermark
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+ // Generate a snapshot at the LEO
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset,
epoch);
+ try (SnapshotWriter<String> snapshot =
context.client.createSnapshot(snapshotId, 0).get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ snapshot.freeze();
+ }
+
+ // Send Fetch request for offset 0
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, 0,
0));
context.pollUntilResponse();
FetchResponseData.PartitionData partitionResponse =
context.assertSentFetchPartitionResponse();
assertEquals(Errors.NONE,
Errors.forCode(partitionResponse.errorCode()));