This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3a9777a6676 KAFKA-14619; KRaft validate snapshot id are at batch
boundaries (#17500)
3a9777a6676 is described below
commit 3a9777a667620c5f926176452744c751df4dac17
Author: kevin-wu24 <[email protected]>
AuthorDate: Mon Dec 9 10:38:00 2024 -0600
KAFKA-14619; KRaft validate snapshot id are at batch boundaries (#17500)
When KafkaRaftClient receives a request to create a snapshot with an end
offset that is not aligned to a batch boundary, do not create a misaligned
snapshot; instead, log at info level and throw an IllegalArgumentException.
The batch-boundary check reads the log at snapshotId.offset() and verifies
that the requested offset is the base offset of the returned batch.
Reviewers: José Armando García Sancio <[email protected]>
---
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 19 ++++
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 23 +++-
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 124 +++++++++++++++++----
.../test/java/org/apache/kafka/raft/MockLog.java | 12 ++
.../java/org/apache/kafka/raft/MockLogTest.java | 18 ++-
5 files changed, 167 insertions(+), 29 deletions(-)
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index c1d8b4abc8e..451bb5a8511 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -272,6 +272,25 @@ final class KafkaMetadataLog private (
)
}
+ /*
+ Perform a check that the requested snapshot offset is batch aligned via
a log read, which
+ returns the base offset of the batch that contains the requested offset.
A snapshot offset
+ is one greater than the last offset contained in the snapshot, and
cannot go past the high
+ watermark.
+
+ This check is necessary because Raft replication code assumes the
snapshot offset is the
+ start of a batch. If a follower applies a non-batch aligned snapshot at
offset (X) and
+ fetches from this offset, the returned batch will start at offset (X -
M), and the
+ follower will be unable to append it since (X - M) < (X).
+ */
+ val baseOffset = read(snapshotId.offset,
Isolation.COMMITTED).startOffsetMetadata.offset
+ if (snapshotId.offset != baseOffset) {
+ throw new IllegalArgumentException(
+ s"Cannot create snapshot at offset (${snapshotId.offset}) because it
is not batch aligned. " +
+ s"The batch containing the requested offset has a base offset of
($baseOffset)"
+ )
+ }
+
createNewSnapshotUnchecked(snapshotId)
}
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 1c65fd5073c..285560d3826 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -142,13 +142,24 @@ final class KafkaMetadataLogTest {
// Test finding the first epoch
log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords,
firstEpoch)).get().close()
- log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1,
firstEpoch)).get().close()
- log.createNewSnapshot(new OffsetAndEpoch(1, firstEpoch)).get().close()
// Test finding the second epoch
log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords,
secondEpoch)).get().close()
- log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1,
secondEpoch)).get().close()
- log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1,
secondEpoch)).get().close()
+ }
+
+ @Test
+ def testCreateSnapshotInMiddleOfBatch(): Unit = {
+ val numberOfRecords = 10
+ val epoch = 1
+ val log = buildMetadataLog(tempDir, mockTime)
+
+ append(log, numberOfRecords, epoch)
+ log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+ assertThrows(
+ classOf[IllegalArgumentException],
+ () => log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1,
epoch))
+ )
}
@Test
@@ -206,7 +217,7 @@ final class KafkaMetadataLogTest {
val snapshotId = new OffsetAndEpoch(numberOfRecords-4, epoch)
val log = buildMetadataLog(tempDir, mockTime)
- append(log, numberOfRecords, epoch)
+ (1 to numberOfRecords).foreach(_ => append(log, 1, epoch))
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
createNewSnapshot(log, snapshotId)
@@ -282,7 +293,7 @@ final class KafkaMetadataLogTest {
def testCreateExistingSnapshot(): Unit = {
val numberOfRecords = 10
val epoch = 1
- val snapshotId = new OffsetAndEpoch(numberOfRecords - 1, epoch)
+ val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
val log = buildMetadataLog(tempDir, mockTime)
append(log, numberOfRecords, epoch)
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 1fc43cd2e14..6ef692229af 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -1915,7 +1915,14 @@ public final class KafkaRaftClientSnapshotTest {
// When leader creating snapshot:
// 1.1 high watermark cannot be empty
assertEquals(OptionalLong.empty(), context.client.highWatermark());
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId1, 0));
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId1, 0)
+ );
+ assertEquals(
+ "Cannot create a snapshot with an id (OffsetAndEpoch(offset=4,
epoch=2)) greater than the high-watermark (0)",
+ exception.getMessage()
+ );
// 1.2 high watermark must larger than or equal to the snapshotId's
endOffset
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
@@ -1927,18 +1934,52 @@ public final class KafkaRaftClientSnapshotTest {
context.client.poll();
assertEquals(context.log.endOffset().offset(),
context.client.highWatermark().getAsLong() + newRecords.size());
- OffsetAndEpoch invalidSnapshotId2 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2, currentEpoch);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId2, 0));
+ OffsetAndEpoch invalidSnapshotId2 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() + newRecords.size(),
currentEpoch);
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId2, 0)
+ );
+ assertEquals(
+ "Cannot create a snapshot with an id (OffsetAndEpoch(offset=7,
epoch=3)) greater than the high-watermark (4)",
+ exception.getMessage()
+ );
// 2 the quorum epoch must larger than or equal to the snapshotId's
epoch
OffsetAndEpoch invalidSnapshotId3 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId3, 0));
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId3, 0)
+ );
+ assertEquals(
+ "Snapshot id (OffsetAndEpoch(offset=4, epoch=4)) is not valid
according to the log: ValidOffsetAndEpoch(kind=DIVERGING,
offsetAndEpoch=OffsetAndEpoch(offset=7, epoch=3))",
+ exception.getMessage()
+ );
// 3 the snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch =
context.log.endOffsetForEpoch(epoch);
assertEquals(epoch, endOffsetForEpoch.epoch());
- OffsetAndEpoch invalidSnapshotId4 = new
OffsetAndEpoch(endOffsetForEpoch.offset() + 2, epoch);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId4, 0));
+ OffsetAndEpoch invalidSnapshotId4 = new
OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch);
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId4, 0)
+ );
+ assertEquals(
+ "Snapshot id (OffsetAndEpoch(offset=4, epoch=2)) is not valid
according to the log: ValidOffsetAndEpoch(kind=DIVERGING,
offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=2))",
+ exception.getMessage()
+ );
+
+ // 4 snapshotId offset must be at a batch boundary
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+ OffsetAndEpoch invalidSnapshotId5 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch);
+ // this points to the "f" offset, which is not batch aligned
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId5, 0)
+ );
+ assertEquals(
+ "Cannot create snapshot at offset (6) because it is not batch
aligned. The batch containing the requested offset has a base offset of (4)",
+ exception.getMessage()
+ );
}
@ParameterizedTest
@@ -1951,6 +1992,7 @@ public final class KafkaRaftClientSnapshotTest {
Set<Integer> voters = Set.of(localId, leaderId, otherFollowerId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(1, List.of("a"))
.withElectedLeader(epoch, leaderId)
.withKip853Rpc(withKip853Rpc)
.build();
@@ -1959,18 +2001,25 @@ public final class KafkaRaftClientSnapshotTest {
// When follower creating snapshot:
// 1) The high watermark cannot be empty
assertEquals(OptionalLong.empty(), context.client.highWatermark());
- OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(1, 0);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId1, 0));
+ OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(1, 1);
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId1, 0)
+ );
+ assertEquals(
+ "Cannot create a snapshot with an id (OffsetAndEpoch(offset=1,
epoch=1)) greater than the high-watermark (0)",
+ exception.getMessage()
+ );
// Poll for our first fetch request
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(voters.contains(fetchRequest.destination().id()));
- context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+ context.assertFetchRequestData(fetchRequest, epoch, 1L, 1);
// The response does not advance the high watermark
- List<String> records1 = Arrays.asList("a", "b", "c");
- MemoryRecords batch1 = context.buildBatch(0L, 3, records1);
+ List<String> records1 = Arrays.asList("b", "c");
+ MemoryRecords batch1 = context.buildBatch(1L, 3, records1);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
@@ -1981,11 +2030,14 @@ public final class KafkaRaftClientSnapshotTest {
// 2) The high watermark must be larger than or equal to the
snapshotId's endOffset
int currentEpoch = context.currentEpoch();
OffsetAndEpoch invalidSnapshotId2 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId2, 0));
-
- // 3) The quorum epoch must be larger than or equal to the
snapshotId's epoch
- OffsetAndEpoch invalidSnapshotId3 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch +
1);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId3, 0));
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId2, 0)
+ );
+ assertEquals(
+ "Cannot create a snapshot with an id (OffsetAndEpoch(offset=1,
epoch=5)) greater than the high-watermark (0)",
+ exception.getMessage()
+ );
// The high watermark advances to be larger than
log.endOffsetForEpoch(3), to test the case 3
context.pollUntilRequest();
@@ -1994,7 +2046,8 @@ public final class KafkaRaftClientSnapshotTest {
context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
List<String> records2 = Arrays.asList("d", "e", "f");
- MemoryRecords batch2 = context.buildBatch(3L, 4, records2);
+ int batch2Epoch = 4;
+ MemoryRecords batch2 = context.buildBatch(3L, batch2Epoch, records2);
context.deliverResponse(
fetchRequest.correlationId(),
fetchRequest.destination(),
@@ -2003,11 +2056,44 @@ public final class KafkaRaftClientSnapshotTest {
context.client.poll();
assertEquals(6L, context.client.highWatermark().getAsLong());
+ // 3) The quorum epoch must be larger than or equal to the
snapshotId's epoch
+ OffsetAndEpoch invalidSnapshotId3 = new
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId3, 0)
+ );
+ assertEquals(
+ "Snapshot id (OffsetAndEpoch(offset=6, epoch=6)) is not valid
according to the log: ValidOffsetAndEpoch(kind=DIVERGING,
offsetAndEpoch=OffsetAndEpoch(offset=6, epoch=4))",
+ exception.getMessage()
+ );
+
// 4) The snapshotId should be validated against endOffsetForEpoch
OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(3);
assertEquals(3, endOffsetForEpoch.epoch());
- OffsetAndEpoch invalidSnapshotId4 = new
OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch);
- assertThrows(IllegalArgumentException.class, () ->
context.client.createSnapshot(invalidSnapshotId4, 0));
+ OffsetAndEpoch invalidSnapshotId4 = new
OffsetAndEpoch(endOffsetForEpoch.offset() + 3, 3);
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId4, 0)
+ );
+ assertEquals(
+ "Snapshot id (OffsetAndEpoch(offset=6, epoch=3)) is not valid
according to the log: ValidOffsetAndEpoch(kind=DIVERGING,
offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=3))",
+ exception.getMessage()
+ );
+
+ // 5) The snapshotId should be batch-aligned
+ endOffsetForEpoch = context.log.endOffsetForEpoch(batch2Epoch);
+ assertEquals(4, endOffsetForEpoch.epoch());
+ assertEquals(6, endOffsetForEpoch.offset());
+ OffsetAndEpoch invalidSnapshotId5 = new
OffsetAndEpoch(endOffsetForEpoch.offset() - 1, batch2Epoch);
+ // this points to the "f" offset, which is not batch aligned
+ exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> context.client.createSnapshot(invalidSnapshotId5, 0)
+ );
+ assertEquals(
+ "Cannot create snapshot at offset (5) because it is not batch
aligned. The batch containing the requested offset has a base offset of (3)",
+ exception.getMessage()
+ );
}
private static ReplicaKey replicaKey(int id, boolean withDirectoryId) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 4695d18f72f..a7a8e89a88c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -490,6 +490,18 @@ public class MockLog implements ReplicatedLog {
);
}
+ long baseOffset = read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
+ if (snapshotId.offset() != baseOffset) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot create snapshot at offset (%s) because it is not
batch aligned. " +
+ "The batch containing the requested offset has a base
offset of (%s)",
+ snapshotId.offset(),
+ baseOffset
+ )
+ );
+ }
+
return createNewSnapshotUnchecked(snapshotId);
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 08e19866d9b..8306e103258 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -450,13 +450,23 @@ public class MockLogTest {
// Test snapshot id for the first epoch
log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords,
firstEpoch)).get().close();
- log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1,
firstEpoch)).get().close();
- log.createNewSnapshot(new OffsetAndEpoch(1, firstEpoch)).get().close();
// Test snapshot id for the second epoch
log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords,
secondEpoch)).get().close();
- log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1,
secondEpoch)).get().close();
- log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1,
secondEpoch)).get().close();
+ }
+
+ @Test
+ public void testCreateSnapshotInMiddleOfBatch() {
+ int numberOfRecords = 10;
+ int epoch = 1;
+
+ appendBatch(numberOfRecords, epoch);
+ log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords -
1, epoch))
+ );
}
@Test