This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86e7885a81c KAFKA-16100: Add timeout to all the CompletableApplicationEvents (#15455)
86e7885a81c is described below

commit 86e7885a81c7132e522d2c59dc6fcf81026cc60d
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Thu Mar 7 06:00:21 2024 -0800

    KAFKA-16100: Add timeout to all the CompletableApplicationEvents (#15455)
    
    This is part of the larger task of enforcing the timeouts for application events, per KAFKA-15974.
    
    This takes a first step by adding a Timer to all of the CompletableApplicationEvent subclasses.
    For the few classes that already included a timeout, this refactors them to use the Timer mechanism instead.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>, Bruno Cadonna <cado...@apache.org>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 41 +++++++++++++---------
 .../events/AbstractTopicMetadataEvent.java         | 17 ++-------
 .../internals/events/AllTopicsMetadataEvent.java   |  6 ++--
 .../events/ApplicationEventProcessor.java          | 25 +++----------
 .../internals/events/AsyncCommitEvent.java         |  2 +-
 .../consumer/internals/events/CommitEvent.java     | 20 +++++++++--
 .../events/CompletableApplicationEvent.java        | 20 +++++++++--
 .../events/FetchCommittedOffsetsEvent.java         | 17 +++------
 .../internals/events/LeaveOnCloseEvent.java        |  6 ++--
 .../internals/events/ListOffsetsEvent.java         |  5 +--
 .../internals/events/ResetPositionsEvent.java      |  6 ++--
 .../consumer/internals/events/SyncCommitEvent.java | 21 ++---------
 .../internals/events/TopicMetadataEvent.java       |  6 ++--
 .../internals/events/UnsubscribeEvent.java         |  6 ++--
 .../internals/events/ValidatePositionsEvent.java   |  6 ++--
 .../internals/ConsumerNetworkThreadTest.java       | 19 ++++++----
 .../events/ApplicationEventProcessorTest.java      | 17 ++++-----
 17 files changed, 120 insertions(+), 120 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 5354503c016..fcd57469c2a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -938,13 +938,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 return Collections.emptyMap();
             }
 
+            final Timer timer = time.timer(timeout);
             final FetchCommittedOffsetsEvent event = new 
FetchCommittedOffsetsEvent(
                 partitions,
-                timeout.toMillis());
+                timer);
             wakeupTrigger.setActiveTask(event.future());
             try {
                 final Map<TopicPartition, OffsetAndMetadata> committedOffsets 
= applicationEventHandler.addAndGet(event,
-                    time.timer(timeout));
+                    timer);
                 committedOffsets.forEach(this::updateLastSeenEpochIfNewer);
                 return committedOffsets;
             } catch (TimeoutException e) {
@@ -990,11 +991,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 throw new TimeoutException();
             }
 
-            final TopicMetadataEvent topicMetadataEvent = new 
TopicMetadataEvent(topic, timeout.toMillis());
+            final Timer timer = time.timer(timeout);
+            final TopicMetadataEvent topicMetadataEvent = new 
TopicMetadataEvent(topic, timer);
             wakeupTrigger.setActiveTask(topicMetadataEvent.future());
             try {
                 Map<String, List<PartitionInfo>> topicMetadata =
-                        applicationEventHandler.addAndGet(topicMetadataEvent, 
time.timer(timeout));
+                        applicationEventHandler.addAndGet(topicMetadataEvent, 
timer);
 
                 return topicMetadata.getOrDefault(topic, 
Collections.emptyList());
             } finally {
@@ -1018,10 +1020,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 throw new TimeoutException();
             }
 
-            final AllTopicsMetadataEvent topicMetadataEvent = new 
AllTopicsMetadataEvent(timeout.toMillis());
+            final Timer timer = time.timer(timeout);
+            final AllTopicsMetadataEvent topicMetadataEvent = new 
AllTopicsMetadataEvent(timer);
             wakeupTrigger.setActiveTask(topicMetadataEvent.future());
             try {
-                return applicationEventHandler.addAndGet(topicMetadataEvent, 
time.timer(timeout));
+                return applicationEventHandler.addAndGet(topicMetadataEvent, 
timer);
             } finally {
                 wakeupTrigger.clearTask();
             }
