This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dfa28c463af KAFKA-19953: Bug releasing acquired records when 
maintained per offset (#21050)
dfa28c463af is described below

commit dfa28c463afbaae3bd5ec4336d0f205879ae24b3
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue Dec 2 21:12:46 2025 +0530

    KAFKA-19953: Bug releasing acquired records when maintained per offset 
(#21050)
    
    ### About
    When we have `releaseAcquiredRecords` call on SharePartition and the
    batch is being maintained per offset, if we have non matching member id
    while releasing acquired records, we should skip only that offset and
    continue with the next offset in the batch, not skip all the offsets
    after seeing the problematic offset
    
    ### Testing
    Added unit tests to verify the new functionality
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  2 +-
 .../kafka/server/share/SharePartitionTest.java     | 84 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index f5ad66d4761..23a85611719 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1089,7 +1089,7 @@ public class SharePartition {
             if (!offsetState.getValue().memberId().equals(memberId) && 
!offsetState.getValue().memberId().equals(EMPTY_MEMBER_ID)) {
                 log.debug("Member {} is not the owner of offset: {} in batch: 
{} for the share"
                         + " partition: {}-{}. Skipping offset.", memberId, 
offsetState.getKey(), inFlightBatch, groupId, topicIdPartition);
-                return Optional.empty();
+                continue;
             }
             if (offsetState.getValue().state() == RecordState.ACQUIRED) {
                 // These records were fetched but they were not actually 
delivered to the client.
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index d757e1c2eb7..b8e2d1f81ad 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -11832,6 +11832,90 @@ public class SharePartitionTest {
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
     }
 
+    @Test
+    public void testReleaseAcquiredRecordsPerOffsetWithDifferentMemberId() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+        // Acquire offsets 5-14 with member-1.
+        fetchAcquiredRecords(sharePartition.acquire(MEMBER_ID, 
ShareAcquireMode.BATCH_OPTIMIZED, BATCH_SIZE,
+            MAX_FETCH_RECORDS, 21, fetchPartitionData(memoryRecords(5, 10)), 
FETCH_ISOLATION_HWM), 10);
+
+        // Acknowledge 5-6 offsets with ACCEPT and 10-12 with RELEASE
+        sharePartition.acknowledge(MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 6, 
List.of(AcknowledgeType.ACCEPT.id))));
+        sharePartition.acknowledge(MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(10, 12, 
List.of(AcknowledgeType.RELEASE.id))));
+
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+        // Acquire offsets 10-12 with member-2.
+        fetchAcquiredRecords(sharePartition.acquire("member-2", 
ShareAcquireMode.BATCH_OPTIMIZED, BATCH_SIZE,
+            MAX_FETCH_RECORDS, 21, fetchPartitionData(memoryRecords(10, 3)), 
FETCH_ISOLATION_HWM), 3);
+
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(5L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(6L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(7L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(8L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(9L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(10L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(11L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(12L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(13L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
+
+        // Release acquired records for member-1
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
+        assertFalse(releaseResult.isCompletedExceptionally());
+        assertEquals(7, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(5L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(6L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(7L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(8L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(9L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(10L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(11L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(12L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(13L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
+    }
+
     /**
      * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
      */

Reply via email to