This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ab6755c0c17 KAFKA-19815: Implement acquisitionLockTimeoutMs (#20895)
ab6755c0c17 is described below
commit ab6755c0c17a89a33f33f5c7efd115b5adcc8c1e
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Nov 17 14:19:06 2025 +0000
KAFKA-19815: Implement acquisitionLockTimeoutMs (#20895)
Implement KafkaShareConsumer.acquisitionLockTimeoutMs as part of
KIP-1222.
Reviewers: Apoorv Mittal <[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 7 +++
.../consumer/internals/ShareCompletedFetch.java | 17 ++++---
.../internals/ShareConsumeRequestManager.java | 2 +
.../consumer/internals/ShareConsumerImpl.java | 8 ++-
.../clients/consumer/internals/ShareFetch.java | 17 ++++++-
.../consumer/internals/ShareInFlightBatch.java | 9 +++-
.../internals/ShareCompletedFetchTest.java | 15 ++++++
.../consumer/internals/ShareConsumerImplTest.java | 59 +++++++++++++---------
.../consumer/internals/ShareFetchBufferTest.java | 3 ++
.../internals/ShareFetchCollectorTest.java | 3 ++
10 files changed, 104 insertions(+), 36 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index da253fa1c42..90164404b33 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -260,8 +260,10 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
+ assertEquals(Optional.empty(),
shareConsumer.acquisitionLockTimeoutMs());
ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
+ assertEquals(Optional.of(15000),
shareConsumer.acquisitionLockTimeoutMs());
verifyShareGroupStateTopicRecordsProduced();
}
}
@@ -276,8 +278,10 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
+ assertEquals(Optional.empty(),
shareConsumer.acquisitionLockTimeoutMs());
ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
+ assertEquals(Optional.of(15000),
shareConsumer.acquisitionLockTimeoutMs());
producer.send(record);
records = shareConsumer.poll(Duration.ofMillis(5000));
assertEquals(1, records.count());
@@ -323,11 +327,14 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap,
partitionExceptionMap));
shareConsumer.subscribe(Set.of(tp.topic()));
+ assertEquals(Optional.empty(),
shareConsumer.acquisitionLockTimeoutMs());
TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records
for share consumer");
+ assertEquals(Optional.of(15000),
shareConsumer.acquisitionLockTimeoutMs());
shareConsumer.subscribe(Set.of(tp2.topic()));
+ assertEquals(Optional.of(15000),
shareConsumer.acquisitionLockTimeoutMs());
// Waiting for heartbeat to propagate the subscription change.
TestUtils.waitForCondition(() -> {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 95e40a3c826..abdc56be2ba 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -60,6 +60,7 @@ public class ShareCompletedFetch {
final int nodeId;
final TopicIdPartition partition;
final ShareFetchResponseData.PartitionData partitionData;
+ final Optional<Integer> acquisitionLockTimeoutMs;
final short requestVersion;
private final Logger log;
@@ -77,21 +78,23 @@ public class ShareCompletedFetch {
private final List<OffsetAndDeliveryCount> acquiredRecordList;
private ListIterator<OffsetAndDeliveryCount> acquiredRecordIterator;
private OffsetAndDeliveryCount nextAcquired;
- private final ShareFetchMetricsAggregator metricAggregator;
+ private final ShareFetchMetricsAggregator metricsAggregator;
ShareCompletedFetch(final LogContext logContext,
final BufferSupplier decompressionBufferSupplier,
final int nodeId,
final TopicIdPartition partition,
final ShareFetchResponseData.PartitionData
partitionData,
- final ShareFetchMetricsAggregator metricAggregator,
+ final Optional<Integer> acquisitionLockTimeoutMs,
+ final ShareFetchMetricsAggregator metricsAggregator,
final short requestVersion) {
this.log =
logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class);
this.decompressionBufferSupplier = decompressionBufferSupplier;
this.nodeId = nodeId;
this.partition = partition;
this.partitionData = partitionData;
- this.metricAggregator = metricAggregator;
+ this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
+ this.metricsAggregator = metricsAggregator;
this.requestVersion = requestVersion;
this.batches =
ShareFetchResponse.recordsOrFail(partitionData).batches().iterator();
this.acquiredRecordList =
buildAcquiredRecordList(partitionData.acquiredRecords());
@@ -154,7 +157,7 @@ public class ShareCompletedFetch {
* and number of records parsed. After all partitions have reported, we
write the metric.
*/
void recordAggregatedMetrics(int bytes, int records) {
- metricAggregator.record(partition.topicPartition(), bytes, records);
+ metricsAggregator.record(partition.topicPartition(), bytes, records);
}
/**
@@ -173,7 +176,7 @@ public class ShareCompletedFetch {
final int maxRecords,
final boolean checkCrcs) {
// Creating an empty ShareInFlightBatch
- ShareInFlightBatch<K, V> inFlightBatch = new
ShareInFlightBatch<>(nodeId, partition);
+ ShareInFlightBatch<K, V> inFlightBatch = new
ShareInFlightBatch<>(nodeId, partition, acquisitionLockTimeoutMs);
if (cachedBatchException != null) {
// If the event that a CRC check fails, reject the entire record
batch because it is corrupt.
@@ -318,13 +321,13 @@ public class ShareCompletedFetch {
try {
key = keyBytes == null ? null :
deserializers.keyDeserializer().deserialize(partition.topic(), headers,
keyBytes);
} catch (RuntimeException e) {
- log.error("Key Deserializers with error: {}", deserializers);
+ log.error("Key deserializers with error: {}", deserializers);
throw
newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY,
partition.topicPartition(), timestampType, record, e, headers);
}
try {
value = valueBytes == null ? null :
deserializers.valueDeserializer().deserialize(partition.topic(), headers,
valueBytes);
} catch (RuntimeException e) {
- log.error("Value Deserializers with error: {}", deserializers);
+ log.error("Value deserializers with error: {}", deserializers);
throw
newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE,
partition.topicPartition(), timestampType, record, e, headers);
}
return new ConsumerRecord<>(partition.topic(), partition.partition(),
record.offset(),
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 332492245e7..3bfa730b8e9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -821,6 +821,8 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
fetchTarget.id(),
tip,
partitionData,
+ response.data().acquisitionLockTimeoutMs() > 0
+ ?
Optional.of(response.data().acquisitionLockTimeoutMs()) : Optional.empty(),
shareFetchMetricsAggregator,
requestVersion)
);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 079a6280055..d23a5b28454 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -892,8 +892,12 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
*/
@Override
public Optional<Integer> acquisitionLockTimeoutMs() {
- // To be implemented
- return Optional.empty();
+ acquireAndEnsureOpen();
+ try {
+ return currentFetch.acquisitionLockTimeoutMs();
+ } finally {
+ release();
+ }
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index 6eb3ba7d77d..cd5203524ab 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
/**
* {@link ShareFetch} represents the records fetched from the broker to be
returned to the consumer
@@ -41,13 +42,15 @@ import java.util.Objects;
*/
public class ShareFetch<K, V> {
private final Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches;
+ private Optional<Integer> acquisitionLockTimeoutMs;
public static <K, V> ShareFetch<K, V> empty() {
- return new ShareFetch<>(new HashMap<>());
+ return new ShareFetch<>(new HashMap<>(), Optional.empty());
}
- private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>>
batches) {
+ private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>>
batches, Optional<Integer> acquisitionLockTimeoutMs) {
this.batches = batches;
+ this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
}
/**
@@ -67,6 +70,9 @@ public class ShareFetch<K, V> {
// but it might conceivably happen in some rare cases (such as
partition leader changes).
currentBatch.merge(batch);
}
+ if (batch.getAcquisitionLockTimeoutMs().isPresent()) {
+ acquisitionLockTimeoutMs = batch.getAcquisitionLockTimeoutMs();
+ }
}
/**
@@ -108,6 +114,13 @@ public class ShareFetch<K, V> {
return numRecords() == 0;
}
+ /**
+ * @return The most up-to-date value of acquisition lock timeout, if
available
+ */
+ public Optional<Integer> acquisitionLockTimeoutMs() {
+ return acquisitionLockTimeoutMs;
+ }
+
/**
* @return {@code true} if this fetch contains records being renewed
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
index 34051fc4fdb..ae0baa6c1c7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -36,16 +37,18 @@ public class ShareInFlightBatch<K, V> {
private Map<Long, ConsumerRecord<K, V>> renewedRecords;
private final Set<Long> acknowledgedRecords;
private Acknowledgements acknowledgements;
+ private final Optional<Integer> acquisitionLockTimeoutMs;
private ShareInFlightBatchException exception;
private boolean hasCachedException = false;
private boolean checkForRenewAcknowledgements = false;
- public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
+ public ShareInFlightBatch(int nodeId, TopicIdPartition partition,
Optional<Integer> acquisitionLockTimeoutMs) {
this.nodeId = nodeId;
this.partition = partition;
this.inFlightRecords = new TreeMap<>();
this.acknowledgedRecords = new TreeSet<>();
this.acknowledgements = Acknowledgements.empty();
+ this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
}
public void addAcknowledgement(long offset, AcknowledgeType type) {
@@ -182,6 +185,10 @@ public class ShareInFlightBatch<K, V> {
return acknowledgements;
}
+ Optional<Integer> getAcquisitionLockTimeoutMs() {
+ return acquisitionLockTimeoutMs;
+ }
+
public boolean isEmpty() {
return inFlightRecords.isEmpty() && acknowledgements.isEmpty();
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index a6a4108189c..c219b566992 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -65,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareCompletedFetchTest {
private static final String TOPIC_NAME = "test";
private static final TopicIdPartition TIP = new
TopicIdPartition(Uuid.randomUuid(), 0, TOPIC_NAME);
+ private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS
= Optional.of(30000);
private static final long PRODUCER_ID = 1000L;
private static final short PRODUCER_EPOCH = 0;
@@ -89,6 +90,7 @@ public class ShareCompletedFetchTest {
assertEquals(Optional.of((short) 1), record.deliveryCount());
Acknowledgements acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
batch = completedFetch.fetchRecords(deserializers, 10, true);
records = batch.getInFlightRecords();
@@ -98,12 +100,14 @@ public class ShareCompletedFetchTest {
assertEquals(Optional.of((short) 1), record.deliveryCount());
acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
batch = completedFetch.fetchRecords(deserializers, 10, true);
records = batch.getInFlightRecords();
assertEquals(0, records.size());
acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
}
@Test
@@ -126,12 +130,14 @@ public class ShareCompletedFetchTest {
assertEquals(Optional.of((short) 1), record.deliveryCount());
Acknowledgements acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
batch = completedFetch.fetchRecords(deserializers, 10, true);
records = batch.getInFlightRecords();
assertEquals(0, records.size());
acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
}
@Test
@@ -154,12 +160,14 @@ public class ShareCompletedFetchTest {
assertEquals(Optional.of((short) 1), record.deliveryCount());
Acknowledgements acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
batch = completedFetch.fetchRecords(deserializers, 10, true);
records = batch.getInFlightRecords();
assertEquals(0, records.size());
acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
}
@Test
@@ -177,6 +185,7 @@ public class ShareCompletedFetchTest {
assertEquals(10, records.size());
Acknowledgements acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
}
}
@@ -195,6 +204,7 @@ public class ShareCompletedFetchTest {
assertEquals(0, records.size());
Acknowledgements acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
}
}
@@ -210,6 +220,7 @@ public class ShareCompletedFetchTest {
assertEquals(0, records.size());
Acknowledgements acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
}
}
@@ -263,6 +274,7 @@ public class ShareCompletedFetchTest {
acknowledgements = batch.getAcknowledgements();
assertEquals(1, acknowledgements.size());
assertEquals(AcknowledgeType.RELEASE,
acknowledgements.get(1L));
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
// Record 2 then results in an empty batch, because record 1
has now been skipped
batch = completedFetch.fetchRecords(deserializers, 10, false);
@@ -280,6 +292,7 @@ public class ShareCompletedFetchTest {
acknowledgements = batch.getAcknowledgements();
assertEquals(1, acknowledgements.size());
assertEquals(AcknowledgeType.RELEASE,
acknowledgements.get(2L));
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
// Record 3 is returned in the next batch, because record 2
has now been skipped
batch = completedFetch.fetchRecords(deserializers, 10, false);
@@ -289,6 +302,7 @@ public class ShareCompletedFetchTest {
assertEquals(3L, fetchedRecords.get(0).offset());
acknowledgements = batch.getAcknowledgements();
assertEquals(0, acknowledgements.size());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
batch.getAcquisitionLockTimeoutMs());
}
}
}
@@ -428,6 +442,7 @@ public class ShareCompletedFetchTest {
0,
TIP,
partitionData,
+ DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
shareFetchMetricsAggregator,
ApiKeys.SHARE_FETCH.latestVersion());
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 489b0d78200..50723082fcc 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -69,6 +69,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -101,6 +102,7 @@ import static org.mockito.Mockito.verify;
@SuppressWarnings("unchecked")
public class ShareConsumerImplTest {
+ private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS
= Optional.of(30000);
private ShareConsumerImpl<String, String> consumer = null;
private final Time time = new MockTime(1);
@@ -257,7 +259,7 @@ public class ShareConsumerImplTest {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(subscriptions);
- // Setup subscription
+ // Set up subscription
final String topicName = "foo";
final List<String> subscriptionTopic =
Collections.singletonList(topicName);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
subscriptionTopic);
@@ -265,7 +267,7 @@ public class ShareConsumerImplTest {
// Create a fetch with only GAP (no records)
final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(),
0, topicName);
- final ShareInFlightBatch<String, String> batch = new
ShareInFlightBatch<>(0, tip);
+ final ShareInFlightBatch<String, String> batch = new
ShareInFlightBatch<>(0, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
// Add GAP without adding any records
batch.addGap(1);
@@ -275,7 +277,7 @@ public class ShareConsumerImplTest {
consumer.poll(Duration.ZERO);
- // Verify that a ShareAcknowledeAsyncEvent was sent with the
acknowledgement GAP for offset 1
+ // Verify that a ShareAcknowledgeAsyncEvent was sent with the
acknowledgement GAP for offset 1
verify(applicationEventHandler).add(argThat(event -> {
if (!(event instanceof ShareAcknowledgeAsyncEvent)) {
return false;
@@ -316,7 +318,7 @@ public class ShareConsumerImplTest {
final String topicName = "foo";
final int partition = 3;
final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(),
partition, topicName);
- final ShareInFlightBatch<String, String> batch = new
ShareInFlightBatch<>(0, tip);
+ final ShareInFlightBatch<String, String> batch = new
ShareInFlightBatch<>(0, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
batch.addRecord(new ConsumerRecord<>(topicName, partition, 2, "key1",
"value1"));
doAnswer(invocation -> {
consumer.wakeup();
@@ -345,6 +347,8 @@ public class ShareConsumerImplTest {
consumer.close();
final IllegalStateException res =
assertThrows(IllegalStateException.class, consumer::subscription);
assertEquals("This consumer has already been closed.",
res.getMessage());
+ final IllegalStateException res2 =
assertThrows(IllegalStateException.class, consumer::acquisitionLockTimeoutMs);
+ assertEquals("This consumer has already been closed.",
res2.getMessage());
}
@Test
@@ -352,9 +356,9 @@ public class ShareConsumerImplTest {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(subscriptions);
- // Setup test data
+ // Set up test data
String topic = "test-topic";
- // Setup an empty fetch.
+ // Set up an empty fetch.
ShareFetch<String, String> firstFetch = ShareFetch.empty();
doReturn(firstFetch)
@@ -362,7 +366,7 @@ public class ShareConsumerImplTest {
.when(fetchCollector)
.collect(any(ShareFetchBuffer.class));
- // Setup subscription
+ // Set up subscription
List<String> topics = Collections.singletonList(topic);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
consumer.subscribe(topics);
@@ -396,7 +400,7 @@ public class ShareConsumerImplTest {
@Test
public void testExplicitModeUnacknowledgedRecords() {
- // Setup consumer with explicit acknowledgement mode
+ // Set up consumer with explicit acknowledgement mode
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(
mock(ShareFetchBuffer.class),
@@ -405,15 +409,15 @@ public class ShareConsumerImplTest {
"client-id",
"explicit");
- // Setup test data
+ // Set up test data
String topic = "test-topic";
int partition = 0;
TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(),
partition, topic);
- ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0,
tip);
+ ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0,
tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1",
"value1"));
batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2",
"value2"));
- // Setup first fetch to return records
+ // Set up first fetch to return records
ShareFetch<String, String> firstFetch = ShareFetch.empty();
firstFetch.add(tip, batch);
doReturn(firstFetch)
@@ -421,14 +425,16 @@ public class ShareConsumerImplTest {
.when(fetchCollector)
.collect(any(ShareFetchBuffer.class));
- // Setup subscription
+ // Set up subscription
List<String> topics = Collections.singletonList(topic);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
consumer.subscribe(topics);
+ assertEquals(Optional.empty(), consumer.acquisitionLockTimeoutMs());
// First poll should succeed and return records
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
assertEquals(2, records.count(), "Should have received 2 records");
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
consumer.acquisitionLockTimeoutMs());
// Second poll should fail because records weren't acknowledged
IllegalStateException exception = assertThrows(
@@ -439,6 +445,7 @@ public class ShareConsumerImplTest {
exception.getMessage().contains("All records must be acknowledged
in explicit acknowledgement mode."),
"Unexpected error message: " + exception.getMessage()
);
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
consumer.acquisitionLockTimeoutMs());
// Verify that acknowledging one record but not all still throws
exception
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
@@ -455,9 +462,9 @@ public class ShareConsumerImplTest {
// Verify that after acknowledging all records, poll succeeds
consumer.acknowledge(iterator.next());
- // Setup second fetch to return new records
+ // Set up second fetch to return new records
ShareFetch<String, String> secondFetch = ShareFetch.empty();
- ShareInFlightBatch<String, String> newBatch = new
ShareInFlightBatch<>(2, tip);
+ ShareInFlightBatch<String, String> newBatch = new
ShareInFlightBatch<>(2, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3",
"value3"));
newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4",
"value4"));
secondFetch.add(tip, newBatch);
@@ -474,7 +481,7 @@ public class ShareConsumerImplTest {
@Test
public void testExplicitModeRenewAndAcknowledgeOnPoll() {
- // Setup consumer with explicit acknowledgement mode
+ // Set up consumer with explicit acknowledgement mode
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(
mock(ShareFetchBuffer.class),
@@ -483,22 +490,22 @@ public class ShareConsumerImplTest {
"client-id",
"explicit");
- // Setup test data
+ // Set up test data
String topic = "test-topic";
int partition = 0;
TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(),
partition, topic);
- ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0,
tip);
+ ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0,
tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1",
"value1"));
batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2",
"value2"));
- // Setup first fetch to return records
+ // Set up first fetch to return records
ShareFetch<String, String> firstFetch = ShareFetch.empty();
firstFetch.add(tip, batch);
doReturn(firstFetch)
.when(fetchCollector)
.collect(any(ShareFetchBuffer.class));
- // Setup subscription
+ // Set up subscription
List<String> topics = Collections.singletonList(topic);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
consumer.subscribe(topics);
@@ -506,6 +513,7 @@ public class ShareConsumerImplTest {
// First poll should succeed and return records
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
assertEquals(2, records.count(), "Should have received 2 records");
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
consumer.acquisitionLockTimeoutMs());
// Renew the first record and accept the second
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
@@ -516,6 +524,7 @@ public class ShareConsumerImplTest {
records = consumer.poll(Duration.ofMillis(100));
assertEquals(0, records.count(), "Should have received 1 record");
assertTrue(firstFetch.hasRenewals());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
consumer.acquisitionLockTimeoutMs());
Acknowledgements acks = Acknowledgements.empty();
acks.add(0, AcknowledgeType.RENEW);
@@ -530,8 +539,9 @@ public class ShareConsumerImplTest {
ConsumerRecord<String, String> renewedRecord = iterator.next();
assertEquals(0, renewedRecord.offset());
consumer.acknowledge(renewedRecord);
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
consumer.acquisitionLockTimeoutMs());
- // Setup next fetch to return no records
+ // Set up next fetch to return no records
doReturn(ShareFetch.empty())
.when(fetchCollector)
.collect(any(ShareFetchBuffer.class));
@@ -540,9 +550,9 @@ public class ShareConsumerImplTest {
records = consumer.poll(Duration.ofMillis(100));
assertTrue(records.isEmpty());
- // Setup next fetch to return new records
+ // Set up next fetch to return new records
ShareFetch<String, String> thirdFetch = ShareFetch.empty();
- ShareInFlightBatch<String, String> newBatch = new
ShareInFlightBatch<>(2, tip);
+ ShareInFlightBatch<String, String> newBatch = new
ShareInFlightBatch<>(2, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3",
"value3"));
newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4",
"value4"));
thirdFetch.add(tip, newBatch);
@@ -555,6 +565,7 @@ public class ShareConsumerImplTest {
// Verify that poll succeeds and returns new records
ConsumerRecords<String, String> newRecords =
consumer.poll(Duration.ofMillis(100));
assertEquals(2, newRecords.count(), "Should have received 2 new
records");
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
consumer.acquisitionLockTimeoutMs());
}
@Test
@@ -571,7 +582,7 @@ public class ShareConsumerImplTest {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(subscriptions);
- // Setup the expected successful completion of close events
+ // Set up the expected successful completion of close events
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
@@ -779,7 +790,7 @@ public class ShareConsumerImplTest {
final TopicPartition tp = new TopicPartition("topic", 0);
final TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(),
tp);
- final ShareInFlightBatch<String, String> batch = new
ShareInFlightBatch<>(0, tip);
+ final ShareInFlightBatch<String, String> batch = new
ShareInFlightBatch<>(0, tip, DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS);
batch.addRecord(new ConsumerRecord<>("topic", 0, 2, "key1", "value1"));
final ShareFetch<String, String> fetch = ShareFetch.empty();
fetch.add(tip, batch);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
index 72f9856d731..d6e1a38a2a7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchBufferTest.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -58,6 +59,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class ShareFetchBufferTest {
+ private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS
= Optional.of(30000);
private final Time time = new MockTime(0, 0, 0);
private final TopicIdPartition topicAPartition0 = new
TopicIdPartition(Uuid.randomUuid(), 0, "topic-a");
private final TopicIdPartition topicAPartition1 = new
TopicIdPartition(Uuid.randomUuid(), 1, "topic-a");
@@ -170,6 +172,7 @@ public class ShareFetchBufferTest {
0,
tp,
partitionData,
+ DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
shareFetchMetricsAggregator,
ApiKeys.SHARE_FETCH.latestVersion());
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index d3291983d6e..ebc85ed493b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -74,6 +74,7 @@ public class ShareFetchCollectorTest {
private static final int DEFAULT_RECORD_COUNT = 10;
private static final int DEFAULT_MAX_POLL_RECORDS =
ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
+ private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS
= Optional.of(30000);
private final TopicIdPartition topicAPartition0 = new
TopicIdPartition(Uuid.randomUuid(), 0, "topic-a");
private LogContext logContext;
@@ -155,6 +156,7 @@ public class ShareFetchCollectorTest {
ShareFetch<String, String> fetch = fetchCollector.collect(fetchBuffer);
assertFalse(fetch.isEmpty());
assertEquals(recordCount, fetch.numRecords());
+ assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
fetch.acquisitionLockTimeoutMs());
// When we collected the data from the buffer, this will cause the
completed fetch to get initialized.
assertTrue(completedFetch.isInitialized());
@@ -417,6 +419,7 @@ public class ShareFetchCollectorTest {
0,
topicAPartition0,
partitionData,
+ DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
shareFetchMetricsAggregator,
ApiKeys.SHARE_FETCH.latestVersion());
}