@@ -1089,16 +1092,18 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             if (timestampsToSearch.isEmpty()) {
                 return Collections.emptyMap();
             }
+            final Timer timer = time.timer(timeout);
             final ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
                 timestampsToSearch,
-                true);
+                true,
+                timer);
 
             // If timeout is set to zero return empty immediately; otherwise 
try to get the results
             // and throw timeout exception if it cannot complete in time.
             if (timeout.toMillis() == 0L)
                 return listOffsetsEvent.emptyResult();
 
-            return applicationEventHandler.addAndGet(listOffsetsEvent, 
time.timer(timeout));
+            return applicationEventHandler.addAndGet(listOffsetsEvent, timer);
         } finally {
             release();
         }
@@ -1139,12 +1144,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             Map<TopicPartition, Long> timestampToSearch = partitions
                 .stream()
                 .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
+            Timer timer = time.timer(timeout);
             ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
                 timestampToSearch,
-                false);
+                false,
+                timer);
             Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
                 listOffsetsEvent,
-                time.timer(timeout));
+                timer);
             return offsetAndTimestampMap
                 .entrySet()
                 .stream()
@@ -1274,7 +1281,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         completeQuietly(
             () -> {
                 maybeRevokePartitions();
-                applicationEventHandler.addAndGet(new LeaveOnCloseEvent(), 
timer);
+                applicationEventHandler.addAndGet(new 
LeaveOnCloseEvent(timer), timer);
             },
             "Failed to send leaveGroup heartbeat with a timeout(ms)=" + 
timer.timeoutMs(), firstException);
     }
@@ -1351,7 +1358,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         long commitStart = time.nanoseconds();
         try {
             Timer requestTimer = time.timer(timeout.toMillis());
-            SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, 
timeout.toMillis());
+            SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, 
requestTimer);
             CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
             wakeupTrigger.setActiveTask(commitFuture);
             ConsumerUtils.getResult(commitFuture, requestTimer);
