This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdc9eb2c3a7 KAFKA-20245: DLQ records exceeding max delivery count. 
[3/N] (#22080)
cdc9eb2c3a7 is described below

commit cdc9eb2c3a723df0e0730c1a29efab39ab63e03f
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed May 6 15:46:02 2026 +0530

    KAFKA-20245: DLQ records exceeding max delivery count. [3/N] (#22080)
    
    * Changes to support DLQ of offsets and batches which have exceeded max
    delivery count.
    * Phase 2 of the DLQ transition (ARCHIVING -> ARCHIVED, both in memory
    and in the persister) has been refactored into a separate method,
    `initiateDLQAndArchive`.
    * The current code does not transactionally handle state transitions
    when the max delivery count is exceeded. ARCHIVING follows a similar
    approach.
    * The acquisition lock timeout handler now invokes the DLQ logic in
    cases where `InFlightState.tryUpdateState` determines that the record
    should be in the ARCHIVING state.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 238 +++++----
 .../kafka/server/share/SharePartitionTest.java     | 560 ++++++++++++++++++++-
 .../server/share/dlq/NoOpShareGroupDLQManager.java |   2 +-
 .../kafka/server/share/fetch/InFlightBatch.java    |  10 +-
 .../kafka/server/share/fetch/InFlightState.java    |   8 +-
 5 files changed, 713 insertions(+), 105 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 7e86a4477a6..2fbca75a72a 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -938,7 +938,7 @@ public class SharePartition {
                     continue;
                 }
 
-                InFlightState updateResult = 
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId);
+                InFlightState updateResult = 
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE, maxDeliveryCount(), memberId, 
shareGroupDlqEnableSupplier.get());
                 if (updateResult == null || updateResult.state() != 
RecordState.ACQUIRED) {
                     log.info("Unable to acquire records for the batch: {} in 
share partition: {}-{}",
                         inFlightBatch, groupId, topicIdPartition);
@@ -1129,7 +1129,8 @@ public class SharePartition {
                         offsetState.getKey() < startOffset ? 
RecordState.ARCHIVED : recordState,
                         DeliveryCountOps.NO_OP,
                         this.maxDeliveryCount(),
-                        EMPTY_MEMBER_ID
+                        EMPTY_MEMBER_ID,
+                        shareGroupDlqEnableSupplier.get()
                 );
                 if (updateResult == null) {
                     log.debug("Unable to release records from acquired state 
for the offset: {} in batch: {}"
@@ -1139,8 +1140,9 @@ public class SharePartition {
                 }
 
                 // Successfully updated the state of the offset and created a 
persister state batch for write to persister.
+                Throwable dlqCause = updateResult.state() == 
RecordState.ARCHIVING ? ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED : null;
                 persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(offsetState.getKey(),
-                    offsetState.getKey(), updateResult.state().id(), (short) 
updateResult.deliveryCount()), null));
+                    offsetState.getKey(), updateResult.state().id(), (short) 
updateResult.deliveryCount()), dlqCause));
                 if (offsetState.getKey() >= startOffset && 
isStateTerminal(updateResult.state())) {
                     deliveryCompleteCount.incrementAndGet();
                 }
@@ -1171,7 +1173,8 @@ public class SharePartition {
                     inFlightBatch.lastOffset() < startOffset ? 
RecordState.ARCHIVED : recordState,
                     DeliveryCountOps.NO_OP,
                     this.maxDeliveryCount(),
-                    EMPTY_MEMBER_ID
+                    EMPTY_MEMBER_ID,
+                    shareGroupDlqEnableSupplier.get()
             );
             if (updateResult == null) {
                 log.debug("Unable to release records from acquired state for 
the batch: {}"
@@ -1180,8 +1183,10 @@ public class SharePartition {
             }
 
             // Successfully updated the state of the batch and created a 
persister state batch for write to persister.
+            // If DLQ support is enabled, then update the DLQ cause exception 
message.
+            Throwable dlqCause = updateResult.state() == RecordState.ARCHIVING 
? ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED : null;
             persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(inFlightBatch.firstOffset(),
-                inFlightBatch.lastOffset(), updateResult.state().id(), (short) 
updateResult.deliveryCount()), null));
+                inFlightBatch.lastOffset(), updateResult.state().id(), (short) 
updateResult.deliveryCount()), dlqCause));
             if (isStateTerminal(updateResult.state())) {
                 
deliveryCompleteCount.addAndGet(numInFlightRecordsInBatch(inFlightBatch.firstOffset(),
 inFlightBatch.lastOffset()));
             }
