This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a663ce3f457 KAFKA-18265: Move acquisition lock classes from share 
partition (1/N) (#20227)
a663ce3f457 is described below

commit a663ce3f4572056bb6c89c7640dbf1dd77b65d55
Author: Apoorv Mittal <apoorvmitta...@gmail.com>
AuthorDate: Wed Jul 23 20:21:42 2025 +0100

    KAFKA-18265: Move acquisition lock classes from share partition (1/N) 
(#20227)
    
    While working on KAFKA-19476, I realized that we need to refactor
    SharePartition for read/write lock handling. I have started some work in
    the area. For the initial PR, I have moved AcquisitionLockTimeout class
    outside of SharePartition.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../java/kafka/server/share/SharePartition.java    | 117 ++++++++-------------
 .../kafka/server/share/SharePartitionTest.java     |   7 +-
 .../share/fetch/AcquisitionLockTimeoutHandler.java |  34 ++++++
 .../share/fetch/AcquisitionLockTimerTask.java      |  66 ++++++++++++
 4 files changed, 150 insertions(+), 74 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 8a8e62b5d8d..353d66e1fd8 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -41,6 +41,8 @@ import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler;
+import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -2391,59 +2393,61 @@ public class SharePartition {
         long lastOffset,
         long delayMs
     ) {
-        return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, 
lastOffset);
+        return new AcquisitionLockTimerTask(time, delayMs, memberId, 
firstOffset, lastOffset, releaseAcquisitionLockOnTimeout(), 
sharePartitionMetrics);
     }
 
-    private void releaseAcquisitionLockOnTimeout(String memberId, long 
firstOffset, long lastOffset) {
-        List<PersisterStateBatch> stateBatches;
-        lock.writeLock().lock();
-        try {
-            Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(firstOffset);
-            if (floorOffset == null) {
-                log.error("Base offset {} not found for share partition: 
{}-{}", firstOffset, groupId, topicIdPartition);
-                return;
-            }
-            stateBatches = new ArrayList<>();
-            NavigableMap<Long, InFlightBatch> subMap = 
cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
-            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
-                InFlightBatch inFlightBatch = entry.getValue();
+    private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
+        return (memberId, firstOffset, lastOffset) -> {
+            List<PersisterStateBatch> stateBatches;
+            lock.writeLock().lock();
+            try {
+                Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(firstOffset);
+                if (floorOffset == null) {
+                    log.error("Base offset {} not found for share partition: 
{}-{}", firstOffset, groupId, topicIdPartition);
+                    return;
+                }
+                stateBatches = new ArrayList<>();
+                NavigableMap<Long, InFlightBatch> subMap = 
cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
+                for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) 
{
+                    InFlightBatch inFlightBatch = entry.getValue();
 
-                if (inFlightBatch.offsetState() == null
+                    if (inFlightBatch.offsetState() == null
                         && inFlightBatch.batchState() == RecordState.ACQUIRED
                         && 
checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), 
inFlightBatch.lastOffset())) {
 
-                    // For the case when batch.firstOffset < start offset <= 
batch.lastOffset, we will be having some
-                    // acquired records that need to move to archived state 
despite their delivery count.
-                    inFlightBatch.maybeInitializeOffsetStateUpdate();
-                }
+                        // For the case when batch.firstOffset < start offset 
<= batch.lastOffset, we will be having some
+                        // acquired records that need to move to archived 
state despite their delivery count.
+                        inFlightBatch.maybeInitializeOffsetStateUpdate();
+                    }
 
-                // Case when the state of complete batch is valid
-                if (inFlightBatch.offsetState() == null) {
-                    
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, 
memberId);
-                } else { // Case when batch has a valid offset state map.
-                    
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, 
memberId, firstOffset, lastOffset);
+                    // Case when the state of complete batch is valid
+                    if (inFlightBatch.offsetState() == null) {
+                        
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, 
memberId);
+                    } else { // Case when batch has a valid offset state map.
+                        
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, 
memberId, firstOffset, lastOffset);
+                    }
                 }
-            }
 
-            if (!stateBatches.isEmpty()) {
-                writeShareGroupState(stateBatches).whenComplete((result, 
exception) -> {
-                    if (exception != null) {
-                        log.debug("Failed to write the share group state on 
acquisition lock timeout for share partition: {}-{} memberId: {}",
-                            groupId, topicIdPartition, memberId, exception);
-                    }
-                    // Even if write share group state RPC call fails, we will 
still go ahead with the state transition.
-                    // Update the cached state and start and end offsets after 
releasing the acquisition lock on timeout.
-                    maybeUpdateCachedStateAndOffsets();
-                });
+                if (!stateBatches.isEmpty()) {
+                    writeShareGroupState(stateBatches).whenComplete((result, 
exception) -> {
+                        if (exception != null) {
+                            log.debug("Failed to write the share group state 
on acquisition lock timeout for share partition: {}-{} memberId: {}",
+                                groupId, topicIdPartition, memberId, 
exception);
+                        }
+                        // Even if write share group state RPC call fails, we 
will still go ahead with the state transition.
+                        // Update the cached state and start and end offsets 
after releasing the acquisition lock on timeout.
+                        maybeUpdateCachedStateAndOffsets();
+                    });
+                }
+            } finally {
+                lock.writeLock().unlock();
             }
