This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 5112b77a656 KAFKA-19953: Bug releasing acquired records when 
maintained per offset (#21050)
5112b77a656 is described below

commit 5112b77a656c26197eedcf8ea836d4f0a33db1e7
Author: Abhinav Dixit <[email protected]>
AuthorDate: Tue Dec 2 21:12:46 2025 +0530

    KAFKA-19953: Bug releasing acquired records when maintained per offset 
(#21050)
    
    ### About
    When we have `releaseAcquiredRecords` call on SharePartition and the
    batch is being maintained per offset, if we have non matching member id
    while releasing acquired records, we should skip only that offset and
    continue with the next offset in the batch, not skip all the offsets
    after seeing the problematic offset
    
    ### Testing
    Added unit tests to verify the new functionality
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  2 +-
 .../kafka/server/share/SharePartitionTest.java     | 84 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index f5ad66d4761..23a85611719 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1089,7 +1089,7 @@ public class SharePartition {
             if (!offsetState.getValue().memberId().equals(memberId) && 
!offsetState.getValue().memberId().equals(EMPTY_MEMBER_ID)) {
                 log.debug("Member {} is not the owner of offset: {} in batch: 
{} for the share"
                         + " partition: {}-{}. Skipping offset.", memberId, 
offsetState.getKey(), inFlightBatch, groupId, topicIdPartition);
-                return Optional.empty();
+                continue;
             }
             if (offsetState.getValue().state() == RecordState.ACQUIRED) {
                 // These records were fetched but they were not actually 
delivered to the client.
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index d757e1c2eb7..b8e2d1f81ad 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -11832,6 +11832,90 @@ public class SharePartitionTest {
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
     }
 
+    @Test
+    public void testReleaseAcquiredRecordsPerOffsetWithDifferentMemberId() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+        // Acquire offsets 5-14 with member-1.
+        fetchAcquiredRecords(sharePartition.acquire(MEMBER_ID, 
ShareAcquireMode.BATCH_OPTIMIZED, BATCH_SIZE,
+            MAX_FETCH_RECORDS, 21, fetchPartitionData(memoryRecords(5, 10)), 
FETCH_ISOLATION_HWM), 10);
+
+        // Acknowledge 5-6 offsets with ACCEPT and 10-12 with RELEASE
+        sharePartition.acknowledge(MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(5, 6, 
List.of(AcknowledgeType.ACCEPT.id))));
+        sharePartition.acknowledge(MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(10, 12, 
List.of(AcknowledgeType.RELEASE.id))));
+
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+        // Acquire offsets 10-12 with member-2.
+        fetchAcquiredRecords(sharePartition.acquire("member-2", 
ShareAcquireMode.BATCH_OPTIMIZED, BATCH_SIZE,
+            MAX_FETCH_RECORDS, 21, fetchPartitionData(memoryRecords(10, 3)), 
FETCH_ISOLATION_HWM), 3);
+
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(5L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(6L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(7L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(8L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(9L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(10L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(11L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(12L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(13L).memberId());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
+
+        // Release acquired records for member-1
+        CompletableFuture<Void> releaseResult = 
sharePartition.releaseAcquiredRecords(MEMBER_ID);
+        assertNull(releaseResult.join());
+        assertFalse(releaseResult.isCompletedExceptionally());
+        assertEquals(7, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(5L).offsetState());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(5L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).offsetState().get(6L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(7L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(8L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(9L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(10L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(11L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).offsetState().get(12L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(13L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).offsetState().get(14L).state());
+
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(5L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(6L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(7L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(8L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(9L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(10L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(11L).memberId());
+        assertEquals("member-2", 
sharePartition.cachedState().get(5L).offsetState().get(12L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(13L).memberId());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(5L).offsetState().get(14L).memberId());
+    }
+
     /**
      * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
      */

Reply via email to