@@ -1991,7 +1996,7 @@ public class SharePartition {
                 }
 
                 InFlightState updateResult = 
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE,
-                    maxDeliveryCount, memberId);
+                    maxDeliveryCount, memberId, 
shareGroupDlqEnableSupplier.get());
                 if (updateResult == null || updateResult.state() != 
RecordState.ACQUIRED) {
                     log.trace("Unable to acquire records for the offset: {} in 
batch: {}"
                             + " for the share partition: {}-{}", 
offsetState.getKey(), inFlightBatch,
@@ -2344,7 +2349,8 @@ public class SharePartition {
                         recordState,
                         DeliveryCountOps.NO_OP,
                         this.maxDeliveryCount(),
-                        EMPTY_MEMBER_ID
+                        EMPTY_MEMBER_ID,
+                        shareGroupDlqEnableSupplier.get()
                     );
 
                     if (updateResult == null) {
@@ -2355,6 +2361,12 @@ public class SharePartition {
                             "Unable to acknowledge records for the batch"));
                     }
 
+                    // This check makes sure that we don't skip the cause if 
updated result
+                    // results in ARCHIVING due to max delivery count exceeded.
+                    if (dlqCause == null && updateResult.state() == 
RecordState.ARCHIVING) {
+                        dlqCause = ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED;
+                    }
+
                     // Successfully updated the state of the offset and 
created a persister state batch for write to persister.
                     persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(offsetState.getKey(),
                         offsetState.getKey(), updateResult.state().id(), 
(short) updateResult.deliveryCount()), dlqCause));
@@ -2425,7 +2437,8 @@ public class SharePartition {
                 recordState,
                 DeliveryCountOps.NO_OP,
                 this.maxDeliveryCount(),
-                EMPTY_MEMBER_ID
+                EMPTY_MEMBER_ID,
+                shareGroupDlqEnableSupplier.get()
             );
             if (updateResult == null) {
                 log.debug("Unable to acknowledge records for the batch: {} 
with state: {}"
@@ -2435,6 +2448,12 @@ public class SharePartition {
                     new InvalidRecordStateException("Unable to acknowledge 
records for the batch"));
             }
 
+            // This check makes sure that we don't skip the cause if updated 
result
+            // results in ARCHIVING due to max delivery count exceeded.
+            if (dlqCause == null && updateResult.state() == 
RecordState.ARCHIVING) {
+                dlqCause = ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED;
+            }
+
             // Successfully updated the state of the batch and created a 
persister state batch for write to persister.
             persisterBatches.add(new PersisterBatch(updateResult, new 
PersisterStateBatch(inFlightBatch.firstOffset(),
                 inFlightBatch.lastOffset(), updateResult.state().id(), (short) 
updateResult.deliveryCount()), dlqCause));
@@ -2521,6 +2540,7 @@ public class SharePartition {
                 // on the startOffset to move ahead, hence track if the state 
is updated in the cache. If
                 // yes, then notify the delayed share fetch purgatory to 
complete the pending requests.
                 boolean cacheStateUpdated = false;
+                List<PersisterBatch> dlqBatches = new 
ArrayList<>(persisterBatches.size());
                 lock.writeLock().lock();
                 try {
                     if (exception != null) {
@@ -2547,95 +2567,16 @@ public class SharePartition {
                     log.trace("State change request successful for share 
partition: {}-{}",
                         groupId, topicIdPartition);
 
-                    List<PersisterBatch> nonDlqBatches = new 
ArrayList<>(persisterBatches.size());
-                    List<PersisterBatch> dlqBatches = new 
ArrayList<>(persisterBatches.size());
                     for (PersisterBatch persisterBatch : persisterBatches) {
+                        
persisterBatch.updatedState().completeStateTransition(true);
+                        if (persisterBatch.updatedState.state() == 
RecordState.AVAILABLE) {
+                            updateFindNextFetchOffset(true);
+                        }
                         if (persisterBatch.updatedState.state() == 
RecordState.ARCHIVING) {
                             dlqBatches.add(persisterBatch);
-                        } else {
-                            nonDlqBatches.add(persisterBatch);
                         }
                     }
 
-                    nonDlqBatches.forEach(persisterBatch -> {
-                        
persisterBatch.updatedState.completeStateTransition(true);
-                        if (persisterBatch.updatedState.state() == 
RecordState.AVAILABLE) {
-                            updateFindNextFetchOffset(true);
-                        }
-                    });
-
-                    dlqBatches.forEach(persisterBatch -> {
-                        
persisterBatch.updatedState.completeStateTransition(true);
-                        shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
-                            groupId,
-                            topicIdPartition,
-                            persisterBatch.stateBatch.firstOffset(),
-                            persisterBatch.stateBatch.lastOffset(),
-                            
Optional.of(persisterBatch.stateBatch.deliveryCount()),
-                            Optional.ofNullable(persisterBatch.dlqCause),
-                            false
-                        )).whenComplete((v1, dlqException) -> {
-                            PersisterStateBatch sb = persisterBatch.stateBatch;
-                            if (dlqException != null) {
-                                log.error("Failed to write to DLQ for share 
partition: {}-{}, offsets {}-{}. "
-                                        + "Proceeding to ARCHIVED state 
regardless.",
-                                    groupId, topicIdPartition, 
sb.firstOffset(), sb.lastOffset(), dlqException);
-                            }
-
-                            PersisterBatch phase2Batch;
-                            lock.writeLock().lock();
-                            try {
-                                // dlqBatch.updatedState() is the same 
InFlightState object in the cache,
-                                // now committed in ARCHIVING. Transition it 
directly.
-                                InFlightState updateResult = 
persisterBatch.updatedState().startStateTransition(
-                                    RecordState.ARCHIVED,
-                                    DeliveryCountOps.NO_OP,
-                                    maxDeliveryCount(),
-                                    EMPTY_MEMBER_ID
-                                );
-                                if (updateResult == null) {
-                                    log.error("Unable to transition ARCHIVING 
→ ARCHIVED for offsets {}-{} "
-                                            + "in share partition: {}-{}", 
sb.firstOffset(), sb.lastOffset(),
-                                        groupId, topicIdPartition);
-                                    return;
-                                }
-                                phase2Batch = new PersisterBatch(updateResult, 
new PersisterStateBatch(
-                                    sb.firstOffset(), sb.lastOffset(),
-                                    updateResult.state().id(), (short) 
updateResult.deliveryCount()), null);
-                                deliveryCompleteCount.addAndGet(
-                                    
numInFlightRecordsInBatch(sb.firstOffset(), sb.lastOffset()));
-                            } finally {
-                                lock.writeLock().unlock();
-                            }
-
-                            // Second persist: ARCHIVING → ARCHIVED
-                            
writeShareGroupState(List.of(phase2Batch.stateBatch()))
-                                .whenComplete((v2, phase2Exception) -> {
-                                    boolean phase2CacheUpdated = false;
-                                    lock.writeLock().lock();
-                                    try {
-                                        if (phase2Exception != null) {
-                                            log.error("Failed to persist 
ARCHIVED state for DLQ phase 2, "
-                                                    + "share partition: {}-{}. 
Records remain in ARCHIVING.",
-                                                groupId, topicIdPartition, 
phase2Exception);
-                                            
phase2Batch.updatedState().completeStateTransition(false);
-                                            if 
(isStateTerminal(RecordState.forId(phase2Batch.stateBatch().deliveryState()))
-                                                && 
!isStateTerminal(phase2Batch.updatedState().state())) {
-                                                
deliveryCompleteCount.addAndGet(
-                                                    
-numInFlightRecordsInBatch(sb.firstOffset(), sb.lastOffset()));
-                                            }
-                                            return;
-                                        }
-
-                                        
phase2Batch.updatedState().completeStateTransition(true);
-                                        phase2CacheUpdated = 
maybeUpdateCachedStateAndOffsets();
-                                    } finally {
-                                        lock.writeLock().unlock();
-                                        
maybeCompleteDelayedShareFetchRequest(phase2CacheUpdated);
-                                    }
-                                });
-                        });
-                    });
                     // Update the cached state and start and end offsets after 
acknowledging/releasing the acquired records.
                     cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
                 } finally {
@@ -2650,6 +2591,17 @@ public class SharePartition {
                     // request can be completed. The call should be made 
outside the lock to avoid deadlock.
                     maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
                 }
+
+                // Persister batch state has been moved to ARCHIVING, we must 
now start the DLQ flow and transition to ARCHIVED.
+                dlqBatches.forEach(persisterBatch -> {
+                    initiateDLQAndArchive(
+                        persisterBatch.updatedState,
+                        persisterBatch.stateBatch.firstOffset(),
+                        persisterBatch.stateBatch.lastOffset(),
+                        persisterBatch.stateBatch.deliveryCount(),
+                        persisterBatch.dlqCause
+                    );
+                });
             });
     }
 
@@ -2982,6 +2934,8 @@ public class SharePartition {
     private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
         return (memberId, firstOffset, lastOffset, timerTask) -> {
             List<PersisterStateBatch> stateBatches;
+            List<DlqBatch> dlqBatches;
+
             lock.writeLock().lock();
             try {
                 // Check if timer task is already cancelled. This can happen 
when concurrent requests
@@ -2998,6 +2952,7 @@ public class SharePartition {
                     return;
                 }
                 stateBatches = new ArrayList<>();
+                dlqBatches = new ArrayList<>();
                 NavigableMap<Long, InFlightBatch> subMap = 
cachedState.subMap(floorOffset.getKey(), true, lastOffset, true);
                 for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) 
{
                     InFlightBatch inFlightBatch = entry.getValue();
@@ -3013,9 +2968,9 @@ public class SharePartition {
 
                     // Case when the state of complete batch is valid
                     if (inFlightBatch.offsetState() == null) {
-                        
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, 
memberId);
+                        
releaseAcquisitionLockOnTimeoutForCompleteBatch(inFlightBatch, stateBatches, 
dlqBatches, memberId);
                     } else { // Case when batch has a valid offset state map.
-                        
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, 
memberId, firstOffset, lastOffset);
+                        
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(inFlightBatch, stateBatches, 
dlqBatches, memberId, firstOffset, lastOffset);
                     }
                 }
             } finally {
@@ -3031,6 +2986,15 @@ public class SharePartition {
                     // Even if write share group state RPC call fails, we will 
still go ahead with the state transition.
                     // Update the cached state and start and end offsets after 
releasing the acquisition lock on timeout.
                     maybeUpdateCachedStateAndOffsets();
+
+                    // Persister batch state has been moved to ARCHIVING, we 
must now start the DLQ flow and transition to ARCHIVED.
+                    dlqBatches.forEach(dlqBatch -> initiateDLQAndArchive(
+                        dlqBatch.updatedState(),
+                        dlqBatch.firstOffset(),
+                        dlqBatch.lastOffset(),
+                        dlqBatch.deliveryCount(),
+                        ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED
+                    ));
                 });
             }
 
@@ -3043,13 +3007,15 @@ public class SharePartition {
 
     private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch 
inFlightBatch,
                                                                  
List<PersisterStateBatch> stateBatches,
+                                                                 
List<DlqBatch> dlqBatches,
                                                                  String 
memberId) {
         if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
             InFlightState updateResult = inFlightBatch.tryUpdateBatchState(
                     inFlightBatch.lastOffset() < startOffset ? 
RecordState.ARCHIVED : RecordState.AVAILABLE,
                     DeliveryCountOps.NO_OP,
                     maxDeliveryCount(),
-                    EMPTY_MEMBER_ID);
+                    EMPTY_MEMBER_ID,
+                    shareGroupDlqEnableSupplier.get());
             if (updateResult == null) {
                 log.error("Unable to release acquisition lock on timeout for 
the batch: {}"
                         + " for the share partition: {}-{} memberId: {}", 
inFlightBatch, groupId, topicIdPartition, memberId);
@@ -3060,6 +3026,16 @@ public class SharePartition {
 
             // Cancel the acquisition lock timeout task for the batch since it 
is completed now.
             updateResult.cancelAndClearAcquisitionLockTimeoutTask();
+
+            if (updateResult.state() == RecordState.ARCHIVING) {
+                // Don't increment deliveryCompleteCount here — deferred to 
phase 2
+                // Don't updateFindNextFetchOffset — ARCHIVING is not fetchable
+                dlqBatches.add(new DlqBatch(updateResult,
+                    inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
+                    (short) updateResult.deliveryCount()));
+                return;
+            }
+
             if (updateResult.state() != RecordState.ARCHIVED) {
                 updateFindNextFetchOffset(true);
             }
@@ -3079,6 +3055,7 @@ public class SharePartition {
 
     private void 
releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFlightBatch,
                                                                   
List<PersisterStateBatch> stateBatches,
+                                                                  
List<DlqBatch> dlqBatches,
                                                                   String 
memberId,
                                                                   long 
firstOffset,
                                                                   long 
lastOffset) {
@@ -3103,7 +3080,8 @@ public class SharePartition {
                     offsetState.getKey() < startOffset ? RecordState.ARCHIVED 
: RecordState.AVAILABLE,
                     DeliveryCountOps.NO_OP,
                     maxDeliveryCount(),
-                    EMPTY_MEMBER_ID);
+                    EMPTY_MEMBER_ID,
+                    shareGroupDlqEnableSupplier.get());
             if (updateResult == null) {
                 log.error("Unable to release acquisition lock on timeout for 
the offset: {} in batch: {}"
                                 + " for the share partition: {}-{} memberId: 
{}", offsetState.getKey(), inFlightBatch,
@@ -3115,6 +3093,15 @@ public class SharePartition {
 
             // Cancel the acquisition lock timeout task for the offset since 
it is completed now.
             updateResult.cancelAndClearAcquisitionLockTimeoutTask();
+
+            if (updateResult.state() == RecordState.ARCHIVING) {
+                // Don't increment deliveryCompleteCount here — deferred to 
phase 2
+                // Don't updateFindNextFetchOffset — ARCHIVING is not fetchable
+                dlqBatches.add(new DlqBatch(updateResult, offsetState.getKey(),
+                    offsetState.getKey(), (short) 
updateResult.deliveryCount()));
+                continue;
+            }
+
             if (updateResult.state() != RecordState.ARCHIVED) {
                 updateFindNextFetchOffset(true);
             }
@@ -3339,6 +3326,57 @@ public class SharePartition {
         return ACK_TYPE_TO_RECORD_STATE.get(ackType);
     }
 
+    // Visible for testing.
+
+    /**
+     * The DLQ flow comprises 2 phases:
+     * Phase 1: State transitions to ARCHIVING (happens in the normal 
acknowledge/release/timeout path) and persists ARCHIVING to the persister
+     * Phase 2: Enqueues to DLQ, then transitions ARCHIVING → ARCHIVED and 
persists ARCHIVED to the persister
+     * This method handles the complete phase 2 flow.
+     */
+    void initiateDLQAndArchive(InFlightState updatedState, long firstOffset,
+                               long lastOffset, short deliveryCount, Throwable 
dlqCause) {
+        // Step 1: Enqueue to DLQ
+        shareGroupDLQ.enqueue(new ShareGroupDLQRecordParameter(
+            groupId, topicIdPartition, firstOffset, lastOffset,
+            Optional.of(deliveryCount), Optional.ofNullable(dlqCause), false
+        )).whenComplete((v1, dlqException) -> {
+            if (dlqException != null) {
+                log.error("Failed to write to DLQ, proceeding to ARCHIVED 
regardless.", dlqException);
+            }
+
+            // Step 2: Transition ARCHIVING → ARCHIVED
+            PersisterStateBatch stateBatch;
+            lock.writeLock().lock();
+            try {
+                // At this point ARCHIVED is imminent. If we rollback here or 
tryUpdateState fails,
+                // we risk stalling. So just move to ARCHIVED.
+                updatedState.archive();
+                stateBatch = new PersisterStateBatch(firstOffset, lastOffset, 
RecordState.ARCHIVED.id, deliveryCount);
+                
deliveryCompleteCount.addAndGet(numInFlightRecordsInBatch(firstOffset, 
lastOffset));
+            } finally {
+                lock.writeLock().unlock();
+            }
+
+            // Step 3: Persist ARCHIVED. On failure, ARCHIVED stays in memory 
— the
+            // persister catches up when start offset advances past these 
offsets.
+            writeShareGroupState(List.of(stateBatch))
+                .whenComplete((v2, phase2Exception) -> {
+                    boolean cacheUpdated = false;
+                    lock.writeLock().lock();
+                    try {
+                        if (phase2Exception != null) {
+                            log.error("Could not persist ARCHIVED state for 
{}", stateBatch, phase2Exception);
+                        }
+                        cacheUpdated = maybeUpdateCachedStateAndOffsets();
+                    } finally {
+                        lock.writeLock().unlock();
+                        maybeCompleteDelayedShareFetchRequest(cacheUpdated);
+                    }
+                });
+        });
+    }
+
     // Visible for testing.
     boolean containsAbortMarker(RecordBatch batch) {
         if (!batch.isControlBatch())
@@ -3534,6 +3572,16 @@ public class SharePartition {
         int maxRecords
     ) { }
 
+    /**
+     * Record comprising state as well as offset information for processing by 
DLQ logic.
+     */
+    private record DlqBatch(
+        InFlightState updatedState,
+        long firstOffset, long lastOffset,
+        short deliveryCount
+    ) {
+    }
+
     // Visibility for testing
     static Map<Byte, RecordState> ackTypeToRecordStateMapping() {
         return ACK_TYPE_TO_RECORD_STATE;
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index f7b91bcc93e..2732d4897f6 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
+import org.apache.kafka.server.share.dlq.ShareGroupDLQ;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.InFlightBatch;
@@ -63,12 +64,15 @@ import org.apache.kafka.server.share.fetch.InFlightState;
 import org.apache.kafka.server.share.fetch.RecordState;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.metrics.SharePartitionMetrics;
+import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
 import org.apache.kafka.server.share.persister.NoOpStatePersister;
 import org.apache.kafka.server.share.persister.PartitionFactory;
+import org.apache.kafka.server.share.persister.PartitionStateBatchData;
 import org.apache.kafka.server.share.persister.Persister;
 import org.apache.kafka.server.share.persister.PersisterStateBatch;
 import org.apache.kafka.server.share.persister.ReadShareGroupStateResult;
 import org.apache.kafka.server.share.persister.TopicData;
+import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
 import org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchPartitionData;
@@ -81,6 +85,10 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.nio.ByteBuffer;
@@ -97,6 +105,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
 import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
@@ -117,7 +126,7 @@ import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("resource")
+@SuppressWarnings({"resource", "ClassFanOutComplexity"})
 public class SharePartitionTest {
 
     private static final String ACQUISITION_LOCK_NEVER_GOT_RELEASED = 
"Acquisition lock never got released.";
@@ -12738,6 +12747,555 @@ public class SharePartitionTest {
         assertEquals(0, sharePartition.deliveryCompleteCount());
     }
 
+    @Test
+    public void testAcquisitionLockTimeoutWithDlqEnabledCompleteBatch() throws 
InterruptedException {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire two batches so the first stays in cache after being 
archived.
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);
+
+        assertEquals(2, sharePartition.timer().size());
+
+        // First timeout: delivery count 1 < maxDeliveryCount 2, so records go 
to AVAILABLE.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE &&
+                sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 
&&
+                sharePartition.cachedState().get(10L).batchState() == 
RecordState.AVAILABLE &&
+                sharePartition.cachedState().get(10L).batchDeliveryCount() == 
1,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(), 
10L, List.of())));
+
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+
+        // Re-acquire the first batch, bringing delivery count to 2 (== 
maxDeliveryCount).
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(2, 
sharePartition.cachedState().get(0L).batchDeliveryCount());
+        assertEquals(1, sharePartition.timer().size());
+
+        // Second timeout: delivery count 2 >= maxDeliveryCount 2, 
tryUpdateState redirects
+        // AVAILABLE -> ARCHIVING (DLQ enabled). Phase 2 completes: ARCHIVING 
-> ARCHIVED.
+        // With NoOp DLQ + NoOp persister, the full 2-phase flow completes 
synchronously.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L) == null,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> "Batch at offset 0 was not evicted after DLQ archival. Timer 
size: " +
+                sharePartition.timer().size() + ", cachedState keys: " + 
sharePartition.cachedState().keySet());
+
+        // Batch at offset 0 is evicted from cache after reaching ARCHIVED and 
start offset advancing.
+        assertEquals(10, sharePartition.startOffset());
+        // deliveryCompleteCount is 0 because eviction subtracts the count.
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+        // Second batch remains AVAILABLE.
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).batchState());
+    }
+
+    @Test
+    public void testAcquisitionLockTimeoutWithDlqEnabledPerOffsetBatch() 
throws InterruptedException {
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire a batch of 10 records (offsets 0-9).
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        assertEquals(1, sharePartition.timer().size());
+
+        // First timeout: all go to AVAILABLE with delivery count 1.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE &&
+                sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of())));
+
+        // Re-acquire only the first 5 records (offsets 0-4), forcing offset 
state initialization.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+
+        // Offsets 0-4 should be ACQUIRED with delivery count 2, offsets 5-9 
remain AVAILABLE.
+        assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+        for (long i = 0; i < 5; i++) {
+            assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(i).state());
+            assertEquals(2, 
sharePartition.cachedState().get(0L).offsetState().get(i).deliveryCount());
+        }
+        for (long i = 5; i < 10; i++) {
+            assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(i).state());
+            assertEquals(1, 
sharePartition.cachedState().get(0L).offsetState().get(i).deliveryCount());
+        }
+
+        // Second timeout for offsets 0-4: delivery count 2 >= 
maxDeliveryCount 2.
+        // tryUpdateState redirects AVAILABLE -> ARCHIVING. Phase 2: ARCHIVING 
-> ARCHIVED.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> {
+                Map<Long, InFlightState> offsetState = 
sharePartition.cachedState().get(0L).offsetState();
+                if (offsetState == null) return false;
+                for (long i = 0; i < 5; i++) {
+                    if (offsetState.get(i).state() != RecordState.ARCHIVED || 
offsetState.get(i).deliveryCount() != 2) {
+                        return false;
+                    }
+                }
+                for (long i = 5; i < 10; i++) {
+                    if (offsetState.get(i).state() != RecordState.AVAILABLE || 
offsetState.get(i).deliveryCount() != 1) {
+                        return false;
+                    }
+                }
+                return sharePartition.timer().size() == 0;
+            },
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L))));
+
+        // Offsets 0-4 are ARCHIVED, 5-9 are AVAILABLE. Next fetch offset 
moves to 5
+        // since offsets 0-4 are no longer fetchable.
+        assertEquals(5, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquisitionLockTimeoutWithDlqDisabledCompleteBatch() 
throws InterruptedException {
+        // Verify that without DLQ, max delivery count still causes ARCHIVED 
(not ARCHIVING).
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> false)
+            .build();
+
+        // Acquire two batches.
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);
+
+        // First timeout: records go to AVAILABLE.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of())));
+
+        // Re-acquire first batch.
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+
+        // Second timeout: delivery count reaches max, goes directly to 
ARCHIVED (no ARCHIVING).
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L) == null,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> "Batch at offset 0 was not evicted after archival. Timer 
size: " +
+                sharePartition.timer().size() + ", cachedState keys: " + 
sharePartition.cachedState().keySet());
+
+        // Batch evicted, start offset advances.
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+    }
+
+    @Test
+    public void testAcquisitionLockTimeoutWithDlqEnabledMixedOffsets() throws 
InterruptedException {
+        // Test where some offsets in a batch are acknowledged and the rest 
exceed max delivery count on timeout.
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire batch of 10 records (offsets 0-9).
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+
+        // Timeout #1: all go to AVAILABLE, delivery count 1.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE &&
+                sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of())));
+
+        // Re-acquire only offsets 0-4. This forces offset state 
initialization.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+
+        // Acknowledge offsets 0-4 as ACCEPT. Offsets 5-9 remain AVAILABLE.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(0, 4, List.of(
+                AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id, 
AcknowledgeType.ACCEPT.id,
+                AcknowledgeType.ACCEPT.id, AcknowledgeType.ACCEPT.id))));
+        assertNull(ackResult.join());
+
+        // Re-acquire offsets 5-9. These will have delivery count 2.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5);
+
+        // Now offsets 0-4: ACKNOWLEDGED (delivery count 2), offsets 5-9: 
ACQUIRED (delivery count 2).
+        // Timeout #2 for offsets 5-9: delivery count 2 >= max 2 → ARCHIVING → 
ARCHIVED.
+        // Once all offsets reach terminal state, the batch is evicted from 
cache.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L) == null,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> "Batch at offset 0 was not evicted after DLQ archival. Timer 
size: " +
+                sharePartition.timer().size() + ", cachedState keys: " + 
sharePartition.cachedState().keySet());
+    }
+
+    @Test
+    public void 
testAcquisitionLockTimeoutWithDlqEnabledWriteFailureCompleteBatch() throws 
InterruptedException {
+        // Phase 1 persist of ARCHIVING fails, but phase 2 still proceeds 
unconditionally
+        // because timeout path uses tryUpdateState (no rollback).
+        Persister persister = Mockito.mock(Persister.class);
+        mockPersisterReadStateMethod(persister);
+
+        // All write calls return an error (GROUP_ID_NOT_FOUND).
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire two batches.
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);
+        assertEquals(2, sharePartition.timer().size());
+
+        // First timeout: delivery count 1 < max 2, records go to AVAILABLE.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE &&
+                sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(), 
10L, List.of())));
+
+        // Re-acquire first batch, delivery count becomes 2 (== max).
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        assertEquals(2, 
sharePartition.cachedState().get(0L).batchDeliveryCount());
+
+        // Second timeout: tryUpdateState redirects AVAILABLE → ARCHIVING.
+        // Phase 1 persist fails. Phase 2 proceeds (DLQ enqueue + 
tryUpdateState(ARCHIVED)).
+        // Phase 2 persist also fails, but since isTimeout=true, no rollback — 
ARCHIVED stays in memory.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L) == null,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> "Batch at offset 0 was not evicted. Timer size: " +
+                sharePartition.timer().size() + ", cachedState keys: " + 
sharePartition.cachedState().keySet());
+
+        // Despite both persists failing, batch reached ARCHIVED in memory (no 
rollback for timeout)
+        // and was evicted from cache.
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+        // Second batch went to AVAILABLE (delivery count 1 < max).
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).batchState());
+    }
+
+    @Test
+    public void 
testAcquisitionLockTimeoutWithDlqEnabledWriteFailurePerOffsetBatch() throws 
InterruptedException {
+        // Phase 1 persist of ARCHIVING fails for per-offset batch, but phase 
2 still proceeds.
+        Persister persister = Mockito.mock(Persister.class);
+        mockPersisterReadStateMethod(persister);
+
+        // All write calls return error.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire batch of 10 records (offsets 0-9).
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        assertEquals(1, sharePartition.timer().size());
+
+        // First timeout: all go to AVAILABLE, delivery count 1.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE &&
+                sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of())));
+
+        // Re-acquire only first 5 offsets, forcing offset state 
initialization.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+        assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+        for (long i = 0; i < 5; i++) {
+            assertEquals(2, 
sharePartition.cachedState().get(0L).offsetState().get(i).deliveryCount());
+        }
+
+        // Second timeout for offsets 0-4: delivery count 2 >= max 2 → 
ARCHIVING.
+        // Phase 1 persist fails. Phase 2 proceeds (tryUpdateState(ARCHIVED), 
no rollback).
+        // Phase 2 persist also fails, but ARCHIVED stays in memory.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> {
+                Map<Long, InFlightState> offsetState = 
sharePartition.cachedState().get(0L).offsetState();
+                if (offsetState == null) return false;
+                for (long i = 0; i < 5; i++) {
+                    if (offsetState.get(i).state() != RecordState.ARCHIVED || 
offsetState.get(i).deliveryCount() != 2) {
+                        return false;
+                    }
+                }
+                for (long i = 5; i < 10; i++) {
+                    if (offsetState.get(i).state() != RecordState.AVAILABLE || 
offsetState.get(i).deliveryCount() != 1) {
+                        return false;
+                    }
+                }
+                return sharePartition.timer().size() == 0;
+            },
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L))));
+
+        // Despite both persists failing, offsets 0-4 reached ARCHIVED in 
memory (no rollback for timeout).
+        // Offsets 5-9 remain AVAILABLE.
+        assertEquals(5, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void 
testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsCompleteBatch() 
throws InterruptedException {
+        // Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED) 
succeeds.
+        // Records should reach ARCHIVED despite phase 1 failure.
+        Persister persister = Mockito.mock(Persister.class);
+        mockPersisterReadStateMethod(persister);
+
+        WriteShareGroupStateResult failureResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        Mockito.when(failureResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
+        WriteShareGroupStateResult successResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        Mockito.when(successResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+
+        // First writeState call (phase 1) fails, subsequent calls (phase 2) 
succeed.
+        Mockito.when(persister.writeState(Mockito.any()))
+            .thenReturn(CompletableFuture.completedFuture(failureResult))
+            .thenReturn(CompletableFuture.completedFuture(successResult));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire two batches.
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10);
+        assertEquals(2, sharePartition.timer().size());
+
+        // First timeout: delivery count 1 < max 2, records go to AVAILABLE.
+        // This also calls writeState (fails), but timeout path commits 
unconditionally.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE &&
+                sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of(), 
10L, List.of())));
+
+        // Re-acquire first batch, delivery count becomes 2 (== max).
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        assertEquals(2, 
sharePartition.cachedState().get(0L).batchDeliveryCount());
+
+        // Reset mock: phase 1 fails, phase 2 succeeds.
+        Mockito.when(persister.writeState(Mockito.any()))
+            .thenReturn(CompletableFuture.completedFuture(failureResult))
+            .thenReturn(CompletableFuture.completedFuture(successResult));
+
+        // Second timeout: ARCHIVING (phase 1 persist fails), phase 2 proceeds 
and succeeds.
+        // DLQ enqueue + ARCHIVING → ARCHIVED persisted successfully.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L) == null,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> "Batch at offset 0 was not evicted. Timer size: " +
+                sharePartition.timer().size() + ", cachedState keys: " + 
sharePartition.cachedState().keySet());
+
+        // Phase 1 failed but phase 2 succeeded — batch reached ARCHIVED and 
was evicted.
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(0, sharePartition.deliveryCompleteCount());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).batchState());
+    }
+
+    @Test
+    public void 
testAcquisitionLockTimeoutWithDlqPhase1FailsPhase2SucceedsPerOffsetBatch() 
throws InterruptedException {
+        // Phase 1 persist (ARCHIVING) fails, phase 2 persist (ARCHIVED) 
succeeds for per-offset batch.
+        Persister persister = Mockito.mock(Persister.class);
+        mockPersisterReadStateMethod(persister);
+
+        WriteShareGroupStateResult failureResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        Mockito.when(failureResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+
+        WriteShareGroupStateResult successResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        Mockito.when(successResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+
+        // First writeState call fails, subsequent calls succeed.
+        Mockito.when(persister.writeState(Mockito.any()))
+            .thenReturn(CompletableFuture.completedFuture(failureResult))
+            .thenReturn(CompletableFuture.completedFuture(successResult));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withMaxDeliveryCount(2)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        // Acquire batch of 10 records (offsets 0-9).
+        fetchAcquiredRecords(sharePartition, memoryRecords(10), 10);
+        assertEquals(1, sharePartition.timer().size());
+
+        // First timeout: all go to AVAILABLE, delivery count 1.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.timer().size() == 0 &&
+                sharePartition.cachedState().get(0L).batchState() == 
RecordState.AVAILABLE &&
+                sharePartition.cachedState().get(0L).batchDeliveryCount() == 1,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of())));
+
+        // Re-acquire only first 5 offsets, forcing offset state 
initialization.
+        fetchAcquiredRecords(sharePartition, memoryRecords(5), 5);
+        assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+        for (long i = 0; i < 5; i++) {
+            assertEquals(2, 
sharePartition.cachedState().get(0L).offsetState().get(i).deliveryCount());
+        }
+
+        // Reset mock: phase 1 fails, phase 2 calls succeed.
+        Mockito.when(persister.writeState(Mockito.any()))
+            .thenReturn(CompletableFuture.completedFuture(failureResult))
+            .thenReturn(CompletableFuture.completedFuture(successResult));
+
+        // Second timeout for offsets 0-4: delivery count 2 >= max 2 → 
ARCHIVING.
+        // Phase 1 persist fails, but phase 2 succeeds for each offset → 
ARCHIVED.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> {
+                Map<Long, InFlightState> offsetState = 
sharePartition.cachedState().get(0L).offsetState();
+                if (offsetState == null) return false;
+                for (long i = 0; i < 5; i++) {
+                    if (offsetState.get(i).state() != RecordState.ARCHIVED || 
offsetState.get(i).deliveryCount() != 2) {
+                        return false;
+                    }
+                }
+                for (long i = 5; i < 10; i++) {
+                    if (offsetState.get(i).state() != RecordState.AVAILABLE || 
offsetState.get(i).deliveryCount() != 1) {
+                        return false;
+                    }
+                }
+                return sharePartition.timer().size() == 0;
+            },
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L))));
+
+        // Phase 1 failed but phase 2 succeeded — offsets 0-4 reached ARCHIVED.
+        // Offsets 5-9 remain AVAILABLE.
+        assertEquals(5, sharePartition.nextFetchOffset());
+    }
+
+    // Unit tests that call initiateDLQAndArchive directly.
+
+    private static Stream<Arguments> initiateDLQAndArchiveParameters() {
+        return Stream.of(
+            //          name,                                     
persistSucceeds, expectedState,        dlqCause,                             
firstOffset, lastOffset, deliveryCount
+            Arguments.of("persist succeeds",                      true,        
    RecordState.ARCHIVED, ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED, 0L,          
9L,         (short) 2),
+            Arguments.of("persist fails, no rollback",            false,       
    RecordState.ARCHIVED, ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED, 0L,          
9L,         (short) 2),
+            Arguments.of("client reject cause",                   true,        
    RecordState.ARCHIVED, ShareGroupDLQ.CLIENT_REJECT,           5L,          
5L,         (short) 1),
+            Arguments.of("null cause",                            true,        
    RecordState.ARCHIVED, null,                                  0L,          
4L,         (short) 1),
+            Arguments.of("single offset",                         true,        
    RecordState.ARCHIVED, ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED, 7L,          
7L,         (short) 3),
+            Arguments.of("delivery count exceeded cause",         true,        
    RecordState.ARCHIVED, ShareGroupDLQ.DELIVERY_COUNT_EXCEEDED, 10L,         
19L,        (short) 5)
+        );
+    }
+
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("initiateDLQAndArchiveParameters")
+    public void testInitiateDLQAndArchive(String name, boolean persistSucceeds,
+                                          RecordState expectedState, Throwable 
dlqCause,
+                                          long firstOffset, long lastOffset, 
short deliveryCount) {
+        Persister persister = Mockito.mock(Persister.class);
+        mockPersisterReadStateMethod(persister);
+
+        WriteShareGroupStateResult writeResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        Errors error = persistSucceeds ? Errors.NONE : 
Errors.GROUP_ID_NOT_FOUND;
+        Mockito.when(writeResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, error.code(), 
error.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeResult));
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withState(SharePartitionState.ACTIVE)
+            .withShareGroupDlqEnableSupplier(() -> true)
+            .build();
+
+        InFlightState state = new InFlightState(RecordState.ARCHIVING, 
deliveryCount, EMPTY_MEMBER_ID);
+
+        sharePartition.initiateDLQAndArchive(state, firstOffset, lastOffset, 
deliveryCount, dlqCause);
+
+        assertEquals(expectedState, state.state());
+        assertFalse(state.hasOngoingStateTransition());
+
+        // Verify persister.writeState was called exactly once with the 
correct state batch.
+        ArgumentCaptor<WriteShareGroupStateParameters> captor =
+            ArgumentCaptor.forClass(WriteShareGroupStateParameters.class);
+        Mockito.verify(persister, 
Mockito.times(1)).writeState(captor.capture());
+
+        WriteShareGroupStateParameters params = captor.getValue();
+        GroupTopicPartitionData<PartitionStateBatchData> data =
+            params.groupTopicPartitionData();
+        assertEquals(GROUP_ID, data.groupId());
+        assertEquals(1, data.topicsData().size());
+        assertEquals(TOPIC_ID_PARTITION.topicId(), 
data.topicsData().get(0).topicId());
+        assertEquals(1, data.topicsData().get(0).partitions().size());
+
+        PartitionStateBatchData partitionData = 
data.topicsData().get(0).partitions().get(0);
+        assertEquals(1, partitionData.stateBatches().size());
+        PersisterStateBatch stateBatch = partitionData.stateBatches().get(0);
+        assertEquals(firstOffset, stateBatch.firstOffset());
+        assertEquals(lastOffset, stateBatch.lastOffset());
+        assertEquals(RecordState.ARCHIVED.id(), stateBatch.deliveryState());
+        assertEquals(deliveryCount, stateBatch.deliveryCount());
+
+        // Verify readState was not called by initiateDLQAndArchive.
+        Mockito.verify(persister, Mockito.never()).readState(Mockito.any());
+    }
+
     private static ShareGroupConfigProvider configProviderWithRenewDisabled() {
         ShareGroupConfigProvider configProvider = 
Mockito.mock(ShareGroupConfigProvider.class);
         
Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false);
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
index ea4f5367d1b..5d64499a4f9 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/dlq/NoOpShareGroupDLQManager.java
@@ -32,7 +32,7 @@ public class NoOpShareGroupDLQManager implements 
ShareGroupDLQManager {
 
     @Override
     public CompletableFuture<Void> enqueue(ShareGroupDLQRecordParameter param) 
{
-        log.trace("Enqueuing share group dlq record parameter: {}", param);
+        log.warn("Enqueuing share group dlq record parameter: {}", param);
         return CompletableFuture.completedFuture(null);
     }
 
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
index 583a5477611..83fb32dd2aa 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
@@ -166,11 +166,12 @@ public class InFlightBatch {
      * @param ops      The behavior on the delivery count.
      * @param maxDeliveryCount The maximum delivery count for the records.
      * @param newMemberId The new member id for the records.
+     * @param dlqSupportEnabled Boolean indicating if share group DLQ support 
is enabled.
      * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state helps update chaining.
      * @throws IllegalStateException if the offset state is maintained and the 
batch state is not available.
      */
-    public InFlightState tryUpdateBatchState(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-        return inFlightState().tryUpdateState(newState, ops, maxDeliveryCount, 
newMemberId);
+    public InFlightState tryUpdateBatchState(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId, boolean 
dlqSupportEnabled) {
+        return inFlightState().tryUpdateState(newState, ops, maxDeliveryCount, 
newMemberId, dlqSupportEnabled);
     }
 
     /**
@@ -181,13 +182,14 @@ public class InFlightBatch {
      * @param ops      The behavior on the delivery count.
      * @param maxDeliveryCount The maximum delivery count for the records.
      * @param newMemberId The new member id for the records.
+     * @param dlqSupportEnabled Boolean indicating if share group DLQ support 
is enabled.
      * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state helps update chaining.
      * @throws IllegalStateException if the offset state is maintained and the 
batch state is not available.
      */
     public InFlightState startBatchStateTransition(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount,
-        String newMemberId
+        String newMemberId, boolean dlqSupportEnabled
     ) {
-        return inFlightState().startStateTransition(newState, ops, 
maxDeliveryCount, newMemberId);
+        return inFlightState().startStateTransition(newState, ops, 
maxDeliveryCount, newMemberId, dlqSupportEnabled);
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
index 0ed99562ef8..c5e9cb5c1af 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
@@ -147,7 +147,7 @@ public class InFlightState {
      * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state
      *         helps update chaining.
      */
-    public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps 
ops, int maxDeliveryCount, String newMemberId) {
+    public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps 
ops, int maxDeliveryCount, String newMemberId, boolean dlqSupportEnabled) {
         try {
             // If the state transition is in progress, the state should not be 
updated.
             if (hasOngoingStateTransition()) {
@@ -161,7 +161,7 @@ public class InFlightState {
             }
 
             if (newState == RecordState.AVAILABLE && ops != 
DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
-                newState = RecordState.ARCHIVED;
+                newState = dlqSupportEnabled ? RecordState.ARCHIVING : 
RecordState.ARCHIVED;
             }
             state = state.validateTransition(newState);
             if (newState != RecordState.ARCHIVED) {
@@ -200,9 +200,9 @@ public class InFlightState {
      * @return {@code InFlightState} if update succeeds, null otherwise. 
Returning state
      *         helps update chaining.
      */
-    public InFlightState startStateTransition(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
+    public InFlightState startStateTransition(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId, boolean 
dlqSupportEnabled) {
         InFlightState currentState = new InFlightState(state, deliveryCount, 
memberId, acquisitionLockTimeoutTask);
-        InFlightState updatedState = tryUpdateState(newState, ops, 
maxDeliveryCount, newMemberId);
+        InFlightState updatedState = tryUpdateState(newState, ops, 
maxDeliveryCount, newMemberId, dlqSupportEnabled);
         if (updatedState != null) {
             rollbackState = new RollbackState(currentState, maxDeliveryCount);
         }

Reply via email to