This is an automated email from the ASF dual-hosted git repository. mittal pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ff5025a21c4 KAFKA-19695: Fix bug in redundant offset calculation. (#20516) ff5025a21c4 is described below commit ff5025a21c4e2b4d437f85aa75e1c3f7d4f09ef9 Author: Sushant Mahajan <smaha...@confluent.io> AuthorDate: Wed Sep 10 22:08:34 2025 +0530 KAFKA-19695: Fix bug in redundant offset calculation. (#20516) * The `ShareCoordinatorShard` maintains the record offset information for `SharePartitionKey`s in the `ShareCoordinatorOffsetsManager` class. * Replays of `ShareSnapshot`s in the shards are reflected in the offsets manager, including records created due to delete state. * However, if the share partition delete is due to topic delete, no record will ever be written for the same `SharePartitionKey` after the delete tombstone (as topic id will not repeat). As a result, the offset manager will always consider the deleted share partition's offset as the last redundant one. * The fix is to make the offset manager aware of the tombstone records and remove them from the redundant offset calculation. * Unit tests have been updated for the same. 
Reviewers: Andrew Schofield <aschofi...@confluent.io>, Apoorv Mittal <apoorvmitta...@gmail.com> --- .../share/ShareCoordinatorOffsetsManager.java | 9 +- .../coordinator/share/ShareCoordinatorShard.java | 2 +- .../share/ShareCoordinatorOffsetsManagerTest.java | 96 ++++++++++++++++++---- 3 files changed, 90 insertions(+), 17 deletions(-) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java index 69070f65e93..0b3e5a5ff08 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java @@ -60,13 +60,20 @@ public class ShareCoordinatorOffsetsManager { * * @param key - represents {@link SharePartitionKey} whose offset needs updating * @param offset - represents the latest partition offset for provided key + * @param isDelete - true if the offset is for a tombstone record */ - public void updateState(SharePartitionKey key, long offset) { + public void updateState(SharePartitionKey key, long offset, boolean isDelete) { lastRedundantOffset.set(Math.min(lastRedundantOffset.get(), offset)); offsets.put(key, offset); Optional<Long> redundantOffset = findRedundantOffset(); redundantOffset.ifPresent(lastRedundantOffset::set); + + // If the share partition is deleted, we should not hold onto its offset in our calculations + // as there is nothing beyond deletion which is going to update its state. 
+ if (isDelete) { + offsets.remove(key); + } } private Optional<Long> findRedundantOffset() { diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index 76a654de4c1..9d52780faa5 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -266,7 +266,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord } } - offsetsManager.updateState(mapKey, offset); + offsetsManager.updateState(mapKey, offset, value == null); } private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value) { diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java index 262f166be19..73d72bde9f0 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManagerTest.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; public class ShareCoordinatorOffsetsManagerTest { @@ -48,16 +49,19 @@ public class ShareCoordinatorOffsetsManagerTest { @Test public void testUpdateStateAddsToInternalState() { - manager.updateState(KEY1, 0L); + manager.updateState(KEY1, 0L, false); assertEquals(Optional.empty(), manager.lastRedundantOffset()); - manager.updateState(KEY1, 10L); + manager.updateState(KEY1, 10L, false); assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // [0-9] offsets are redundant. 
- manager.updateState(KEY2, 15L); + manager.updateState(KEY2, 15L, false); assertEquals(Optional.of(10L), manager.lastRedundantOffset()); // No update to last redundant after adding 15L so, still 10L. - assertEquals(10L, manager.curState().get(KEY1)); + manager.updateState(KEY1, 25L, true); + assertEquals(Optional.of(15L), manager.lastRedundantOffset()); // KEY1 deleted, no longer part of calculation. + + assertNull(manager.curState().get(KEY1)); assertEquals(15L, manager.curState().get(KEY2)); } @@ -66,15 +70,21 @@ public class ShareCoordinatorOffsetsManagerTest { final SharePartitionKey key; final long offset; final Optional<Long> expectedOffset; + final boolean isDelete; - private TestTuple(SharePartitionKey key, long offset, Optional<Long> expectedOffset) { + private TestTuple(SharePartitionKey key, long offset, Optional<Long> expectedOffset, boolean isDelete) { this.key = key; this.offset = offset; this.expectedOffset = expectedOffset; + this.isDelete = isDelete; } static TestTuple instance(SharePartitionKey key, long offset, Optional<Long> expectedOffset) { - return new TestTuple(key, offset, expectedOffset); + return new TestTuple(key, offset, expectedOffset, false); + } + + static TestTuple instance(SharePartitionKey key, long offset, Optional<Long> expectedOffset, boolean isDelete) { + return new TestTuple(key, offset, expectedOffset, isDelete); } } @@ -96,19 +106,35 @@ public class ShareCoordinatorOffsetsManagerTest { static Stream<ShareOffsetTestHolder> generateNoRedundantStateCases() { return Stream.of( new ShareOffsetTestHolder( - "no redundant state single key", + "no redundant state single key.", List.of( ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)) ) ), new ShareOffsetTestHolder( - "no redundant state multiple keys", + "no redundant state single key with delete.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L), true) + ) + ), + + new ShareOffsetTestHolder( + "no redundant state multiple 
keys.", List.of( ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(10L)), ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(10L)) ) + ), + + new ShareOffsetTestHolder( + "no redundant state multiple keys with delete.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY4, 11L, Optional.of(11L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 13L, Optional.of(13L), true) + ) ) ); } @@ -116,7 +142,7 @@ public class ShareCoordinatorOffsetsManagerTest { static Stream<ShareOffsetTestHolder> generateRedundantStateCases() { return Stream.of( new ShareOffsetTestHolder( - "redundant state single key", + "redundant state single key.", List.of( ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), ShareOffsetTestHolder.TestTuple.instance(KEY1, 11L, Optional.of(11L)), @@ -125,7 +151,7 @@ public class ShareCoordinatorOffsetsManagerTest { ), new ShareOffsetTestHolder( - "redundant state multiple keys", + "redundant state multiple keys.", // KEY1: 10 17 // KEY2: 11 16 // KEY3: 15 @@ -136,6 +162,20 @@ public class ShareCoordinatorOffsetsManagerTest { ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, Optional.of(15L)) ) + ), + + new ShareOffsetTestHolder( + "redundant state multiple keys with delete.", + // KEY1: 10 17 + // KEY2: 11 16 + // KEY3: 15 + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 16L, Optional.of(10L)), // KEY2 11 redundant but should not be returned + ShareOffsetTestHolder.TestTuple.instance(KEY1, 17L, 
Optional.of(16L)) // Because we have removed KEY3 from calculation + ) ) ); @@ -144,7 +184,7 @@ public class ShareCoordinatorOffsetsManagerTest { static Stream<ShareOffsetTestHolder> generateComplexCases() { return Stream.of( new ShareOffsetTestHolder( - "redundant state reverse key order", + "redundant state reverse key order.", // Requests come in order KEY1, KEY2, KEY3, KEY3, KEY2, KEY1. List.of( ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), @@ -156,6 +196,18 @@ public class ShareCoordinatorOffsetsManagerTest { ) ), + new ShareOffsetTestHolder( + "redundant state reverse key order with delete.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 18L, Optional.of(10L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 20L, Optional.of(10L), true), + ShareOffsetTestHolder.TestTuple.instance(KEY1, 25L, Optional.of(25L)) // Because KEY2 and KEY3 are gone. 
+ ) + ), + new ShareOffsetTestHolder( "redundant state infrequently written partition.", List.of( @@ -170,6 +222,20 @@ public class ShareCoordinatorOffsetsManagerTest { ShareOffsetTestHolder.TestTuple.instance(KEY3, 28L, Optional.of(10L)), ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(27L)) ) + ), + + new ShareOffsetTestHolder( + "redundant state infrequently written partition with delete.", + List.of( + ShareOffsetTestHolder.TestTuple.instance(KEY1, 10L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 11L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 15L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 18L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY3, 20L, Optional.of(10L), true), //KEY3 no longer party to calculation + ShareOffsetTestHolder.TestTuple.instance(KEY2, 22L, Optional.of(10L)), + ShareOffsetTestHolder.TestTuple.instance(KEY2, 27L, Optional.of(10L), true), //KEY2 no longer party to calculation + ShareOffsetTestHolder.TestTuple.instance(KEY1, 30L, Optional.of(30L)) + ) ) ); } @@ -179,7 +245,7 @@ public class ShareCoordinatorOffsetsManagerTest { public void testUpdateStateNoRedundantState(ShareOffsetTestHolder holder) { if (holder.shouldRun) { holder.tuples.forEach(tuple -> { - manager.updateState(tuple.key, tuple.offset); + manager.updateState(tuple.key, tuple.offset, tuple.isDelete); assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); }); } @@ -190,7 +256,7 @@ public class ShareCoordinatorOffsetsManagerTest { public void testUpdateStateRedundantState(ShareOffsetTestHolder holder) { if (holder.shouldRun) { holder.tuples.forEach(tuple -> { - manager.updateState(tuple.key, tuple.offset); + manager.updateState(tuple.key, tuple.offset, tuple.isDelete); assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); }); } @@ -201,9 +267,9 @@ public class ShareCoordinatorOffsetsManagerTest { public void 
testUpdateStateComplexCases(ShareOffsetTestHolder holder) { if (holder.shouldRun) { holder.tuples.forEach(tuple -> { - manager.updateState(tuple.key, tuple.offset); + manager.updateState(tuple.key, tuple.offset, tuple.isDelete); assertEquals(tuple.expectedOffset, manager.lastRedundantOffset(), holder.testName); }); } } -} +} \ No newline at end of file