This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff5025a21c4 KAFKA-19695: Fix bug in redundant offset calculation. 
(#20516)
ff5025a21c4 is described below

commit ff5025a21c4e2b4d437f85aa75e1c3f7d4f09ef9
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Wed Sep 10 22:08:34 2025 +0530

    KAFKA-19695: Fix bug in redundant offset calculation. (#20516)
    
    * The `ShareCoordinatorShard` maintains the record offset
    information for `SharePartitionKey`s in the
    `ShareCoordinatorOffsetsManager` class.
    * Replays of `ShareSnapshot` records in the shards are reflected in the
    offsets manager, including records created due to delete state.
    * However, if a share partition is deleted because its topic was
    deleted, no record will ever be written for the same `SharePartitionKey`
    after the delete tombstone (topic IDs are never reused).
    As a result, the offsets manager would always consider the deleted share
    partition's offset as the last redundant one, so the redundant offset
    could never advance past it.
    * The fix makes the offsets manager aware of tombstone records and
    removes the corresponding keys from the redundant offset calculation.
    * Unit tests have been updated for the same.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>, Apoorv Mittal
     <apoorvmitta...@gmail.com>
---
 .../share/ShareCoordinatorOffsetsManager.java      |  9 +-
 .../coordinator/share/ShareCoordinatorShard.java   |  2 +-
 .../share/ShareCoordinatorOffsetsManagerTest.java  | 96 ++++++++++++++++++----
 3 files changed, 90 insertions(+), 17 deletions(-)

diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
index 69070f65e93..0b3e5a5ff08 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java
@@ -60,13 +60,20 @@ public class ShareCoordinatorOffsetsManager {
      *
      * @param key    - represents {@link SharePartitionKey} whose offset needs 
updating
      * @param offset - represents the latest partition offset for provided key
+     * @param isDelete - true if the offset is for a tombstone record
      */
-    public void updateState(SharePartitionKey key, long offset) {
+    public void updateState(SharePartitionKey key, long offset, boolean 
isDelete) {
         lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset));
         offsets.put(key, offset);
 
         Optional<Long> redundantOffset = findRedundantOffset();
         redundantOffset.ifPresent(lastRedundantOffset::set);
+
+        // If the share partition is deleted, we should not hold onto its 
offset in our calculations
+        // as there is nothing beyond deletion which is going to update its 
state.
+        if (isDelete) {
+            offsets.remove(key);
+        }
     }
 
     private Optional<Long> findRedundantOffset() {
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 76a654de4c1..9d52780faa5 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -266,7 +266,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
             }
         }
 
-        offsetsManager.updateState(mapKey, offset);
+        offsetsManager.updateState(mapKey, offset, value == null);
     }
 
     private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value) 
{
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
index 262f166be19..73d72bde9f0 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java
@@ -32,6 +32,7 @@ import java.util.Optional;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class ShareCoordinatorOffsetsManagerTest {
 
@@ -48,16 +49,19 @@ public class ShareCoordinatorOffsetsManagerTest {
 
     @Test
     public void testUpdateStateAddsToInternalState() {
-        manager.updateState(KEY1, 0L);
+        manager.updateState(KEY1, 0L, false);
         assertEquals(Optional.empty(), manager.lastRedundantOffset());
 
-        manager.updateState(KEY1, 10L);
+        manager.updateState(KEY1, 10L, false);
         assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // 
[0-9] offsets are redundant.
 
-        manager.updateState(KEY2, 15L);
+        manager.updateState(KEY2, 15L, false);
         assertEquals(Optional.of(10L), manager.lastRedundantOffset());  // No 
update to last redundant after adding 15L so, still 10L.
 
-        assertEquals(10L, manager.curState().get(KEY1));
+        manager.updateState(KEY1, 25L, true);
+        assertEquals(Optional.of(15L), manager.lastRedundantOffset());  // 
KEY1 deleted, no longer part of calculation.
+
+        assertNull(manager.curState().get(KEY1));
         assertEquals(15L, manager.curState().get(KEY2));
     }
 
@@ -66,15 +70,21 @@ public class ShareCoordinatorOffsetsManagerTest {
             final SharePartitionKey key;
             final long offset;
             final Optional<Long> expectedOffset;
+            final boolean isDelete;
 
-            private TestTuple(SharePartitionKey key, long offset, 
Optional<Long> expectedOffset) {
+            private TestTuple(SharePartitionKey key, long offset, 
Optional<Long> expectedOffset, boolean isDelete) {
                 this.key = key;
                 this.offset = offset;
                 this.expectedOffset = expectedOffset;
+                this.isDelete = isDelete;
             }
 
             static TestTuple instance(SharePartitionKey key, long offset, 
Optional<Long> expectedOffset) {
-                return new TestTuple(key, offset, expectedOffset);
+                return new TestTuple(key, offset, expectedOffset, false);
+            }
+
+            static TestTuple instance(SharePartitionKey key, long offset, 
Optional<Long> expectedOffset, boolean isDelete) {
+                return new TestTuple(key, offset, expectedOffset, isDelete);
             }
         }
 
@@ -96,19 +106,35 @@ public class ShareCoordinatorOffsetsManagerTest {
     static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() {
         return Stream.of(
             new ShareOffsetTestHolder(
-                "no redundant state single key",
+                "no redundant state single key.",
                 List.of(
                     ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L))
                 )
             ),
 
             new ShareOffsetTestHolder(
-                "no redundant state multiple keys",
+                "no redundant state single key with delete.",
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L), true)
+                )
+            ),
+
+            new ShareOffsetTestHolder(
+                "no redundant state multiple keys.",
                 List.of(
                     ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
                     ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, 
Optional.of(10L)),
                     ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, 
Optional.of(10L))
                 )
+            ),
+
+            new ShareOffsetTestHolder(
+                "no redundant state multiple keys with delete.",
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L), true),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, 
Optional.of(11L), true),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, 
Optional.of(13L), true)
+                )
             )
         );
     }
