This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3a9777a6676 KAFKA-14619; KRaft validate snapshot id are at batch boundaries (#17500)
3a9777a6676 is described below

commit 3a9777a667620c5f926176452744c751df4dac17
Author: kevin-wu24 <[email protected]>
AuthorDate: Mon Dec 9 10:38:00 2024 -0600

    KAFKA-14619; KRaft validate snapshot id are at batch boundaries (#17500)
    
    When KafkaRaftClient receives a request to create a snapshot with an end offset that is not aligned to a batch boundary, it does not create the misaligned snapshot; instead, it logs at info level and throws an IllegalArgumentException.
    
    Checking that the end offset is at a batch boundary is performed by reading the log at snapshotId.offset() and checking whether the offset in question is the base offset of the returned batch.
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  19 ++++
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |  23 +++-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 124 +++++++++++++++++----
 .../test/java/org/apache/kafka/raft/MockLog.java   |  12 ++
 .../java/org/apache/kafka/raft/MockLogTest.java    |  18 ++-
 5 files changed, 167 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index c1d8b4abc8e..451bb5a8511 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -272,6 +272,25 @@ final class KafkaMetadataLog private (
       )
     }
 
+    /*
+      Perform a check that the requested snapshot offset is batch aligned via 
a log read, which
+      returns the base offset of the batch that contains the requested offset. 
A snapshot offset
+      is one greater than the last offset contained in the snapshot, and 
cannot go past the high
+      watermark.
+
+      This check is necessary because Raft replication code assumes the 
snapshot offset is the
+      start of a batch. If a follower applies a non-batch aligned snapshot at 
offset (X) and
+      fetches from this offset, the returned batch will start at offset (X - 
M), and the
+      follower will be unable to append it since (X - M) < (X).
+     */
+    val baseOffset = read(snapshotId.offset, 
Isolation.COMMITTED).startOffsetMetadata.offset
+    if (snapshotId.offset != baseOffset) {
+      throw new IllegalArgumentException(
+        s"Cannot create snapshot at offset (${snapshotId.offset}) because it 
is not batch aligned. " +
+        s"The batch containing the requested offset has a base offset of 
($baseOffset)"
+      )
+    }
+
     createNewSnapshotUnchecked(snapshotId)
   }
 
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala 
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 1c65fd5073c..285560d3826 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -142,13 +142,24 @@ final class KafkaMetadataLogTest {
 
     // Test finding the first epoch
     log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, 
firstEpoch)).get().close()
-    log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, 
firstEpoch)).get().close()
-    log.createNewSnapshot(new OffsetAndEpoch(1, firstEpoch)).get().close()
 
     // Test finding the second epoch
     log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords, 
secondEpoch)).get().close()
-    log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, 
secondEpoch)).get().close()
-    log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, 
secondEpoch)).get().close()
+  }
+
+  @Test
+  def testCreateSnapshotInMiddleOfBatch(): Unit = {
+    val numberOfRecords = 10
+    val epoch = 1
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    assertThrows(
+      classOf[IllegalArgumentException],
+      () => log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, 
epoch))
+    )
   }
 
   @Test
@@ -206,7 +217,7 @@ final class KafkaMetadataLogTest {
     val snapshotId = new OffsetAndEpoch(numberOfRecords-4, epoch)
     val log = buildMetadataLog(tempDir, mockTime)
 
-    append(log, numberOfRecords, epoch)
+    (1 to numberOfRecords).foreach(_ => append(log, 1, epoch))
     log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
     createNewSnapshot(log, snapshotId)
 
@@ -282,7 +293,7 @@ final class KafkaMetadataLogTest {
   def testCreateExistingSnapshot(): Unit = {
     val numberOfRecords = 10
     val epoch = 1
-    val snapshotId = new OffsetAndEpoch(numberOfRecords - 1, epoch)
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
     val log = buildMetadataLog(tempDir, mockTime)
 
     append(log, numberOfRecords, epoch)
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 1fc43cd2e14..6ef692229af 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -1915,7 +1915,14 @@ public final class KafkaRaftClientSnapshotTest {
         // When leader creating snapshot:
         // 1.1 high watermark cannot be empty
         assertEquals(OptionalLong.empty(), context.client.highWatermark());
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId1, 0));
+        IllegalArgumentException exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId1, 0)
+        );
+        assertEquals(
+            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=4, 
epoch=2)) greater than the high-watermark (0)",
+            exception.getMessage()
+        );
 
         // 1.2 high watermark must larger than or equal to the snapshotId's 
endOffset
         context.advanceLocalLeaderHighWatermarkToLogEndOffset();
