This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dfa28c463af KAFKA-19953: Bug releasing acquired records when
maintained per offset (#21050)
dfa28c463af is described below
commit dfa28c463afbaae3bd5ec4336d0f205879ae24b3
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue Dec 2 21:12:46 2025 +0530
KAFKA-19953: Bug releasing acquired records when maintained per offset
(#21050)
### About
When we have `releaseAcquiredRecords` call on SharePartition and the
batch is being maintained per offset, if we have non matching member id
while releasing acquired records, we should skip only that offset and
continue with the next offset in the batch, not skip all the offsets
after seeing the problematic offset
### Testing
Added unit tests to verify the new functionality
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 2 +-
.../kafka/server/share/SharePartitionTest.java | 84 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index f5ad66d4761..23a85611719 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1089,7 +1089,7 @@ public class SharePartition {
if (!offsetState.getValue().memberId().equals(memberId) &&
!offsetState.getValue().memberId().equals(EMPTY_MEMBER_ID)) {
log.debug("Member {} is not the owner of offset: {} in batch:
{} for the share"
+ " partition: {}-{}. Skipping offset.", memberId,
offsetState.getKey(), inFlightBatch, groupId, topicIdPartition);
- return Optional.empty();
+ continue;
}
if (offsetState.getValue().state() == RecordState.ACQUIRED) {
// These records were fetched but they were not actually
delivered to the client.
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index d757e1c2eb7..b8e2d1f81ad 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -11832,6 +11832,90 @@ public class SharePartitionTest {
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
}
+ @Test
+ public void testReleaseAcquiredRecordsPerOffsetWithDifferentMemberId() {
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+ // Acquire offsets 5-14 with member-1.
+ fetchAcquiredRecords(sharePartition.acquire(MEMBER_ID,
ShareAcquireMode.BATCH_OPTIMIZED, BATCH_SIZE,
+ MAX_FETCH_RECORDS, 21, fetchPartitionData(memoryRecords(5, 10)),
FETCH_ISOLATION_HWM), 10);
+
+ // Acknowledge 5-6 offsets with ACCEPT and 10-12 with RELEASE
+ sharePartition.acknowledge(MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(5, 6,
List.of(AcknowledgeType.ACCEPT.id))));
+ sharePartition.acknowledge(MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(10, 12,
List.of(AcknowledgeType.RELEASE.id))));
+
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+ // Acquire offsets 10-12 with member-2.
+ fetchAcquiredRecords(sharePartition.acquire("member-2",
ShareAcquireMode.BATCH_OPTIMIZED, BATCH_SIZE,
+ MAX_FETCH_RECORDS, 21, fetchPartitionData(memoryRecords(10, 3)),
FETCH_ISOLATION_HWM), 3);
+
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(5L).memberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(6L).memberId());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(7L).memberId());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(8L).memberId());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(9L).memberId());
+ assertEquals("member-2",
sharePartition.cachedState().get(5L).offsetState().get(10L).memberId());
+ assertEquals("member-2",
sharePartition.cachedState().get(5L).offsetState().get(11L).memberId());
+ assertEquals("member-2",
sharePartition.cachedState().get(5L).offsetState().get(12L).memberId());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(13L).memberId());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
+
+ // Release acquired records for member-1
+ CompletableFuture<Void> releaseResult =
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+ assertNull(releaseResult.join());
+ assertFalse(releaseResult.isCompletedExceptionally());
+ assertEquals(7, sharePartition.nextFetchOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(5L).memberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(6L).memberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(7L).memberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(8L).memberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(9L).memberId());
+ assertEquals("member-2",
sharePartition.cachedState().get(5L).offsetState().get(10L).memberId());
+ assertEquals("member-2",
sharePartition.cachedState().get(5L).offsetState().get(11L).memberId());
+ assertEquals("member-2",
sharePartition.cachedState().get(5L).offsetState().get(12L).memberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(13L).memberId());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
+ }
+
/**
* This function produces transactional data of a given no. of records
followed by a transactional marker (COMMIT/ABORT).
*/