@@ -116,7 +142,7 @@ public class ShareCoordinatorOffsetsManagerTest {
     static Stream<ShareOffsetTestHolder> generateRedundantStateCases() {
         return Stream.of(
             new ShareOffsetTestHolder(
-                "redundant state single key",
+                "redundant state single key.",
                 List.of(
                     ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
                     ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, 
Optional.of(11L)),
@@ -125,7 +151,7 @@ public class ShareCoordinatorOffsetsManagerTest {
             ),
 
             new ShareOffsetTestHolder(
-                "redundant state multiple keys",
+                "redundant state multiple keys.",
                 // KEY1: 10 17
                 // KEY2: 11 16
                 // KEY3: 15
@@ -136,6 +162,20 @@ public class ShareCoordinatorOffsetsManagerTest {
                     ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, 
Optional.of(10L)),  // KEY2 11 redundant but should not be returned
                     ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, 
Optional.of(15L))
                 )
+            ),
+
+            new ShareOffsetTestHolder(
+                "redundant state multiple keys with delete.",
+                // KEY1: 10 17
+                // KEY2: 11 16
+                // KEY3: 15
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, 
Optional.of(10L), true),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, 
Optional.of(10L)),  // KEY2 11 redundant but should not be returned
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, 
Optional.of(16L))   // Because we have removed KEY3 from calculation
+                )
             )
         );
 
@@ -144,7 +184,7 @@ public class ShareCoordinatorOffsetsManagerTest {
     static Stream<ShareOffsetTestHolder> generateComplexCases() {
         return Stream.of(
             new ShareOffsetTestHolder(
-                "redundant state reverse key order",
+                "redundant state reverse key order.",
                 // Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1.
                 List.of(
                     ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
@@ -156,6 +196,18 @@ public class ShareCoordinatorOffsetsManagerTest {
                 )
             ),
 
+            new ShareOffsetTestHolder(
+                "redundant state reverse key order with delete.",
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, 
Optional.of(10L), true),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, 
Optional.of(10L), true),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, 
Optional.of(25L))   // Because KEY2 and KEY3 are gone.
+                )
+            ),
+
             new ShareOffsetTestHolder(
                 "redundant state infrequently written partition.",
                 List.of(
@@ -170,6 +222,20 @@ public class ShareCoordinatorOffsetsManagerTest {
                     ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L, 
Optional.of(10L)),
                     ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, 
Optional.of(27L))
                 )
+            ),
+
+            new ShareOffsetTestHolder(
+                "redundant state infrequently written partition with delete.",
+                List.of(
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L, 
Optional.of(10L), true),    //KEY3 no longer party to calculation
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L, 
Optional.of(10L)),
+                    ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L, 
Optional.of(10L), true),    //KEY2 no longer party to calculation
+                    ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, 
Optional.of(30L))
+                )
             )
         );
     }
@@ -179,7 +245,7 @@ public class ShareCoordinatorOffsetsManagerTest {
     public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) {
         if (holder.shouldRun) {
             holder.tuples.forEach(tuple -> {
-                manager.updateState(tuple.key, tuple.offset);
+                manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
                 assertEquals(tuple.expectedOffset, 
manager.lastRedundantOffset(), holder.testName);
             });
         }
@@ -190,7 +256,7 @@ public class ShareCoordinatorOffsetsManagerTest {
     public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) {
         if (holder.shouldRun) {
             holder.tuples.forEach(tuple -> {
-                manager.updateState(tuple.key, tuple.offset);
+                manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
                 assertEquals(tuple.expectedOffset, 
manager.lastRedundantOffset(), holder.testName);
             });
         }
@@ -201,9 +267,9 @@ public class ShareCoordinatorOffsetsManagerTest {
     public void testUpdateStateComplexCases(ShareOffsetTestHolder holder) {
         if (holder.shouldRun) {
             holder.tuples.forEach(tuple -> {
-                manager.updateState(tuple.key, tuple.offset);
+                manager.updateState(tuple.key, tuple.offset, tuple.isDelete);
                 assertEquals(tuple.expectedOffset, 
manager.lastRedundantOffset(), holder.testName);
             });
         }
     }
-}
+}
\ No newline at end of file

Reply via email to