@@ -1465,10 +1472,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         try {
             fetchBuffer.retainAll(Collections.emptySet());
             if (groupMetadata.get().isPresent()) {
-                UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent();
+                Timer timer = time.timer(Long.MAX_VALUE);
+                UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(timer);
                 applicationEventHandler.add(unsubscribeEvent);
                 log.info("Unsubscribing all topics or patterns and assigned 
partitions");
-                Timer timer = time.timer(Long.MAX_VALUE);
 
                 try {
                     processBackgroundEvents(backgroundEventProcessor, 
unsubscribeEvent.future(), timer);
@@ -1579,7 +1586,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // Validate positions using the partition leader end offsets, to 
detect if any partition
             // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
             // request, retrieve the partition end offsets, and validate the 
current position against it.
-            applicationEventHandler.addAndGet(new ValidatePositionsEvent(), 
timer);
+            applicationEventHandler.addAndGet(new 
ValidatePositionsEvent(timer), timer);
 
             cachedSubscriptionHasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
             if (cachedSubscriptionHasAllFetchPositions) return true;
@@ -1602,7 +1609,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // which are awaiting reset. This will trigger a ListOffset 
request, retrieve the
             // partition offsets according to the strategy (ex. earliest, 
latest), and update the
             // positions.
-            applicationEventHandler.addAndGet(new ResetPositionsEvent(), 
timer);
+            applicationEventHandler.addAndGet(new ResetPositionsEvent(timer), 
timer);
             return true;
         } catch (TimeoutException e) {
             return false;
@@ -1635,7 +1642,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             final FetchCommittedOffsetsEvent event =
                 new FetchCommittedOffsetsEvent(
                     initializingPartitions,
-                    timer.remainingMs());
+                    timer);
             final Map<TopicPartition, OffsetAndMetadata> offsets = 
applicationEventHandler.addAndGet(event, timer);
             refreshCommittedOffsets(offsets, metadata, subscriptions);
             return true;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
index 31c21817d85..3347002cc6f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
@@ -17,25 +17,14 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Timer;
 
 import java.util.List;
 import java.util.Map;
 
 public abstract class AbstractTopicMetadataEvent extends 
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
 
-    private final long timeoutMs;
-
-    protected AbstractTopicMetadataEvent(final Type type, final long 
timeoutMs) {
-        super(type);
-        this.timeoutMs = timeoutMs;
-    }
-
-    public long timeoutMs() {
-        return timeoutMs;
-    }
-
-    @Override
-    public String toStringBase() {
-        return super.toStringBase() + ", timeoutMs=" + timeoutMs;
+    protected AbstractTopicMetadataEvent(final Type type, final Timer timer) {
+        super(type, timer);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
index 154703aaee1..bda18e64210 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Timer;
+
 public class AllTopicsMetadataEvent extends AbstractTopicMetadataEvent {
 
-    public AllTopicsMetadataEvent(final long timeoutMs) {
-        super(Type.ALL_TOPICS_METADATA, timeoutMs);
+    public AllTopicsMetadataEvent(final Timer timer) {
+        super(Type.ALL_TOPICS_METADATA, timer);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index c86aa8815f2..33825307465 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -165,8 +165,7 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         }
 
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        long expirationTimeoutMs = 
getExpirationTimeForTimeout(event.retryTimeoutMs());
-        CompletableFuture<Void> future = manager.commitSync(event.offsets(), 
expirationTimeoutMs);
+        CompletableFuture<Void> future = manager.commitSync(event.offsets(), 
event.deadlineMs());
         future.whenComplete(complete(event.future()));
     }
 
@@ -177,8 +176,7 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
             return;
         }
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        long expirationTimeMs = getExpirationTimeForTimeout(event.timeout());
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.fetchOffsets(event.partitions(), expirationTimeMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.fetchOffsets(event.partitions(), event.deadlineMs());
         future.whenComplete(complete(event.future()));
     }
 
@@ -250,16 +248,14 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
     }
 
     private void process(final TopicMetadataEvent event) {
-        final long expirationTimeMs = 
getExpirationTimeForTimeout(event.timeoutMs());
         final CompletableFuture<Map<String, List<PartitionInfo>>> future =
-                
requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(), 
expirationTimeMs);
+                
requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(), 
event.deadlineMs());
         future.whenComplete(complete(event.future()));
     }
 
     private void process(final AllTopicsMetadataEvent event) {
-        final long expirationTimeMs = 
getExpirationTimeForTimeout(event.timeoutMs());
         final CompletableFuture<Map<String, List<PartitionInfo>>> future =
-                
requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(expirationTimeMs);
+                
requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(event.deadlineMs());
         future.whenComplete(complete(event.future()));
     }
 
@@ -296,19 +292,6 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         future.whenComplete(complete(event.future()));
     }
 
