This is an automated email from the ASF dual-hosted git repository. mittal pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a663ce3f457 KAFKA-18265: Move acquisition lock classes from share partition (1/N) (#20227) a663ce3f457 is described below commit a663ce3f4572056bb6c89c7640dbf1dd77b65d55 Author: Apoorv Mittal <apoorvmitta...@gmail.com> AuthorDate: Wed Jul 23 20:21:42 2025 +0100 KAFKA-18265: Move acquisition lock classes from share partition (1/N) (#20227) While working on KAFKA-19476, I realized that we need to refactor SharePartition for read/write lock handling. I have started some work in the area. For this initial PR, I have moved the AcquisitionLockTimerTask class out of SharePartition and introduced an AcquisitionLockTimeoutHandler interface for the timeout callback. Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../java/kafka/server/share/SharePartition.java | 117 ++++++++------------- .../kafka/server/share/SharePartitionTest.java | 7 +- .../share/fetch/AcquisitionLockTimeoutHandler.java | 34 ++++++ .../share/fetch/AcquisitionLockTimerTask.java | 66 ++++++++++++ 4 files changed, 150 insertions(+), 74 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index 8a8e62b5d8d..353d66e1fd8 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -41,6 +41,8 @@ import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; +import org.apache.kafka.server.share.fetch.AcquisitionLockTimeoutHandler; +import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.DelayedShareFetchKey; import 
org.apache.kafka.server.share.fetch.ShareAcquiredRecords; @@ -2391,59 +2393,61 @@ public class 
SharePartition { long lastOffset, long delayMs ) { - return new AcquisitionLockTimerTask(delayMs, memberId, firstOffset, lastOffset); + return new AcquisitionLockTimerTask(time, delayMs, memberId, firstOffset, lastOffset, releaseAcquisitionLockOnTimeout(), sharePartitionMetrics); } - private void releaseAcquisitionLockOnTimeout(String memberId, long firstOffset, long lastOffset) { - List<PersisterStateBatch> stateBatches; - lock.writeLock().lock(); - try { - Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(firstOffset); - if (floorOffset == null) { - log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition); - return; - } - stateBatches = new ArrayList<>(); - NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, lastOffset, true); - for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { - InFlightBatch inFlightBatch = entry.getValue(); + private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() { + return (memberId, firstOffset, lastOffset) -> { + List<PersisterStateBatch> stateBatches; + lock.writeLock().lock(); + try { + Map.Entry<Long, InFlightBatch> floorOffset = cachedState.floorEntry(firstOffset); + if (floorOffset == null) { + log.error("Base offset {} not found for share partition: {}-{}", firstOffset, groupId, topicIdPartition); + return; + } + stateBatches = new ArrayList<>(); + NavigableMap<Long, InFlightBatch> subMap = cachedState.subMap(floorOffset.getKey(), true, lastOffset, true); + for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { + InFlightBatch inFlightBatch = entry.getValue(); - if (inFlightBatch.offsetState() == null + if (inFlightBatch.offsetState() == null && inFlightBatch.batchState() == RecordState.ACQUIRED && checkForStartOffsetWithinBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset())) { - // For the case when batch.firstOffset < start offset <= batch.lastOffset, we will be having some - // 
acquired records that need to move to archived state despite their delivery count. - inFlightBatch.maybeInitializeOffsetStateUpdate(); - } + // For the case when batch.firstOffset < start offset <= batch.lastOffset, we will be having some + // acquired records that need to move to archived state despite their delivery count. + inFlightBatch.maybeInitializeOffsetStateUpdate(); + } - // Case when the state of complete batch is valid - if (inFlightBatch.offsetState() == null) { - releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId); - } else { // Case when batch has a valid offset state map. - releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset); + // Case when the state of complete batch is valid + if (inFlightBatch.offsetState() == null) { + releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, memberId); + } else { // Case when batch has a valid offset state map. + releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, memberId, firstOffset, lastOffset); + } } - } - if (!stateBatches.isEmpty()) { - writeShareGroupState(stateBatches).whenComplete((result, exception) -> { - if (exception != null) { - log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", - groupId, topicIdPartition, memberId, exception); - } - // Even if write share group state RPC call fails, we will still go ahead with the state transition. - // Update the cached state and start and end offsets after releasing the acquisition lock on timeout. 
- maybeUpdateCachedStateAndOffsets(); - }); + if (!stateBatches.isEmpty()) { + writeShareGroupState(stateBatches).whenComplete((result, exception) -> { + if (exception != null) { + log.debug("Failed to write the share group state on acquisition lock timeout for share partition: {}-{} memberId: {}", + groupId, topicIdPartition, memberId, exception); + } + // Even if write share group state RPC call fails, we will still go ahead with the state transition. + // Update the cached state and start and end offsets after releasing the acquisition lock on timeout. + maybeUpdateCachedStateAndOffsets(); + }); + } + } finally { + lock.writeLock().unlock(); } - } finally { - lock.writeLock().unlock(); - } - // If we have an acquisition lock timeout for a share-partition, then we should check if - // there is a pending share fetch request for the share-partition and complete it. - // Skip null check for stateBatches, it should always be initialized if reached here. - maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty()); + // If we have an acquisition lock timeout for a share-partition, then we should check if + // there is a pending share fetch request for the share-partition and complete it. + // Skip null check for stateBatches, it should always be initialized if reached here. 
+ maybeCompleteDelayedShareFetchRequest(!stateBatches.isEmpty()); + }; } private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFlightBatch, @@ -2834,35 +2838,6 @@ public class SharePartition { } } - // Visible for testing - final class AcquisitionLockTimerTask extends TimerTask { - private final long expirationMs; - private final String memberId; - private final long firstOffset; - private final long lastOffset; - - AcquisitionLockTimerTask(long delayMs, String memberId, long firstOffset, long lastOffset) { - super(delayMs); - this.expirationMs = time.hiResClockMs() + delayMs; - this.memberId = memberId; - this.firstOffset = firstOffset; - this.lastOffset = lastOffset; - } - - long expirationMs() { - return expirationMs; - } - - /** - * The task is executed when the acquisition lock timeout is reached. The task releases the acquired records. - */ - @Override - public void run() { - sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1); - releaseAcquisitionLockOnTimeout(memberId, firstOffset, lastOffset); - } - } - /** * The InFlightBatch maintains the in-memory state of the fetched records i.e. in-flight records. 
*/ diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index c52b4e257d9..5059b4c892e 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -55,6 +55,7 @@ import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupConfigManager; import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy; import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch; +import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask; import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey; import org.apache.kafka.server.share.fetch.ShareAcquiredRecords; import org.apache.kafka.server.share.metrics.SharePartitionMetrics; @@ -5153,7 +5154,7 @@ public class SharePartitionTest { SharePartition sharePartition = SharePartitionBuilder.builder() .withGroupConfigManager(groupConfigManager).build(); - SharePartition.AcquisitionLockTimerTask timerTask = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L); + AcquisitionLockTimerTask timerTask = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L); Mockito.verify(groupConfigManager, Mockito.times(2)).groupConfig(GROUP_ID); Mockito.verify(groupConfig).shareRecordLockDurationMs(); @@ -5175,13 +5176,13 @@ public class SharePartitionTest { SharePartition sharePartition = SharePartitionBuilder.builder() .withGroupConfigManager(groupConfigManager).build(); - SharePartition.AcquisitionLockTimerTask timerTask1 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L); + AcquisitionLockTimerTask timerTask1 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L); Mockito.verify(groupConfigManager, Mockito.times(2)).groupConfig(GROUP_ID); Mockito.verify(groupConfig).shareRecordLockDurationMs(); assertEquals(expectedDurationMs1, 
timerTask1.delayMs); - SharePartition.AcquisitionLockTimerTask timerTask2 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L); + AcquisitionLockTimerTask timerTask2 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L); Mockito.verify(groupConfigManager, Mockito.times(4)).groupConfig(GROUP_ID); Mockito.verify(groupConfig, Mockito.times(2)).shareRecordLockDurationMs(); diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java new file mode 100644 index 00000000000..c83d7e537da --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share.fetch; + +/** + * AcquisitionLockTimeoutHandler is an interface that defines a handler for acquisition lock timeouts. + * It is used to handle cases where the acquisition lock for a share partition times out. + */ +public interface AcquisitionLockTimeoutHandler { + + /** + * Handles the acquisition lock timeout for a share partition. 
+ * + * @param memberId the id of the member that requested the lock + * @param firstOffset the first offset + * @param lastOffset the last offset + */ + void handle(String memberId, long firstOffset, long lastOffset); + +} diff --git a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java new file mode 100644 index 00000000000..6796d24d374 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.share.fetch; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.share.metrics.SharePartitionMetrics; +import org.apache.kafka.server.util.timer.TimerTask; + +/** + * AcquisitionLockTimerTask is a timer task that is executed when the acquisition lock timeout is reached. + * It releases the acquired records. 
+ */ +public class AcquisitionLockTimerTask extends TimerTask { + + private final long expirationMs; + private final String memberId; + private final long firstOffset; + private final long lastOffset; + private final AcquisitionLockTimeoutHandler timeoutHandler; + private final SharePartitionMetrics sharePartitionMetrics; + + public AcquisitionLockTimerTask( + Time time, + long delayMs, + String memberId, + long firstOffset, + long lastOffset, + AcquisitionLockTimeoutHandler timeoutHandler, + SharePartitionMetrics sharePartitionMetrics + ) { + super(delayMs); + this.expirationMs = time.hiResClockMs() + delayMs; + this.memberId = memberId; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + this.timeoutHandler = timeoutHandler; + this.sharePartitionMetrics = sharePartitionMetrics; + } + + public long expirationMs() { + return expirationMs; + } + + /** + * The task is executed when the acquisition lock timeout is reached. The task releases the acquired records. + */ + @Override + public void run() { + sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - firstOffset + 1); + timeoutHandler.handle(memberId, firstOffset, lastOffset); + } +}