This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 86e7885a81c KAFKA-16100: Add timeout to all the CompletableApplicationEvents (#15455) 86e7885a81c is described below commit 86e7885a81c7132e522d2c59dc6fcf81026cc60d Author: Kirk True <k...@kirktrue.pro> AuthorDate: Thu Mar 7 06:00:21 2024 -0800 KAFKA-16100: Add timeout to all the CompletableApplicationEvents (#15455) This is part of the larger task of enforcing the timeouts for application events, per KAFKA-15974. This takes a first step by adding a Timer to all of the CompletableApplicationEvent subclasses. For the few classes that already included a timeout, this refactors them to use the Timer mechanism instead. Reviewers: Andrew Schofield <aschofi...@confluent.io>, Bruno Cadonna <cado...@apache.org> --- .../consumer/internals/AsyncKafkaConsumer.java | 41 +++++++++++++--------- .../events/AbstractTopicMetadataEvent.java | 17 ++------- .../internals/events/AllTopicsMetadataEvent.java | 6 ++-- .../events/ApplicationEventProcessor.java | 25 +++---------- .../internals/events/AsyncCommitEvent.java | 2 +- .../consumer/internals/events/CommitEvent.java | 20 +++++++++-- .../events/CompletableApplicationEvent.java | 20 +++++++++-- .../events/FetchCommittedOffsetsEvent.java | 17 +++------ .../internals/events/LeaveOnCloseEvent.java | 6 ++-- .../internals/events/ListOffsetsEvent.java | 5 +-- .../internals/events/ResetPositionsEvent.java | 6 ++-- .../consumer/internals/events/SyncCommitEvent.java | 21 ++--------- .../internals/events/TopicMetadataEvent.java | 6 ++-- .../internals/events/UnsubscribeEvent.java | 6 ++-- .../internals/events/ValidatePositionsEvent.java | 6 ++-- .../internals/ConsumerNetworkThreadTest.java | 19 ++++++---- .../events/ApplicationEventProcessorTest.java | 17 ++++----- 17 files changed, 120 insertions(+), 120 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 5354503c016..fcd57469c2a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -938,13 +938,14 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { return Collections.emptyMap(); } + final Timer timer = time.timer(timeout); final FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent( partitions, - timeout.toMillis()); + timer); wakeupTrigger.setActiveTask(event.future()); try { final Map<TopicPartition, OffsetAndMetadata> committedOffsets = applicationEventHandler.addAndGet(event, - time.timer(timeout)); + timer); committedOffsets.forEach(this::updateLastSeenEpochIfNewer); return committedOffsets; } catch (TimeoutException e) { @@ -990,11 +991,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { throw new TimeoutException(); } - final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(topic, timeout.toMillis()); + final Timer timer = time.timer(timeout); + final TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(topic, timer); wakeupTrigger.setActiveTask(topicMetadataEvent.future()); try { Map<String, List<PartitionInfo>> topicMetadata = - applicationEventHandler.addAndGet(topicMetadataEvent, time.timer(timeout)); + applicationEventHandler.addAndGet(topicMetadataEvent, timer); return topicMetadata.getOrDefault(topic, Collections.emptyList()); } finally { @@ -1018,10 +1020,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { throw new TimeoutException(); } - final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(timeout.toMillis()); + final Timer timer = time.timer(timeout); + final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(timer); wakeupTrigger.setActiveTask(topicMetadataEvent.future()); try { - return applicationEventHandler.addAndGet(topicMetadataEvent, time.timer(timeout)); + return applicationEventHandler.addAndGet(topicMetadataEvent, timer); } finally { wakeupTrigger.clearTask(); } @@ -1089,16 +1092,18 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { if (timestampsToSearch.isEmpty()) { return Collections.emptyMap(); } + final Timer timer = time.timer(timeout); final ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( timestampsToSearch, - true); + true, + timer); // If timeout is set to zero return empty immediately; otherwise try to get the results // and throw timeout exception if it cannot complete in time. if (timeout.toMillis() == 0L) return listOffsetsEvent.emptyResult(); - return applicationEventHandler.addAndGet(listOffsetsEvent, time.timer(timeout)); + return applicationEventHandler.addAndGet(listOffsetsEvent, timer); } finally { release(); } @@ -1139,12 +1144,14 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { Map<TopicPartition, Long> timestampToSearch = partitions .stream() .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); + Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( timestampToSearch, - false); + false, + timer); Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet( listOffsetsEvent, - time.timer(timeout)); + timer); return offsetAndTimestampMap .entrySet() .stream() @@ -1274,7 +1281,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { completeQuietly( () -> { maybeRevokePartitions(); - applicationEventHandler.addAndGet(new LeaveOnCloseEvent(), timer); + applicationEventHandler.addAndGet(new LeaveOnCloseEvent(timer), timer); }, "Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException); } @@ -1351,7 +1358,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { long commitStart = time.nanoseconds(); try { Timer requestTimer = time.timer(timeout.toMillis()); - SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, timeout.toMillis()); + SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer); CompletableFuture<Void> commitFuture = commit(syncCommitEvent); wakeupTrigger.setActiveTask(commitFuture); ConsumerUtils.getResult(commitFuture, requestTimer); @@ -1465,10 +1472,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { try { fetchBuffer.retainAll(Collections.emptySet()); if (groupMetadata.get().isPresent()) { - UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(); + Timer timer = time.timer(Long.MAX_VALUE); + UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(timer); applicationEventHandler.add(unsubscribeEvent); log.info("Unsubscribing all topics or patterns and assigned partitions"); - Timer timer = time.timer(Long.MAX_VALUE); try { processBackgroundEvents(backgroundEventProcessor, unsubscribeEvent.future(), timer); @@ -1579,7 +1586,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { // Validate positions using the partition leader end offsets, to detect if any partition // has been truncated due to a leader change. This will trigger an OffsetForLeaderEpoch // request, retrieve the partition end offsets, and validate the current position against it. - applicationEventHandler.addAndGet(new ValidatePositionsEvent(), timer); + applicationEventHandler.addAndGet(new ValidatePositionsEvent(timer), timer); cachedSubscriptionHasAllFetchPositions = subscriptions.hasAllFetchPositions(); if (cachedSubscriptionHasAllFetchPositions) return true; @@ -1602,7 +1609,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { // which are awaiting reset. This will trigger a ListOffset request, retrieve the // partition offsets according to the strategy (ex. earliest, latest), and update the // positions. - applicationEventHandler.addAndGet(new ResetPositionsEvent(), timer); + applicationEventHandler.addAndGet(new ResetPositionsEvent(timer), timer); return true; } catch (TimeoutException e) { return false; @@ -1635,7 +1642,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { final FetchCommittedOffsetsEvent event = new FetchCommittedOffsetsEvent( initializingPartitions, - timer.remainingMs()); + timer); final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer); refreshCommittedOffsets(offsets, metadata, subscriptions); return true; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java index 31c21817d85..3347002cc6f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java @@ -17,25 +17,14 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.utils.Timer; import java.util.List; import java.util.Map; public abstract class AbstractTopicMetadataEvent extends CompletableApplicationEvent<Map<String, List<PartitionInfo>>> { - private final long timeoutMs; - - protected AbstractTopicMetadataEvent(final Type type, final long timeoutMs) { - super(type); - this.timeoutMs = timeoutMs; - } - - public long timeoutMs() { - return timeoutMs; - } - - @Override - public String toStringBase() { - return super.toStringBase() + ", timeoutMs=" + timeoutMs; + protected AbstractTopicMetadataEvent(final Type type, final Timer timer) { + super(type, timer); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java index 154703aaee1..bda18e64210 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Timer; + public class AllTopicsMetadataEvent extends AbstractTopicMetadataEvent { - public AllTopicsMetadataEvent(final long timeoutMs) { - super(Type.ALL_TOPICS_METADATA, timeoutMs); + public AllTopicsMetadataEvent(final Timer timer) { + super(Type.ALL_TOPICS_METADATA, timer); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index c86aa8815f2..33825307465 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -165,8 +165,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> } CommitRequestManager manager = requestManagers.commitRequestManager.get(); - long expirationTimeoutMs = getExpirationTimeForTimeout(event.retryTimeoutMs()); - CompletableFuture<Void> future = manager.commitSync(event.offsets(), expirationTimeoutMs); + CompletableFuture<Void> future = manager.commitSync(event.offsets(), event.deadlineMs()); future.whenComplete(complete(event.future())); } @@ -177,8 +176,7 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> return; } CommitRequestManager manager = requestManagers.commitRequestManager.get(); - long expirationTimeMs = getExpirationTimeForTimeout(event.timeout()); - CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.fetchOffsets(event.partitions(), expirationTimeMs); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.fetchOffsets(event.partitions(), event.deadlineMs()); future.whenComplete(complete(event.future())); } @@ -250,16 +248,14 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> } private void process(final TopicMetadataEvent event) { - final long expirationTimeMs = getExpirationTimeForTimeout(event.timeoutMs()); final CompletableFuture<Map<String, List<PartitionInfo>>> future = - requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(), expirationTimeMs); + requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(), event.deadlineMs()); future.whenComplete(complete(event.future())); } private void process(final AllTopicsMetadataEvent event) { - final long expirationTimeMs = getExpirationTimeForTimeout(event.timeoutMs()); final CompletableFuture<Map<String, List<PartitionInfo>>> future = - requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(expirationTimeMs); + requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(event.deadlineMs()); future.whenComplete(complete(event.future())); } @@ -296,19 +292,6 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> future.whenComplete(complete(event.future())); } - /** - * @return Expiration time in milliseconds calculated with the current time plus the given - * timeout. Returns Long.MAX_VALUE if the expiration overflows it. - * Visible for testing. - */ - long getExpirationTimeForTimeout(final long timeoutMs) { - long expiration = System.currentTimeMillis() + timeoutMs; - if (expiration < 0) { - return Long.MAX_VALUE; - } - return expiration; - } - private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) { return (value, exception) -> { if (exception != null) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java index 2f03fdfb1e5..c36f0534b36 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java @@ -27,6 +27,6 @@ import java.util.Map; public class AsyncCommitEvent extends CommitEvent { public AsyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets) { - super(Type.COMMIT_ASYNC, offsets); + super(Type.COMMIT_ASYNC, offsets, Long.MAX_VALUE); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java index 253d27e2573..1da7b84039a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Timer; import java.util.Collections; import java.util.Map; @@ -29,15 +30,28 @@ public abstract class CommitEvent extends CompletableApplicationEvent<Void> { */ private final Map<TopicPartition, OffsetAndMetadata> offsets; - protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets) { - super(type); - this.offsets = Collections.unmodifiableMap(offsets); + protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) { + super(type, timer); + this.offsets = validate(offsets); + } + + protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long deadlineMs) { + super(type, deadlineMs); + this.offsets = validate(offsets); + } + /** + * Validates the offsets are not negative and then returns the given offset map as + * {@link Collections#unmodifiableMap(Map) as unmodifiable}. + */ + private static Map<TopicPartition, OffsetAndMetadata> validate(final Map<TopicPartition, OffsetAndMetadata> offsets) { for (OffsetAndMetadata offsetAndMetadata : offsets.values()) { if (offsetAndMetadata.offset() < 0) { throw new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()); } } + + return Collections.unmodifiableMap(offsets); } public Map<TopicPartition, OffsetAndMetadata> offsets() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java index a62c3aaa4c4..dae9e9f1017 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Timer; + +import java.util.Objects; import java.util.concurrent.CompletableFuture; /** @@ -27,10 +30,19 @@ import java.util.concurrent.CompletableFuture; public abstract class CompletableApplicationEvent<T> extends ApplicationEvent implements CompletableEvent<T> { private final CompletableFuture<T> future; + private final long deadlineMs; - protected CompletableApplicationEvent(final Type type) { + protected CompletableApplicationEvent(final Type type, final Timer timer) { super(type); this.future = new CompletableFuture<>(); + Objects.requireNonNull(timer); + this.deadlineMs = timer.remainingMs() + timer.currentTimeMs(); + } + + protected CompletableApplicationEvent(final Type type, final long deadlineMs) { + super(type); + this.future = new CompletableFuture<>(); + this.deadlineMs = deadlineMs; } @Override @@ -38,8 +50,12 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent im return future; } + public long deadlineMs() { + return deadlineMs; + } + @Override protected String toStringBase() { - return super.toStringBase() + ", future=" + future; + return super.toStringBase() + ", future=" + future + ", deadlineMs=" + deadlineMs; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java index 7cf56b990b0..980a8f11042 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Timer; import java.util.Collections; import java.util.Map; @@ -30,27 +31,17 @@ public class FetchCommittedOffsetsEvent extends CompletableApplicationEvent<Map< */ private final Set<TopicPartition> partitions; - /** - * Time until which the request will be retried if it fails with a retriable error. - */ - private final long timeoutMs; - - public FetchCommittedOffsetsEvent(final Set<TopicPartition> partitions, final long timeoutMs) { - super(Type.FETCH_COMMITTED_OFFSETS); + public FetchCommittedOffsetsEvent(final Set<TopicPartition> partitions, final Timer timer) { + super(Type.FETCH_COMMITTED_OFFSETS, timer); this.partitions = Collections.unmodifiableSet(partitions); - this.timeoutMs = timeoutMs; } public Set<TopicPartition> partitions() { return partitions; } - public long timeout() { - return timeoutMs; - } - @Override public String toStringBase() { - return super.toStringBase() + ", partitions=" + partitions + ", partitions=" + partitions; + return super.toStringBase() + ", partitions=" + partitions; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java index 5ee19a7cc02..e77b4dfb289 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Timer; + public class LeaveOnCloseEvent extends CompletableApplicationEvent<Void> { - public LeaveOnCloseEvent() { - super(Type.LEAVE_ON_CLOSE); + public LeaveOnCloseEvent(final Timer timer) { + super(Type.LEAVE_ON_CLOSE, timer); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java index fd3b321173f..e218705846e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Timer; import java.util.Collections; import java.util.HashMap; @@ -36,8 +37,8 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti private final Map<TopicPartition, Long> timestampsToSearch; private final boolean requireTimestamps; - public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, final boolean requireTimestamps) { - super(Type.LIST_OFFSETS); + public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, final boolean requireTimestamps, final Timer timer) { + super(Type.LIST_OFFSETS, timer); this.timestampsToSearch = Collections.unmodifiableMap(timestampToSearch); this.requireTimestamps = requireTimestamps; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java index 06f6ebbb68a..65893b62eca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Timer; + /** * Event for resetting offsets for all assigned partitions that require it. This is an * asynchronous event that generates ListOffsets requests, and completes by updating in-memory @@ -24,7 +26,7 @@ package org.apache.kafka.clients.consumer.internals.events; */ public class ResetPositionsEvent extends CompletableApplicationEvent<Void> { - public ResetPositionsEvent() { - super(Type.RESET_POSITIONS); + public ResetPositionsEvent(final Timer timer) { + super(Type.RESET_POSITIONS, timer); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java index 7e00e0da596..87945616ea7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Timer; import java.util.Map; @@ -27,23 +28,7 @@ import java.util.Map; */ public class SyncCommitEvent extends CommitEvent { - /** - * Time to wait for a response, retrying on retriable errors. - */ - private final long retryTimeoutMs; - - public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, - final long retryTimeoutMs) { - super(Type.COMMIT_SYNC, offsets); - this.retryTimeoutMs = retryTimeoutMs; - } - - public Long retryTimeoutMs() { - return retryTimeoutMs; - } - - @Override - public String toStringBase() { - return super.toStringBase() + ", offsets=" + offsets() + ", retryTimeoutMs=" + retryTimeoutMs; + public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) { + super(Type.COMMIT_SYNC, offsets, timer); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java index ebbb2a6c468..33e1270ce60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java @@ -16,14 +16,16 @@ */ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Timer; + import java.util.Objects; public class TopicMetadataEvent extends AbstractTopicMetadataEvent { private final String topic; - public TopicMetadataEvent(final String topic, final long timeoutMs) { - super(Type.TOPIC_METADATA, timeoutMs); + public TopicMetadataEvent(final String topic, final Timer timer) { + super(Type.TOPIC_METADATA, timer); this.topic = Objects.requireNonNull(topic); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java index 07af36e5feb..0b988370014 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Timer; + /** * Application event triggered when a user calls the unsubscribe API. This will make the consumer * release all its assignments and send a heartbeat request to leave the consumer group. @@ -26,8 +28,8 @@ package org.apache.kafka.clients.consumer.internals.events; */ public class UnsubscribeEvent extends CompletableApplicationEvent<Void> { - public UnsubscribeEvent() { - super(Type.UNSUBSCRIBE); + public UnsubscribeEvent(final Timer timer) { + super(Type.UNSUBSCRIBE, timer); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java index efa358b4c78..21e7f3cf6eb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.consumer.internals.events; +import org.apache.kafka.common.utils.Timer; + /** * Event for validating offsets for all assigned partitions for which a leader change has been * detected. This is an asynchronous event that generates OffsetForLeaderEpoch requests, and @@ -24,7 +26,7 @@ package org.apache.kafka.clients.consumer.internals.events; */ public class ValidatePositionsEvent extends CompletableApplicationEvent<Void> { - public ValidatePositionsEvent() { - super(Type.VALIDATE_POSITIONS); + public ValidatePositionsEvent(final Timer timer) { + super(Type.VALIDATE_POSITIONS, timer); } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index cbd56d8b5e2..e4d492fb581 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.RequestTestUtils; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -161,7 +162,8 @@ public class ConsumerNetworkThreadTest { @Test public void testSyncCommitEvent() { - ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 100L); + Timer timer = time.timer(100); + ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), timer); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); verify(applicationEventProcessor).process(any(SyncCommitEvent.class)); @@ -170,7 +172,8 @@ public class ConsumerNetworkThreadTest { @Test public void testListOffsetsEventIsProcessed() { Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L); - ApplicationEvent e = new ListOffsetsEvent(timestamps, true); + Timer timer = time.timer(100); + ApplicationEvent e = new ListOffsetsEvent(timestamps, true, timer); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); verify(applicationEventProcessor).process(any(ListOffsetsEvent.class)); @@ -179,7 +182,8 @@ public class ConsumerNetworkThreadTest { @Test public void testResetPositionsEventIsProcessed() { - ResetPositionsEvent e = new ResetPositionsEvent(); + Timer timer = time.timer(100); + ResetPositionsEvent e = new ResetPositionsEvent(timer); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); verify(applicationEventProcessor).process(any(ResetPositionsEvent.class)); @@ -190,7 +194,8 @@ public class ConsumerNetworkThreadTest { public void testResetPositionsProcessFailureIsIgnored() { doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded(); - ResetPositionsEvent event = new ResetPositionsEvent(); + Timer timer = time.timer(100); + ResetPositionsEvent event = new ResetPositionsEvent(timer); applicationEventsQueue.add(event); assertDoesNotThrow(() -> consumerNetworkThread.runOnce()); @@ -199,7 +204,8 @@ public class ConsumerNetworkThreadTest { @Test public void testValidatePositionsEventIsProcessed() { - ValidatePositionsEvent e = new ValidatePositionsEvent(); + Timer timer = time.timer(100); + ValidatePositionsEvent e = new ValidatePositionsEvent(timer); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class)); @@ -224,7 +230,8 @@ public class ConsumerNetworkThreadTest { @Test void testFetchTopicMetadata() { - applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE)); + Timer timer = time.timer(Long.MAX_VALUE); + applicationEventsQueue.add(new TopicMetadataEvent("topic", timer)); consumerNetworkThread.runOnce(); verify(applicationEventProcessor).process(any(TopicMetadataEvent.class)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index f3e2557ae94..b23660e5469 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -27,6 +27,9 @@ import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,7 +39,6 @@ import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -44,6 +46,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class ApplicationEventProcessorTest { + private final Time time = new MockTime(1); private ApplicationEventProcessor processor; private BlockingQueue applicationEventQueue = mock(BlockingQueue.class); private RequestManagers requestManagers; @@ -96,18 +99,10 @@ public class ApplicationEventProcessorTest { verify(commitRequestManager).signalClose(); } - @Test - public void testExpirationCalculation() { - assertEquals(Long.MAX_VALUE, processor.getExpirationTimeForTimeout(Long.MAX_VALUE)); - assertEquals(Long.MAX_VALUE, processor.getExpirationTimeForTimeout(Long.MAX_VALUE - 1)); - long timeout = processor.getExpirationTimeForTimeout(1000); - assertTrue(timeout > 0); - assertTrue(timeout < Long.MAX_VALUE); - } - @Test public void testPrepClosingLeaveGroupEvent() { - LeaveOnCloseEvent event = new LeaveOnCloseEvent(); + Timer timer = time.timer(100); + LeaveOnCloseEvent event = new LeaveOnCloseEvent(timer); when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null)); processor.process(event);