-    /**
-     * @return Expiration time in milliseconds calculated with the current 
time plus the given
-     * timeout. Returns Long.MAX_VALUE if the expiration overflows it.
-     * Visible for testing.
-     */
-    long getExpirationTimeForTimeout(final long timeoutMs) {
-        long expiration = System.currentTimeMillis() + timeoutMs;
-        if (expiration < 0) {
-            return Long.MAX_VALUE;
-        }
-        return expiration;
-    }
-
     private <T> BiConsumer<? super T, ? super Throwable> complete(final 
CompletableFuture<T> b) {
         return (value, exception) -> {
             if (exception != null)
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
index 2f03fdfb1e5..c36f0534b36 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
@@ -27,6 +27,6 @@ import java.util.Map;
 public class AsyncCommitEvent extends CommitEvent {
 
     public AsyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
-        super(Type.COMMIT_ASYNC, offsets);
+        super(Type.COMMIT_ASYNC, offsets, Long.MAX_VALUE);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index 253d27e2573..1da7b84039a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Timer;
 
 import java.util.Collections;
 import java.util.Map;
@@ -29,15 +30,28 @@ public abstract class CommitEvent extends 
CompletableApplicationEvent<Void> {
      */
     private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-    protected CommitEvent(final Type type, final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
-        super(type);
-        this.offsets = Collections.unmodifiableMap(offsets);
+    protected CommitEvent(final Type type, final Map<TopicPartition, 
OffsetAndMetadata> offsets, final Timer timer) {
+        super(type, timer);
+        this.offsets = validate(offsets);
+    }
+
+    protected CommitEvent(final Type type, final Map<TopicPartition, 
OffsetAndMetadata> offsets, final long deadlineMs) {
+        super(type, deadlineMs);
+        this.offsets = validate(offsets);
+    }
 
+    /**
+     * Validates that no offset is negative and then returns the given 
offset map wrapped as an
+     * {@link Collections#unmodifiableMap(Map) unmodifiable map}.
+     */
+    private static Map<TopicPartition, OffsetAndMetadata> validate(final 
Map<TopicPartition, OffsetAndMetadata> offsets) {
         for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
             if (offsetAndMetadata.offset() < 0) {
                 throw new IllegalArgumentException("Invalid offset: " + 
offsetAndMetadata.offset());
             }
         }
+
+        return Collections.unmodifiableMap(offsets);
     }
 
     public Map<TopicPartition, OffsetAndMetadata> offsets() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index a62c3aaa4c4..dae9e9f1017 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Timer;
+
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -27,10 +30,19 @@ import java.util.concurrent.CompletableFuture;
 public abstract class CompletableApplicationEvent<T> extends ApplicationEvent 
implements CompletableEvent<T> {
 
     private final CompletableFuture<T> future;
+    private final long deadlineMs;
 
-    protected CompletableApplicationEvent(final Type type) {
+    protected CompletableApplicationEvent(final Type type, final Timer timer) {
         super(type);
         this.future = new CompletableFuture<>();
+        Objects.requireNonNull(timer);
+        this.deadlineMs = timer.remainingMs() + timer.currentTimeMs();
+    }
+
+    protected CompletableApplicationEvent(final Type type, final long 
deadlineMs) {
+        super(type);
+        this.future = new CompletableFuture<>();
+        this.deadlineMs = deadlineMs;
     }
 
     @Override
@@ -38,8 +50,12 @@ public abstract class CompletableApplicationEvent<T> extends 
ApplicationEvent im
         return future;
     }
 
+    public long deadlineMs() {
+        return deadlineMs;
+    }
+
     @Override
     protected String toStringBase() {
-        return super.toStringBase() + ", future=" + future;
+        return super.toStringBase() + ", future=" + future + ", deadlineMs=" + 
deadlineMs;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
index 7cf56b990b0..980a8f11042 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Timer;
 
 import java.util.Collections;
 import java.util.Map;
@@ -30,27 +31,17 @@ public class FetchCommittedOffsetsEvent extends 
CompletableApplicationEvent<Map<
      */
     private final Set<TopicPartition> partitions;
 
-    /**
-     * Time until which the request will be retried if it fails with a 
retriable error.
-     */
-    private final long timeoutMs;
-
-    public FetchCommittedOffsetsEvent(final Set<TopicPartition> partitions, 
final long timeoutMs) {
-        super(Type.FETCH_COMMITTED_OFFSETS);
+    public FetchCommittedOffsetsEvent(final Set<TopicPartition> partitions, 
final Timer timer) {
+        super(Type.FETCH_COMMITTED_OFFSETS, timer);
         this.partitions = Collections.unmodifiableSet(partitions);
-        this.timeoutMs = timeoutMs;
     }
 
     public Set<TopicPartition> partitions() {
         return partitions;
     }
 
-    public long timeout() {
-        return timeoutMs;
-    }
-
     @Override
     public String toStringBase() {
-        return super.toStringBase() + ", partitions=" + partitions + ", 
partitions=" + partitions;
+        return super.toStringBase() + ", partitions=" + partitions;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
index 5ee19a7cc02..e77b4dfb289 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Timer;
+
 public class LeaveOnCloseEvent extends CompletableApplicationEvent<Void> {
 
-    public LeaveOnCloseEvent() {
-        super(Type.LEAVE_ON_CLOSE);
+    public LeaveOnCloseEvent(final Timer timer) {
+        super(Type.LEAVE_ON_CLOSE, timer);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
index fd3b321173f..e218705846e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Timer;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,8 +37,8 @@ public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicParti
     private final Map<TopicPartition, Long> timestampsToSearch;
     private final boolean requireTimestamps;
 
-    public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, 
final boolean requireTimestamps) {
-        super(Type.LIST_OFFSETS);
+    public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, 
final boolean requireTimestamps, final Timer timer) {
+        super(Type.LIST_OFFSETS, timer);
         this.timestampsToSearch = 
Collections.unmodifiableMap(timestampToSearch);
         this.requireTimestamps = requireTimestamps;
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
index 06f6ebbb68a..65893b62eca 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Timer;
+
 /**
  * Event for resetting offsets for all assigned partitions that require it. 
This is an
  * asynchronous event that generates ListOffsets requests, and completes by 
updating in-memory
@@ -24,7 +26,7 @@ package org.apache.kafka.clients.consumer.internals.events;
  */
 public class ResetPositionsEvent extends CompletableApplicationEvent<Void> {
 
-    public ResetPositionsEvent() {
-        super(Type.RESET_POSITIONS);
+    public ResetPositionsEvent(final Timer timer) {
+        super(Type.RESET_POSITIONS, timer);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
index 7e00e0da596..87945616ea7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Timer;
 
 import java.util.Map;
 
@@ -27,23 +28,7 @@ import java.util.Map;
  */
 public class SyncCommitEvent extends CommitEvent {
 
-    /**
-     * Time to wait for a response, retrying on retriable errors.
-     */
-    private final long retryTimeoutMs;
-
-    public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets,
-                           final long retryTimeoutMs) {
-        super(Type.COMMIT_SYNC, offsets);
-        this.retryTimeoutMs = retryTimeoutMs;
-    }
-
-    public Long retryTimeoutMs() {
-        return retryTimeoutMs;
-    }
-
-    @Override
-    public String toStringBase() {
-        return super.toStringBase() + ", offsets=" + offsets() + ", 
retryTimeoutMs=" + retryTimeoutMs;
+    public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets, final Timer timer) {
+        super(Type.COMMIT_SYNC, offsets, timer);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
index ebbb2a6c468..33e1270ce60 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
@@ -16,14 +16,16 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Timer;
+
 import java.util.Objects;
 
 public class TopicMetadataEvent extends AbstractTopicMetadataEvent {
 
     private final String topic;
 
-    public TopicMetadataEvent(final String topic, final long timeoutMs) {
-        super(Type.TOPIC_METADATA, timeoutMs);
+    public TopicMetadataEvent(final String topic, final Timer timer) {
+        super(Type.TOPIC_METADATA, timer);
         this.topic = Objects.requireNonNull(topic);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
index 07af36e5feb..0b988370014 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Timer;
+
 /**
  * Application event triggered when a user calls the unsubscribe API. This 
will make the consumer
  * release all its assignments and send a heartbeat request to leave the 
consumer group.
@@ -26,8 +28,8 @@ package org.apache.kafka.clients.consumer.internals.events;
  */
 public class UnsubscribeEvent extends CompletableApplicationEvent<Void> {
 
-    public UnsubscribeEvent() {
-        super(Type.UNSUBSCRIBE);
+    public UnsubscribeEvent(final Timer timer) {
+        super(Type.UNSUBSCRIBE, timer);
     }
 }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
index efa358b4c78..21e7f3cf6eb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.common.utils.Timer;
+
 /**
  * Event for validating offsets for all assigned partitions for which a leader 
change has been
  * detected. This is an asynchronous event that generates OffsetForLeaderEpoch 
requests, and
@@ -24,7 +26,7 @@ package org.apache.kafka.clients.consumer.internals.events;
  */
 public class ValidatePositionsEvent extends CompletableApplicationEvent<Void> {
 
-    public ValidatePositionsEvent() {
-        super(Type.VALIDATE_POSITIONS);
+    public ValidatePositionsEvent(final Timer timer) {
+        super(Type.VALIDATE_POSITIONS, timer);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index cbd56d8b5e2..e4d492fb581 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
@@ -161,7 +162,8 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testSyncCommitEvent() {
-        ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 100L);
+        Timer timer = time.timer(100);
+        ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), timer);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
@@ -170,7 +172,8 @@ public class ConsumerNetworkThreadTest {
     @Test
     public void testListOffsetsEventIsProcessed() {
         Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L);
-        ApplicationEvent e = new ListOffsetsEvent(timestamps, true);
+        Timer timer = time.timer(100);
+        ApplicationEvent e = new ListOffsetsEvent(timestamps, true, timer);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ListOffsetsEvent.class));
@@ -179,7 +182,8 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testResetPositionsEventIsProcessed() {
-        ResetPositionsEvent e = new ResetPositionsEvent();
+        Timer timer = time.timer(100);
+        ResetPositionsEvent e = new ResetPositionsEvent(timer);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
@@ -190,7 +194,8 @@ public class ConsumerNetworkThreadTest {
     public void testResetPositionsProcessFailureIsIgnored() {
         doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
 
-        ResetPositionsEvent event = new ResetPositionsEvent();
+        Timer timer = time.timer(100);
+        ResetPositionsEvent event = new ResetPositionsEvent(timer);
         applicationEventsQueue.add(event);
         assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
 
@@ -199,7 +204,8 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testValidatePositionsEventIsProcessed() {
-        ValidatePositionsEvent e = new ValidatePositionsEvent();
+        Timer timer = time.timer(100);
+        ValidatePositionsEvent e = new ValidatePositionsEvent(timer);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
@@ -224,7 +230,8 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     void testFetchTopicMetadata() {
-        applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE));
+        Timer timer = time.timer(Long.MAX_VALUE);
+        applicationEventsQueue.add(new TopicMetadataEvent("topic", timer));
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index f3e2557ae94..b23660e5469 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -27,6 +27,9 @@ import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -36,7 +39,6 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -44,6 +46,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ApplicationEventProcessorTest {
+    private final Time time = new MockTime(1);
     private ApplicationEventProcessor processor;
     private BlockingQueue applicationEventQueue = mock(BlockingQueue.class);
     private RequestManagers requestManagers;
@@ -96,18 +99,10 @@ public class ApplicationEventProcessorTest {
         verify(commitRequestManager).signalClose();
     }
 
-    @Test
-    public void testExpirationCalculation() {
-        assertEquals(Long.MAX_VALUE, processor.getExpirationTimeForTimeout(Long.MAX_VALUE));
-        assertEquals(Long.MAX_VALUE, processor.getExpirationTimeForTimeout(Long.MAX_VALUE - 1));
-        long timeout = processor.getExpirationTimeForTimeout(1000);
-        assertTrue(timeout > 0);
-        assertTrue(timeout < Long.MAX_VALUE);
-    }
-
     @Test
     public void testPrepClosingLeaveGroupEvent() {
-        LeaveOnCloseEvent event = new LeaveOnCloseEvent();
+        Timer timer = time.timer(100);
+        LeaveOnCloseEvent event = new LeaveOnCloseEvent(timer);
         when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
         when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));
         processor.process(event);


Reply via email to