@@ -1927,18 +1934,52 @@ public final class KafkaRaftClientSnapshotTest {
         context.client.poll();
         assertEquals(context.log.endOffset().offset(), 
context.client.highWatermark().getAsLong() + newRecords.size());
 
-        OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2, currentEpoch);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId2, 0));
+        OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + newRecords.size(), 
currentEpoch);
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId2, 0)
+        );
+        assertEquals(
+            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=7, 
epoch=3)) greater than the high-watermark (4)",
+            exception.getMessage()
+        );
 
         // 2 the quorum epoch must larger than or equal to the snapshotId's 
epoch
         OffsetAndEpoch invalidSnapshotId3 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId3, 0));
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId3, 0)
+        );
+        assertEquals(
+            "Snapshot id (OffsetAndEpoch(offset=4, epoch=4)) is not valid 
according to the log: ValidOffsetAndEpoch(kind=DIVERGING, 
offsetAndEpoch=OffsetAndEpoch(offset=7, epoch=3))",
+            exception.getMessage()
+        );
 
         // 3 the snapshotId should be validated against endOffsetForEpoch
         OffsetAndEpoch endOffsetForEpoch = 
context.log.endOffsetForEpoch(epoch);
         assertEquals(epoch, endOffsetForEpoch.epoch());
-        OffsetAndEpoch invalidSnapshotId4 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() + 2, epoch);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId4, 0));
+        OffsetAndEpoch invalidSnapshotId4 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch);
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId4, 0)
+        );
+        assertEquals(
+            "Snapshot id (OffsetAndEpoch(offset=4, epoch=2)) is not valid 
according to the log: ValidOffsetAndEpoch(kind=DIVERGING, 
offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=2))",
+            exception.getMessage()
+        );
+
+        // 4 snapshotId offset must be at a batch boundary
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+        OffsetAndEpoch invalidSnapshotId5 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() - 1, currentEpoch);
+        // this points to the "f" offset, which is not batch aligned
+        exception = assertThrows(
+                IllegalArgumentException.class,
+                () -> context.client.createSnapshot(invalidSnapshotId5, 0)
+        );
+        assertEquals(
+                "Cannot create snapshot at offset (6) because it is not batch 
aligned. The batch containing the requested offset has a base offset of (4)",
+                exception.getMessage()
+        );
     }
 
     @ParameterizedTest
@@ -1951,6 +1992,7 @@ public final class KafkaRaftClientSnapshotTest {
         Set<Integer> voters = Set.of(localId, leaderId, otherFollowerId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .appendToLog(1, List.of("a"))
             .withElectedLeader(epoch, leaderId)
             .withKip853Rpc(withKip853Rpc)
             .build();
@@ -1959,18 +2001,25 @@ public final class KafkaRaftClientSnapshotTest {
         // When follower creating snapshot:
         // 1) The high watermark cannot be empty
         assertEquals(OptionalLong.empty(), context.client.highWatermark());
-        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(1, 0);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId1, 0));
+        OffsetAndEpoch invalidSnapshotId1 = new OffsetAndEpoch(1, 1);
+        IllegalArgumentException exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId1, 0)
+        );
+        assertEquals(
+            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=1, 
epoch=1)) greater than the high-watermark (0)",
+            exception.getMessage()
+        );
 
         // Poll for our first fetch request
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         assertTrue(voters.contains(fetchRequest.destination().id()));
-        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+        context.assertFetchRequestData(fetchRequest, epoch, 1L, 1);
 
         // The response does not advance the high watermark
-        List<String> records1 = Arrays.asList("a", "b", "c");
-        MemoryRecords batch1 = context.buildBatch(0L, 3, records1);
+        List<String> records1 = Arrays.asList("b", "c");
+        MemoryRecords batch1 = context.buildBatch(1L, 3, records1);
         context.deliverResponse(
             fetchRequest.correlationId(),
             fetchRequest.destination(),
@@ -1981,11 +2030,14 @@ public final class KafkaRaftClientSnapshotTest {
         // 2) The high watermark must be larger than or equal to the 
snapshotId's endOffset
         int currentEpoch = context.currentEpoch();
         OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId2, 0));
-
-        // 3) The quorum epoch must be larger than or equal to the 
snapshotId's epoch
-        OffsetAndEpoch invalidSnapshotId3 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 1, currentEpoch + 
1);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId3, 0));
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId2, 0)
+        );
+        assertEquals(
+            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=1, 
epoch=5)) greater than the high-watermark (0)",
+            exception.getMessage()
+        );
 
         // The high watermark advances to be larger than 
