This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab6755c0c17 KAFKA-19815: Implement acquisitionLockTimeoutMs (#20895)
ab6755c0c17 is described below

commit ab6755c0c17a89a33f33f5c7efd115b5adcc8c1e
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Nov 17 14:19:06 2025 +0000

    KAFKA-19815: Implement acquisitionLockTimeoutMs (#20895)
    
    Implement KafkaShareConsumer.acquisitionLockTimeoutMs as part of
    KIP-1222.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  |  7 +++
 .../consumer/internals/ShareCompletedFetch.java    | 17 ++++---
 .../internals/ShareConsumeRequestManager.java      |  2 +
 .../consumer/internals/ShareConsumerImpl.java      |  8 ++-
 .../clients/consumer/internals/ShareFetch.java     | 17 ++++++-
 .../consumer/internals/ShareInFlightBatch.java     |  9 +++-
 .../internals/ShareCompletedFetchTest.java         | 15 ++++++
 .../consumer/internals/ShareConsumerImplTest.java  | 59 +++++++++++++---------
 .../consumer/internals/ShareFetchBufferTest.java   |  3 ++
 .../internals/ShareFetchCollectorTest.java         |  3 ++
 10 files changed, 104 insertions(+), 36 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index da253fa1c42..90164404b33 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -260,8 +260,10 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
             shareConsumer.subscribe(Set.of(tp.topic()));
+            assertEquals(Optional.empty(), 
shareConsumer.acquisitionLockTimeoutMs());
             ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
             assertEquals(1, records.count());
+            assertEquals(Optional.of(15000), 
shareConsumer.acquisitionLockTimeoutMs());
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
@@ -276,8 +278,10 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
             shareConsumer.subscribe(Set.of(tp.topic()));
+            assertEquals(Optional.empty(), 
shareConsumer.acquisitionLockTimeoutMs());
             ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 1);
             assertEquals(1, records.count());
+            assertEquals(Optional.of(15000), 
shareConsumer.acquisitionLockTimeoutMs());
             producer.send(record);
             records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
@@ -323,11 +327,14 @@ public class ShareConsumerTest {
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
 
             shareConsumer.subscribe(Set.of(tp.topic()));
+            assertEquals(Optional.empty(), 
shareConsumer.acquisitionLockTimeoutMs());
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                 DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
for share consumer");
 
+            assertEquals(Optional.of(15000), 
shareConsumer.acquisitionLockTimeoutMs());
             shareConsumer.subscribe(Set.of(tp2.topic()));
+            assertEquals(Optional.of(15000), 
shareConsumer.acquisitionLockTimeoutMs());
 
             // Waiting for heartbeat to propagate the subscription change.
             TestUtils.waitForCondition(() -> {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 95e40a3c826..abdc56be2ba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -60,6 +60,7 @@ public class ShareCompletedFetch {
     final int nodeId;
     final TopicIdPartition partition;
     final ShareFetchResponseData.PartitionData partitionData;
+    final Optional<Integer> acquisitionLockTimeoutMs;
     final short requestVersion;
 
     private final Logger log;
@@ -77,21 +78,23 @@ public class ShareCompletedFetch {
     private final List<OffsetAndDeliveryCount> acquiredRecordList;
     private ListIterator<OffsetAndDeliveryCount> acquiredRecordIterator;
     private OffsetAndDeliveryCount nextAcquired;
-    private final ShareFetchMetricsAggregator metricAggregator;
+    private final ShareFetchMetricsAggregator metricsAggregator;
 
     ShareCompletedFetch(final LogContext logContext,
                         final BufferSupplier decompressionBufferSupplier,
                         final int nodeId,
                         final TopicIdPartition partition,
                         final ShareFetchResponseData.PartitionData 
partitionData,
-                        final ShareFetchMetricsAggregator metricAggregator,
+                        final Optional<Integer> acquisitionLockTimeoutMs,
+                        final ShareFetchMetricsAggregator metricsAggregator,
                         final short requestVersion) {
         this.log = 
logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class);
         this.decompressionBufferSupplier = decompressionBufferSupplier;
         this.nodeId = nodeId;
         this.partition = partition;
         this.partitionData = partitionData;
-        this.metricAggregator = metricAggregator;
+        this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
+        this.metricsAggregator = metricsAggregator;
         this.requestVersion = requestVersion;
         this.batches = 
ShareFetchResponse.recordsOrFail(partitionData).batches().iterator();
         this.acquiredRecordList = 
buildAcquiredRecordList(partitionData.acquiredRecords());
@@ -154,7 +157,7 @@ public class ShareCompletedFetch {
      * and number of records parsed. After all partitions have reported, we 
write the metric.
      */
     void recordAggregatedMetrics(int bytes, int records) {
-        metricAggregator.record(partition.topicPartition(), bytes, records);
+        metricsAggregator.record(partition.topicPartition(), bytes, records);
     }
 
     /**
@@ -173,7 +176,7 @@ public class ShareCompletedFetch {
                                                  final int maxRecords,
                                                  final boolean checkCrcs) {
         // Creating an empty ShareInFlightBatch
-        ShareInFlightBatch<K, V> inFlightBatch = new 
ShareInFlightBatch<>(nodeId, partition);
+        ShareInFlightBatch<K, V> inFlightBatch = new 
ShareInFlightBatch<>(nodeId, partition, acquisitionLockTimeoutMs);
 
         if (cachedBatchException != null) {
             // If the event that a CRC check fails, reject the entire record 
batch because it is corrupt.
@@ -318,13 +321,13 @@ public class ShareCompletedFetch {
         try {
             key = keyBytes == null ? null : 
deserializers.keyDeserializer().deserialize(partition.topic(), headers, 
keyBytes);
         } catch (RuntimeException e) {
-            log.error("Key Deserializers with error: {}", deserializers);
+            log.error("Key deserializers with error: {}", deserializers);
             throw 
newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY,
 partition.topicPartition(), timestampType, record, e, headers);
         }
         try {
             value = valueBytes == null ? null : 
deserializers.valueDeserializer().deserialize(partition.topic(), headers, 
valueBytes);
         } catch (RuntimeException e) {
-            log.error("Value Deserializers with error: {}", deserializers);
+            log.error("Value deserializers with error: {}", deserializers);
             throw 
newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE,
 partition.topicPartition(), timestampType, record, e, headers);
         }
         return new ConsumerRecord<>(partition.topic(), partition.partition(), 
record.offset(),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 332492245e7..3bfa730b8e9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -821,6 +821,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                         fetchTarget.id(),
                         tip,
                         partitionData,
+                        response.data().acquisitionLockTimeoutMs() > 0
+                            ? 
Optional.of(response.data().acquisitionLockTimeoutMs()) : Optional.empty(),
                         shareFetchMetricsAggregator,
                         requestVersion)
                 );
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 079a6280055..d23a5b28454 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -892,8 +892,12 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
      */
     @Override
     public Optional<Integer> acquisitionLockTimeoutMs() {
-        // To be implemented
-        return Optional.empty();
+        acquireAndEnsureOpen();
+        try {
+            return currentFetch.acquisitionLockTimeoutMs();
+        } finally {
+            release();
+        }
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index 6eb3ba7d77d..cd5203524ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * {@link ShareFetch} represents the records fetched from the broker to be 
returned to the consumer
@@ -41,13 +42,15 @@ import java.util.Objects;
  */
 public class ShareFetch<K, V> {
     private final Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches;
+    private Optional<Integer> acquisitionLockTimeoutMs;
 
     public static <K, V> ShareFetch<K, V> empty() {
-        return new ShareFetch<>(new HashMap<>());
+        return new ShareFetch<>(new HashMap<>(), Optional.empty());
     }
 
-    private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>> 
batches) {
+    private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>> 
batches, Optional<Integer> acquisitionLockTimeoutMs) {
         this.batches = batches;
+        this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
     }
 
     /**
@@ -67,6 +70,9 @@ public class ShareFetch<K, V> {
             // but it might conceivably happen in some rare cases (such as 
partition leader changes).
             currentBatch.merge(batch);
         }
+        if (batch.getAcquisitionLockTimeoutMs().isPresent()) {
+            acquisitionLockTimeoutMs = batch.getAcquisitionLockTimeoutMs();
+        }
     }
 
     /**
@@ -108,6 +114,13 @@ public class ShareFetch<K, V> {
         return numRecords() == 0;
     }
 
+    /**
+     * @return The most up-to-date value of acquisition lock timeout, if 
available
+     */
+    public Optional<Integer> acquisitionLockTimeoutMs() {
+        return acquisitionLockTimeoutMs;
+    }
+
     /**
      * @return {@code true} if this fetch contains records being renewed
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
index 34051fc4fdb..ae0baa6c1c7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -36,16 +37,18 @@ public class ShareInFlightBatch<K, V> {
     private Map<Long, ConsumerRecord<K, V>> renewedRecords;
     private final Set<Long> acknowledgedRecords;
     private Acknowledgements acknowledgements;
+    private final Optional<Integer> acquisitionLockTimeoutMs;
     private ShareInFlightBatchException exception;
     private boolean hasCachedException = false;
     private boolean checkForRenewAcknowledgements = false;
 
-    public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
+    public ShareInFlightBatch(int nodeId, TopicIdPartition partition, 
Optional<Integer> acquisitionLockTimeoutMs) {
         this.nodeId = nodeId;
         this.partition = partition;
         this.inFlightRecords = new TreeMap<>();
         this.acknowledgedRecords = new TreeSet<>();
         this.acknowledgements = Acknowledgements.empty();
+        this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
     }
 
     public void addAcknowledgement(long offset, AcknowledgeType type) {
@@ -182,6 +185,10 @@ public class ShareInFlightBatch<K, V> {
         return acknowledgements;
     }
 
+    Optional<Integer> getAcquisitionLockTimeoutMs() {
+        return acquisitionLockTimeoutMs;
+    }
+
     public boolean isEmpty() {
         return inFlightRecords.isEmpty() && acknowledgements.isEmpty();
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index a6a4108189c..c219b566992 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -65,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class ShareCompletedFetchTest {
     private static final String TOPIC_NAME = "test";
     private static final TopicIdPartition TIP = new 
TopicIdPartition(Uuid.randomUuid(), 0, TOPIC_NAME);
+    private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS 
= Optional.of(30000);
     private static final long PRODUCER_ID = 1000L;
     private static final short PRODUCER_EPOCH = 0;
 
@@ -89,6 +90,7 @@ public class ShareCompletedFetchTest {
         assertEquals(Optional.of((short) 1), record.deliveryCount());
         Acknowledgements acknowledgements = batch.getAcknowledgements();
         assertEquals(0, acknowledgements.size());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
 
         batch = completedFetch.fetchRecords(deserializers, 10, true);
         records = batch.getInFlightRecords();
@@ -98,12 +100,14 @@ public class ShareCompletedFetchTest {
         assertEquals(Optional.of((short) 1), record.deliveryCount());
         acknowledgements = batch.getAcknowledgements();
         assertEquals(0, acknowledgements.size());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
 
         batch = completedFetch.fetchRecords(deserializers, 10, true);
         records = batch.getInFlightRecords();
         assertEquals(0, records.size());
         acknowledgements = batch.getAcknowledgements();
         assertEquals(0, acknowledgements.size());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
     }
 
     @Test
@@ -126,12 +130,14 @@ public class ShareCompletedFetchTest {
         assertEquals(Optional.of((short) 1), record.deliveryCount());
         Acknowledgements acknowledgements = batch.getAcknowledgements();
         assertEquals(0, acknowledgements.size());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
 
         batch = completedFetch.fetchRecords(deserializers, 10, true);
         records = batch.getInFlightRecords();
         assertEquals(0, records.size());
         acknowledgements = batch.getAcknowledgements();
         assertEquals(0, acknowledgements.size());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
     }
 
     @Test
@@ -154,12 +160,14 @@ public class ShareCompletedFetchTest {
         assertEquals(Optional.of((short) 1), record.deliveryCount());
         Acknowledgements acknowledgements = batch.getAcknowledgements();
         assertEquals(0, acknowledgements.size());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
 
         batch = completedFetch.fetchRecords(deserializers, 10, true);
         records = batch.getInFlightRecords();
         assertEquals(0, records.size());
         acknowledgements = batch.getAcknowledgements();
         assertEquals(0, acknowledgements.size());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
     }
 
     @Test
@@ -177,6 +185,7 @@ public class ShareCompletedFetchTest {
             assertEquals(10, records.size());
             Acknowledgements acknowledgements = batch.getAcknowledgements();
             assertEquals(0, acknowledgements.size());
+            assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
         }
     }
 
@@ -195,6 +204,7 @@ public class ShareCompletedFetchTest {
             assertEquals(0, records.size());
             Acknowledgements acknowledgements = batch.getAcknowledgements();
             assertEquals(0, acknowledgements.size());
+            assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
         }
     }
 
@@ -210,6 +220,7 @@ public class ShareCompletedFetchTest {
             assertEquals(0, records.size());
             Acknowledgements acknowledgements = batch.getAcknowledgements();
             assertEquals(0, acknowledgements.size());
+            assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
         }
     }
 
@@ -263,6 +274,7 @@ public class ShareCompletedFetchTest {
                 acknowledgements = batch.getAcknowledgements();
                 assertEquals(1, acknowledgements.size());
                 assertEquals(AcknowledgeType.RELEASE, 
acknowledgements.get(1L));
+                assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
 
                 // Record 2 then results in an empty batch, because record 1 
has now been skipped
                 batch = completedFetch.fetchRecords(deserializers, 10, false);
@@ -280,6 +292,7 @@ public class ShareCompletedFetchTest {
                 acknowledgements = batch.getAcknowledgements();
                 assertEquals(1, acknowledgements.size());
                 assertEquals(AcknowledgeType.RELEASE, 
acknowledgements.get(2L));
+                assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
 
                 // Record 3 is returned in the next batch, because record 2 
has now been skipped
                 batch = completedFetch.fetchRecords(deserializers, 10, false);
@@ -289,6 +302,7 @@ public class ShareCompletedFetchTest {
                 assertEquals(3L, fetchedRecords.get(0).offset());
                 acknowledgements = batch.getAcknowledgements();
                 assertEquals(0, acknowledgements.size());
+                assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
batch.getAcquisitionLockTimeoutMs());
             }
         }
     }
@@ -428,6 +442,7 @@ public class ShareCompletedFetchTest {
             0,
             TIP,
             partitionData,
+            DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
             shareFetchMetricsAggregator,
             ApiKeys.SHARE_FETCH.latestVersion());
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 489b0d78200..50723082fcc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -69,6 +69,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -101,6 +102,7 @@ import static org.mockito.Mockito.verify;
 @SuppressWarnings("unchecked")
 public class ShareConsumerImplTest {
 
+    private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS 
= Optional.of(30000);
     private ShareConsumerImpl<String, String> consumer = null;
 
     private final Time time = new MockTime(1);
@@ -257,7 +259,7 @@ public class ShareConsumerImplTest {
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
         consumer = newConsumer(subscriptions);
 
-        // Setup subscription
+        // Set up subscription
         final String topicName = "foo";
         final List<String> subscriptionTopic = 
Collections.singletonList(topicName);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
subscriptionTopic);
@@ -265,7 +267,7 @@ public class ShareConsumerImplTest {
 
         // Create a fetch with only GAP (no records)
         final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 
0, topicName);
-        final ShareInFlightBatch<String, String> batch = new 
ShareInFlightBatch<>(0, tip);
+        final ShareInFlightBatch<String, String> batch = new 
ShareInFlightBatch<>(0, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
         // Add GAP without adding any records
         batch.addGap(1);
         
@@ -275,7 +277,7 @@ public class ShareConsumerImplTest {
 
         consumer.poll(Duration.ZERO);
 
-        // Verify that a ShareAcknowledeAsyncEvent was sent with the 
acknowledgement GAP for offset 1
+        // Verify that a ShareAcknowledgeAsyncEvent was sent with the 
acknowledgement GAP for offset 1
         verify(applicationEventHandler).add(argThat(event -> {
             if (!(event instanceof ShareAcknowledgeAsyncEvent)) {
                 return false;
@@ -316,7 +318,7 @@ public class ShareConsumerImplTest {
         final String topicName = "foo";
         final int partition = 3;
         final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 
partition, topicName);
-        final ShareInFlightBatch<String, String> batch = new 
ShareInFlightBatch<>(0, tip);
+        final ShareInFlightBatch<String, String> batch = new 
ShareInFlightBatch<>(0, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
         batch.addRecord(new ConsumerRecord<>(topicName, partition, 2, "key1", 
"value1"));
         doAnswer(invocation -> {
             consumer.wakeup();
@@ -345,6 +347,8 @@ public class ShareConsumerImplTest {
         consumer.close();
         final IllegalStateException res = 
assertThrows(IllegalStateException.class, consumer::subscription);
         assertEquals("This consumer has already been closed.", 
res.getMessage());
+        final IllegalStateException res2 = 
assertThrows(IllegalStateException.class, consumer::acquisitionLockTimeoutMs);
+        assertEquals("This consumer has already been closed.", 
res2.getMessage());
     }
 
     @Test
@@ -352,9 +356,9 @@ public class ShareConsumerImplTest {
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
         consumer = newConsumer(subscriptions);
 
-        // Setup test data
+        // Set up test data
         String topic = "test-topic";
-        // Setup an empty fetch.
+        // Set up an empty fetch.
         ShareFetch<String, String> firstFetch = ShareFetch.empty();
 
         doReturn(firstFetch)
@@ -362,7 +366,7 @@ public class ShareConsumerImplTest {
                 .when(fetchCollector)
                 .collect(any(ShareFetchBuffer.class));
 
-        // Setup subscription
+        // Set up subscription
         List<String> topics = Collections.singletonList(topic);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
         consumer.subscribe(topics);
@@ -396,7 +400,7 @@ public class ShareConsumerImplTest {
 
     @Test
     public void testExplicitModeUnacknowledgedRecords() {
-        // Setup consumer with explicit acknowledgement mode
+        // Set up consumer with explicit acknowledgement mode
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
         consumer = newConsumer(
                 mock(ShareFetchBuffer.class),
@@ -405,15 +409,15 @@ public class ShareConsumerImplTest {
                 "client-id",
                 "explicit");
 
-        // Setup test data
+        // Set up test data
         String topic = "test-topic";
         int partition = 0;
         TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 
partition, topic);
-        ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, 
tip);
+        ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, 
tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
         batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1", 
"value1"));
         batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2", 
"value2"));
 
-        // Setup first fetch to return records
+        // Set up first fetch to return records
         ShareFetch<String, String> firstFetch = ShareFetch.empty();
         firstFetch.add(tip, batch);
         doReturn(firstFetch)
@@ -421,14 +425,16 @@ public class ShareConsumerImplTest {
             .when(fetchCollector)
             .collect(any(ShareFetchBuffer.class));
 
-        // Setup subscription
+        // Set up subscription
         List<String> topics = Collections.singletonList(topic);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
         consumer.subscribe(topics);
+        assertEquals(Optional.empty(), consumer.acquisitionLockTimeoutMs());
 
         // First poll should succeed and return records
         ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
         assertEquals(2, records.count(), "Should have received 2 records");
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
consumer.acquisitionLockTimeoutMs());
 
         // Second poll should fail because records weren't acknowledged
         IllegalStateException exception = assertThrows(
@@ -439,6 +445,7 @@ public class ShareConsumerImplTest {
             exception.getMessage().contains("All records must be acknowledged 
in explicit acknowledgement mode."),
             "Unexpected error message: " + exception.getMessage()
         );
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
consumer.acquisitionLockTimeoutMs());
 
         // Verify that acknowledging one record but not all still throws 
exception
         Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
@@ -455,9 +462,9 @@ public class ShareConsumerImplTest {
         // Verify that after acknowledging all records, poll succeeds
         consumer.acknowledge(iterator.next());
         
-        // Setup second fetch to return new records
+        // Set up second fetch to return new records
         ShareFetch<String, String> secondFetch = ShareFetch.empty();
-        ShareInFlightBatch<String, String> newBatch = new 
ShareInFlightBatch<>(2, tip);
+        ShareInFlightBatch<String, String> newBatch = new 
ShareInFlightBatch<>(2, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
         newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3", 
"value3"));
         newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4", 
"value4"));
         secondFetch.add(tip, newBatch);
@@ -474,7 +481,7 @@ public class ShareConsumerImplTest {
 
     @Test
     public void testExplicitModeRenewAndAcknowledgeOnPoll() {
-        // Setup consumer with explicit acknowledgement mode
+        // Set up consumer with explicit acknowledgement mode
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
         consumer = newConsumer(
             mock(ShareFetchBuffer.class),
@@ -483,22 +490,22 @@ public class ShareConsumerImplTest {
             "client-id",
             "explicit");
 
-        // Setup test data
+        // Set up test data
         String topic = "test-topic";
         int partition = 0;
         TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 
partition, topic);
-        ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, 
tip);
+        ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, 
tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
         batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1", 
"value1"));
         batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2", 
"value2"));
 
-        // Setup first fetch to return records
+        // Set up first fetch to return records
         ShareFetch<String, String> firstFetch = ShareFetch.empty();
         firstFetch.add(tip, batch);
         doReturn(firstFetch)
             .when(fetchCollector)
             .collect(any(ShareFetchBuffer.class));
 
-        // Setup subscription
+        // Set up subscription
         List<String> topics = Collections.singletonList(topic);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
         consumer.subscribe(topics);
@@ -506,6 +513,7 @@ public class ShareConsumerImplTest {
         // First poll should succeed and return records
         ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
         assertEquals(2, records.count(), "Should have received 2 records");
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
consumer.acquisitionLockTimeoutMs());
 
         // Renew the first record and accept the second
         Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
@@ -516,6 +524,7 @@ public class ShareConsumerImplTest {
         records = consumer.poll(Duration.ofMillis(100));
         assertEquals(0, records.count(), "Should have received 1 record");
         assertTrue(firstFetch.hasRenewals());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
consumer.acquisitionLockTimeoutMs());
 
         Acknowledgements acks = Acknowledgements.empty();
         acks.add(0, AcknowledgeType.RENEW);
@@ -530,8 +539,9 @@ public class ShareConsumerImplTest {
         ConsumerRecord<String, String> renewedRecord = iterator.next();
         assertEquals(0, renewedRecord.offset());
         consumer.acknowledge(renewedRecord);
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
consumer.acquisitionLockTimeoutMs());
 
-        // Setup next fetch to return no records
+        // Set up next fetch to return no records
         doReturn(ShareFetch.empty())
             .when(fetchCollector)
             .collect(any(ShareFetchBuffer.class));
@@ -540,9 +550,9 @@ public class ShareConsumerImplTest {
         records = consumer.poll(Duration.ofMillis(100));
         assertTrue(records.isEmpty());
 
-        // Setup next fetch to return new records
+        // Set up next fetch to return new records
         ShareFetch<String, String> thirdFetch = ShareFetch.empty();
-        ShareInFlightBatch<String, String> newBatch = new 
ShareInFlightBatch<>(2, tip);
+        ShareInFlightBatch<String, String> newBatch = new 
ShareInFlightBatch<>(2, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
         newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3", 
"value3"));
         newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4", 
"value4"));
         thirdFetch.add(tip, newBatch);
@@ -555,6 +565,7 @@ public class ShareConsumerImplTest {
         // Verify that poll succeeds and returns new records
         ConsumerRecords<String, String> newRecords = 
consumer.poll(Duration.ofMillis(100));
         assertEquals(2, newRecords.count(), "Should have received 2 new 
records");
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
consumer.acquisitionLockTimeoutMs());
     }
 
     @Test
@@ -571,7 +582,7 @@ public class ShareConsumerImplTest {
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
         consumer = newConsumer(subscriptions);
 
-        // Setup the expected successful completion of close events
+        // Set up the expected successful completion of close events
         completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
         completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
 
@@ -779,7 +790,7 @@ public class ShareConsumerImplTest {
 
         final TopicPartition tp = new TopicPartition("topic", 0);
         final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 
tp);
-        final ShareInFlightBatch<String, String> batch = new 
ShareInFlightBatch<>(0, tip);
+        final ShareInFlightBatch<String, String> batch = new 
ShareInFlightBatch<>(0, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
         batch.addRecord(new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));
         final ShareFetch<String, String> fetch = ShareFetch.empty();
         fetch.add(tip, batch);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
index 72f9856d731..d6e1a38a2a7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -58,6 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class ShareFetchBufferTest {
 
+    private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS 
= Optional.of(30000);
     private final Time time = new MockTime(0, 0, 0);
     private final TopicIdPartition topicAPartition0 = new 
TopicIdPartition(Uuid.randomUuid(), 0, "topic-a");
     private final TopicIdPartition topicAPartition1 = new 
TopicIdPartition(Uuid.randomUuid(), 1, "topic-a");
@@ -170,6 +172,7 @@ public class ShareFetchBufferTest {
                 0,
                 tp,
                 partitionData,
+                DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
                 shareFetchMetricsAggregator,
                 ApiKeys.SHARE_FETCH.latestVersion());
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index d3291983d6e..ebc85ed493b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -74,6 +74,7 @@ public class ShareFetchCollectorTest {
 
     private static final int DEFAULT_RECORD_COUNT = 10;
     private static final int DEFAULT_MAX_POLL_RECORDS = 
ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
+    private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS 
= Optional.of(30000);
     private final TopicIdPartition topicAPartition0 = new 
TopicIdPartition(Uuid.randomUuid(), 0, "topic-a");
     private LogContext logContext;
 
@@ -155,6 +156,7 @@ public class ShareFetchCollectorTest {
         ShareFetch<String, String> fetch = fetchCollector.collect(fetchBuffer);
         assertFalse(fetch.isEmpty());
         assertEquals(recordCount, fetch.numRecords());
+        assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, 
fetch.acquisitionLockTimeoutMs());
 
         // When we collected the data from the buffer, this will cause the 
completed fetch to get initialized.
         assertTrue(completedFetch.isInitialized());
@@ -417,6 +419,7 @@ public class ShareFetchCollectorTest {
                     0,
                     topicAPartition0,
                     partitionData,
+                    DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
                     shareFetchMetricsAggregator,
                     ApiKeys.SHARE_FETCH.latestVersion());
         }


Reply via email to