-        } finally {
-            lock.writeLock().unlock();
-        }
 
-        // If we have an acquisition lock timeout for a share-partition, then 
we should check if
-        // there is a pending share fetch request for the share-partition and 
complete it.
-        // Skip null check for stateBatches, it should always be initialized 
if reached here.
-        maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty());
+            // If we have an acquisition lock timeout for a share-partition, 
then we should check if
+            // there is a pending share fetch request for the share-partition 
and complete it.
+            // Skip null check for stateBatches, it should always be 
initialized if reached here.
+            maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty());
+        };
     }
 
     private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch 
inFlightBatch,
@@ -2834,35 +2838,6 @@ public class SharePartition {
         }
     }
 
-    // Visible for testing
-    final class AcquisitionLockTimerTask extends TimerTask {
-        private final long expirationMs;
-        private final String memberId;
-        private final long firstOffset;
-        private final long lastOffset;
-
-        AcquisitionLockTimerTask(long delayMs, String memberId, long 
firstOffset, long lastOffset) {
-            super(delayMs);
-            this.expirationMs = time.hiResClockMs() + delayMs;
-            this.memberId = memberId;
-            this.firstOffset = firstOffset;
-            this.lastOffset = lastOffset;
-        }
-
-        long expirationMs() {
-            return expirationMs;
-        }
-
-        /**
-         * The task is executed when the acquisition lock timeout is reached. 
The task releases the acquired records.
-         */
-        @Override
-        public void run() {
-            
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - 
firstOffset + 1);
-            releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset);
-        }
-    }
-
     /**
      * The InFlightBatch maintains the in-memory state of the fetched records 
i.e. in-flight records.
      */
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c52b4e257d9..5059b4c892e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -55,6 +55,7 @@ import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
@@ -5153,7 +5154,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withGroupConfigManager(groupConfigManager).build();
 
-        SharePartition.AcquisitionLockTimerTask timerTask = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
+        AcquisitionLockTimerTask timerTask = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
 
         Mockito.verify(groupConfigManager, 
Mockito.times(2)).groupConfig(GROUP_ID);
         Mockito.verify(groupConfig).shareRecordLockDurationMs();
@@ -5175,13 +5176,13 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withGroupConfigManager(groupConfigManager).build();
 
-        SharePartition.AcquisitionLockTimerTask timerTask1 = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
+        AcquisitionLockTimerTask timerTask1 = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
 
         Mockito.verify(groupConfigManager, 
Mockito.times(2)).groupConfig(GROUP_ID);
         Mockito.verify(groupConfig).shareRecordLockDurationMs();
         assertEquals(expectedDurationMs1, timerTask1.delayMs);
 
-        SharePartition.AcquisitionLockTimerTask timerTask2 = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
+        AcquisitionLockTimerTask timerTask2 = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
 
         Mockito.verify(groupConfigManager, 
Mockito.times(4)).groupConfig(GROUP_ID);
         Mockito.verify(groupConfig, 
Mockito.times(2)).shareRecordLockDurationMs();
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
 
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
new file mode 100644
index 00000000000..c83d7e537da
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+/**
+ * AcquisitionLockTimeoutHandler is an interface that defines a handler for 
acquisition lock timeouts.
+ * It is used to handle cases where the acquisition lock for a share partition 
times out.
+ */
+public interface AcquisitionLockTimeoutHandler {
+
+    /**
+     * Handles the acquisition lock timeout for a share partition.
+     *
+     * @param memberId the id of the member that requested the lock
+     * @param firstOffset the first offset
+     * @param lastOffset the last offset
+     */
+    void handle(String memberId, long firstOffset, long lastOffset);
+
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
 
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
new file mode 100644
index 00000000000..6796d24d374
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.share.fetch;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * AcquisitionLockTimerTask is a timer task that is executed when the 
acquisition lock timeout is reached.
+ * It releases the acquired records.
+ */
+public class AcquisitionLockTimerTask extends TimerTask {
+
+    private final long expirationMs;
+    private final String memberId;
+    private final long firstOffset;
+    private final long lastOffset;
+    private final AcquisitionLockTimeoutHandler timeoutHandler;
+    private final SharePartitionMetrics sharePartitionMetrics;
+
+    public AcquisitionLockTimerTask(
+        Time time,
+        long delayMs,
+        String memberId,
+        long firstOffset,
+        long lastOffset,
+        AcquisitionLockTimeoutHandler timeoutHandler,
+        SharePartitionMetrics sharePartitionMetrics
+    ) {
+        super(delayMs);
+        this.expirationMs = time.hiResClockMs() + delayMs;
+        this.memberId = memberId;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+        this.timeoutHandler = timeoutHandler;
+        this.sharePartitionMetrics = sharePartitionMetrics;
+    }
+
+    public long expirationMs() {
+        return expirationMs;
+    }
+
+    /**
+     * The task is executed when the acquisition lock timeout is reached. The 
task releases the acquired records.
+     */
+    @Override
+    public void run() {
+        sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - 
firstOffset + 1);
+        timeoutHandler.handle(memberId, firstOffset, lastOffset);
+    }
+}

Reply via email to