log.endOffsetForEpoch(3), to test the case 3
         context.pollUntilRequest();
@@ -1994,7 +2046,8 @@ public final class KafkaRaftClientSnapshotTest {
         context.assertFetchRequestData(fetchRequest, epoch, 3L, 3);
 
         List<String> records2 = Arrays.asList("d", "e", "f");
-        MemoryRecords batch2 = context.buildBatch(3L, 4, records2);
+        int batch2Epoch = 4;
+        MemoryRecords batch2 = context.buildBatch(3L, batch2Epoch, records2);
         context.deliverResponse(
             fetchRequest.correlationId(),
             fetchRequest.destination(),
@@ -2003,11 +2056,44 @@ public final class KafkaRaftClientSnapshotTest {
         context.client.poll();
         assertEquals(6L, context.client.highWatermark().getAsLong());
 
+        // 3) The quorum epoch must be larger than or equal to the 
snapshotId's epoch
+        OffsetAndEpoch invalidSnapshotId3 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong(), currentEpoch + 1);
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId3, 0)
+        );
+        assertEquals(
+            "Snapshot id (OffsetAndEpoch(offset=6, epoch=6)) is not valid 
according to the log: ValidOffsetAndEpoch(kind=DIVERGING, 
offsetAndEpoch=OffsetAndEpoch(offset=6, epoch=4))",
+            exception.getMessage()
+        );
+
         // 4) The snapshotId should be validated against endOffsetForEpoch
         OffsetAndEpoch endOffsetForEpoch = context.log.endOffsetForEpoch(3);
         assertEquals(3, endOffsetForEpoch.epoch());
-        OffsetAndEpoch invalidSnapshotId4 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() + 1, epoch);
-        assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId4, 0));
+        OffsetAndEpoch invalidSnapshotId4 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() + 3, 3);
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId4, 0)
+        );
+        assertEquals(
+            "Snapshot id (OffsetAndEpoch(offset=6, epoch=3)) is not valid 
according to the log: ValidOffsetAndEpoch(kind=DIVERGING, 
offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=3))",
+            exception.getMessage()
+        );
+
+        // 5) The snapshotId should be batch-aligned
+        endOffsetForEpoch = context.log.endOffsetForEpoch(batch2Epoch);
+        assertEquals(4, endOffsetForEpoch.epoch());
+        assertEquals(6, endOffsetForEpoch.offset());
+        OffsetAndEpoch invalidSnapshotId5 = new 
OffsetAndEpoch(endOffsetForEpoch.offset() - 1, batch2Epoch);
+        // this points to the "f" offset, which is not batch aligned
+        exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> context.client.createSnapshot(invalidSnapshotId5, 0)
+        );
+        assertEquals(
+            "Cannot create snapshot at offset (5) because it is not batch 
aligned. The batch containing the requested offset has a base offset of (3)",
+            exception.getMessage()
+        );
     }
 
     private static ReplicaKey replicaKey(int id, boolean withDirectoryId) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 4695d18f72f..a7a8e89a88c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -490,6 +490,18 @@ public class MockLog implements ReplicatedLog {
             );
         }
 
+        long baseOffset = read(snapshotId.offset(), 
Isolation.COMMITTED).startOffsetMetadata.offset();
+        if (snapshotId.offset() != baseOffset) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Cannot create snapshot at offset (%s) because it is not 
batch aligned. " +
+                    "The batch containing the requested offset has a base 
offset of (%s)",
+                    snapshotId.offset(),
+                    baseOffset
+                )
+            );
+        }
+
         return createNewSnapshotUnchecked(snapshotId);
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 08e19866d9b..8306e103258 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -450,13 +450,23 @@ public class MockLogTest {
 
         // Test snapshot id for the first epoch
         log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords, 
firstEpoch)).get().close();
-        log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 1, 
firstEpoch)).get().close();
-        log.createNewSnapshot(new OffsetAndEpoch(1, firstEpoch)).get().close();
 
         // Test snapshot id for the second epoch
         log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords, 
secondEpoch)).get().close();
-        log.createNewSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, 
secondEpoch)).get().close();
-        log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords + 1, 
secondEpoch)).get().close();
+    }
+
+    @Test
+    public void testCreateSnapshotInMiddleOfBatch() {
+        int numberOfRecords = 10;
+        int epoch = 1;
+
+        appendBatch(numberOfRecords, epoch);
+        log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> log.createNewSnapshot(new OffsetAndEpoch(numberOfRecords - 
1, epoch))
+        );
     }
 
     @Test

Reply via email to