This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4362ab70901 KAFKA-17947: Update currentLag(), pause(), and resume() to 
update SubscriptionState in background thread (#17699)
4362ab70901 is described below

commit 4362ab70901a1069ff396e11a8507c66ce1dc641
Author: Kirk True <[email protected]>
AuthorDate: Wed Dec 4 18:31:44 2024 -0800

    KAFKA-17947: Update currentLag(), pause(), and resume() to update 
SubscriptionState in background thread (#17699)
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 116 +++++++++++----------
 .../internals/events/ApplicationEvent.java         |   1 +
 .../events/ApplicationEventProcessor.java          | 112 +++++++++++++++++---
 .../consumer/internals/events/CurrentLagEvent.java |  50 +++++++++
 .../internals/events/PausePartitionsEvent.java     |  42 ++++++++
 .../internals/events/ResumePartitionsEvent.java    |  42 ++++++++
 6 files changed, 294 insertions(+), 69 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 32e5fe32f48..c75d0e263fc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -51,13 +51,16 @@ import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CurrentLagEvent;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
@@ -243,7 +246,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private final ConsumerMetadata metadata;
     private final Metrics metrics;
     private final long retryBackoffMs;
-    private final int defaultApiTimeoutMs;
+    private final Duration defaultApiTimeoutMs;
     private final boolean autoCommitEnabled;
     private volatile boolean closed = false;
     // Init value is needed to avoid NPE in case of exception raised in the 
constructor
@@ -313,7 +316,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             this.log = logContext.logger(getClass());
 
             log.debug("Initializing the Kafka consumer");
-            this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+            this.defaultApiTimeoutMs = 
Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
             this.time = time;
             List<MetricsReporter> reporters = 
CommonClientConfigs.metricsReporters(clientId, config);
             this.clientTelemetryReporter = 
CommonClientConfigs.telemetryReporter(clientId, config);
@@ -455,7 +458,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.groupMetadata.set(initializeGroupMetadata(groupId, 
Optional.empty()));
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
-        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
+        this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
         this.deserializers = deserializers;
         this.applicationEventHandler = applicationEventHandler;
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
"consumer");
@@ -483,7 +486,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.metrics = new Metrics(time);
         this.metadata = metadata;
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-        this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+        this.defaultApiTimeoutMs = 
Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
         this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer);
         this.clientTelemetryReporter = Optional.empty();
 
@@ -763,7 +766,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      */
     @Override
     public void commitSync() {
-        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+        commitSync(defaultApiTimeoutMs);
     }
 
     /**
@@ -828,9 +831,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         acquireAndEnsureOpen();
         try {
             log.info("Seeking to offset {} for partition {}", offset, 
partition);
-            Timer timer = time.timer(defaultApiTimeoutMs);
             SeekUnvalidatedEvent seekUnvalidatedEventEvent = new 
SeekUnvalidatedEvent(
-                    calculateDeadlineMs(timer), partition, offset, 
Optional.empty());
+                defaultApiTimeoutDeadlineMs(),
+                partition,
+                offset,
+                Optional.empty()
+            );
             applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
         } finally {
             release();
@@ -853,10 +859,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 log.info("Seeking to offset {} for partition {}", offset, 
partition);
             }
 
-            Timer timer = time.timer(defaultApiTimeoutMs);
-            SeekUnvalidatedEvent seekUnvalidatedEventEvent = new 
SeekUnvalidatedEvent(
-                    calculateDeadlineMs(timer), partition, 
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch());
-            applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
+            applicationEventHandler.addAndGet(new SeekUnvalidatedEvent(
+                defaultApiTimeoutDeadlineMs(),
+                partition,
+                offsetAndMetadata.offset(),
+                offsetAndMetadata.leaderEpoch()
+            ));
         } finally {
             release();
         }
@@ -878,9 +886,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
         acquireAndEnsureOpen();
         try {
-            Timer timer = time.timer(defaultApiTimeoutMs);
-            ResetOffsetEvent event = new ResetOffsetEvent(partitions, 
offsetResetStrategy, calculateDeadlineMs(timer));
-            applicationEventHandler.addAndGet(event);
+            applicationEventHandler.addAndGet(new ResetOffsetEvent(
+                partitions,
+                offsetResetStrategy,
+                defaultApiTimeoutDeadlineMs())
+            );
         } finally {
             release();
         }
@@ -888,7 +898,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public long position(TopicPartition partition) {
-        return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
+        return position(partition, defaultApiTimeoutMs);
     }
 
     @Override
@@ -918,7 +928,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public Map<TopicPartition, OffsetAndMetadata> committed(final 
Set<TopicPartition> partitions) {
-        return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+        return committed(partitions, defaultApiTimeoutMs);
     }
 
     @Override
@@ -965,7 +975,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
+        return partitionsFor(topic, defaultApiTimeoutMs);
     }
 
     @Override
@@ -998,7 +1008,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
-        return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
+        return listTopics(defaultApiTimeoutMs);
     }
 
     @Override
@@ -1035,10 +1045,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     public void pause(Collection<TopicPartition> partitions) {
         acquireAndEnsureOpen();
         try {
-            log.debug("Pausing partitions {}", partitions);
-            for (TopicPartition partition : partitions) {
-                subscriptions.pause(partition);
-            }
+            Objects.requireNonNull(partitions, "The partitions to pause must 
be nonnull");
+
+            if (!partitions.isEmpty())
+                applicationEventHandler.addAndGet(new 
PausePartitionsEvent(partitions, defaultApiTimeoutDeadlineMs()));
         } finally {
             release();
         }
@@ -1048,10 +1058,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     public void resume(Collection<TopicPartition> partitions) {
         acquireAndEnsureOpen();
         try {
-            log.debug("Resuming partitions {}", partitions);
-            for (TopicPartition partition : partitions) {
-                subscriptions.resume(partition);
-            }
+            Objects.requireNonNull(partitions, "The partitions to resume must 
be nonnull");
+
+            if (!partitions.isEmpty())
+                applicationEventHandler.addAndGet(new 
ResumePartitionsEvent(partitions, defaultApiTimeoutDeadlineMs()));
         } finally {
             release();
         }
@@ -1059,7 +1069,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
-        return offsetsForTimes(timestampsToSearch, 
Duration.ofMillis(defaultApiTimeoutMs));
+        return offsetsForTimes(timestampsToSearch, defaultApiTimeoutMs);
     }
 
     @Override
@@ -1107,7 +1117,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions) {
-        return beginningOffsets(partitions, 
Duration.ofMillis(defaultApiTimeoutMs));
+        return beginningOffsets(partitions, defaultApiTimeoutMs);
     }
 
     @Override
@@ -1117,7 +1127,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions) {
-        return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+        return endOffsets(partitions, defaultApiTimeoutMs);
     }
 
     @Override
@@ -1173,25 +1183,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     public OptionalLong currentLag(TopicPartition topicPartition) {
         acquireAndEnsureOpen();
         try {
-            final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);
-
-            // if the log end offset is not known and hence cannot return lag 
and there is
-            // no in-flight list offset requested yet,
-            // issue a list offset request for that partition so that next time
-            // we may get the answer; we do not need to wait for the return 
value
-            // since we would not try to poll the network client synchronously
-            if (lag == null) {
-                if (subscriptions.partitionEndOffset(topicPartition, 
isolationLevel) == null &&
-                    
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
-                    log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
-                    subscriptions.requestPartitionEndOffset(topicPartition);
-                    endOffsets(Collections.singleton(topicPartition), 
Duration.ofMillis(0));
-                }
-
-                return OptionalLong.empty();
-            }
-
-            return OptionalLong.of(lag);
+            return applicationEventHandler.addAndGet(new CurrentLagEvent(
+                topicPartition,
+                isolationLevel,
+                defaultApiTimeoutDeadlineMs()
+            ));
         } finally {
             release();
         }
@@ -1435,7 +1431,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        commitSync(Optional.of(offsets), 
Duration.ofMillis(defaultApiTimeoutMs));
+        commitSync(Optional.of(offsets), defaultApiTimeoutMs);
     }
 
     @Override
@@ -1556,9 +1552,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // be no following rebalance.
             //
             // See the ApplicationEventProcessor.process() method that handles 
this event for more detail.
-            Timer timer = time.timer(defaultApiTimeoutMs);
-            AssignmentChangeEvent assignmentChangeEvent = new 
AssignmentChangeEvent(timer.currentTimeMs(), calculateDeadlineMs(timer), 
partitions);
-            applicationEventHandler.addAndGet(assignmentChangeEvent);
+            applicationEventHandler.addAndGet(new AssignmentChangeEvent(
+                time.milliseconds(),
+                defaultApiTimeoutDeadlineMs(),
+                partitions
+            ));
         } finally {
             release();
         }
@@ -1866,7 +1864,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     "null" : "empty"));
             log.info("Subscribed to pattern: '{}'", pattern);
             applicationEventHandler.addAndGet(new 
TopicPatternSubscriptionChangeEvent(
-                pattern, listener, 
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
+                pattern,
+                listener,
+                defaultApiTimeoutDeadlineMs()
+            ));
         } finally {
             release();
         }
@@ -1928,7 +1929,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 fetchBuffer.retainAll(currentTopicPartitions);
                 log.info("Subscribed to topic(s): {}", String.join(", ", 
topics));
                 applicationEventHandler.addAndGet(new 
TopicSubscriptionChangeEvent(
-                    new HashSet<>(topics), listener, 
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
+                    new HashSet<>(topics),
+                    listener,
+                    defaultApiTimeoutDeadlineMs()
+                ));
             }
         } finally {
             release();
@@ -2095,4 +2099,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     SubscriptionState subscriptions() {
         return subscriptions;
     }
+
+    private long defaultApiTimeoutDeadlineMs() {
+        return calculateDeadlineMs(time, defaultApiTimeoutMs);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 406ca55b79b..c30c0c2edde 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -35,6 +35,7 @@ public abstract class ApplicationEvent {
         UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
         CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
         COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
+        PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
         SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
         SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
         SHARE_ACKNOWLEDGE_ON_CLOSE,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 916810609b3..9c119e28b7b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -27,18 +27,22 @@ import 
org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
@@ -180,6 +184,18 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
                 process((SeekUnvalidatedEvent) event);
                 return;
 
+            case PAUSE_PARTITIONS:
+                process((PausePartitionsEvent) event);
+                return;
+
+            case RESUME_PARTITIONS:
+                process((ResumePartitionsEvent) event);
+                return;
+
+            case CURRENT_LAG:
+                process((CurrentLagEvent) event);
+                return;
+
             default:
                 log.warn("Application event type {} was not expected", 
event.type());
         }
@@ -544,6 +560,87 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         
manager.setAcknowledgementCommitCallbackRegistered(event.isCallbackRegistered());
     }
 
+    private void process(final SeekUnvalidatedEvent event) {
+        try {
+            event.offsetEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
+            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
+                event.offset(),
+                event.offsetEpoch(),
+                metadata.currentLeader(event.partition())
+            );
+            subscriptions.seekUnvalidated(event.partition(), newPosition);
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final PausePartitionsEvent event) {
+        try {
+            Collection<TopicPartition> partitions = event.partitions();
+            log.debug("Pausing partitions {}", partitions);
+
+            for (TopicPartition partition : partitions) {
+                subscriptions.pause(partition);
+            }
+
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final ResumePartitionsEvent event) {
+        try {
+            Collection<TopicPartition> partitions = event.partitions();
+            log.debug("Resuming partitions {}", partitions);
+
+            for (TopicPartition partition : partitions) {
+                subscriptions.resume(partition);
+            }
+
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final CurrentLagEvent event) {
+        try {
+            final TopicPartition topicPartition = event.partition();
+            final IsolationLevel isolationLevel = event.isolationLevel();
+            final Long lag = subscriptions.partitionLag(topicPartition, 
isolationLevel);
+
+            final OptionalLong lagOpt;
+            if (lag == null) {
+                if (subscriptions.partitionEndOffset(topicPartition, 
isolationLevel) == null &&
+                    
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
+                    // If the log end offset is unknown and there isn't 
already an in-flight list offset
+                    // request, issue one with the goal that the lag will be 
available the next time the
+                    // user calls currentLag().
+                    log.info("Requesting the log end offset for {} in order to 
compute lag", topicPartition);
+                    subscriptions.requestPartitionEndOffset(topicPartition);
+
+                    // Emulates the Consumer.endOffsets() logic...
+                    Map<TopicPartition, Long> timestampToSearch = 
Collections.singletonMap(
+                        topicPartition,
+                        ListOffsetsRequest.LATEST_TIMESTAMP
+                    );
+
+                    
requestManagers.offsetsRequestManager.fetchOffsets(timestampToSearch, false);
+                }
+
+                lagOpt = OptionalLong.empty();
+            } else {
+                lagOpt = OptionalLong.of(lag);
+            }
+
+            event.future().complete(lagOpt);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
     private <T> BiConsumer<? super T, ? super Throwable> complete(final 
CompletableFuture<T> b) {
         return (value, exception) -> {
             if (exception != null)
@@ -575,21 +672,6 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
         };
     }
 
-    private void process(final SeekUnvalidatedEvent event) {
-        try {
-            event.offsetEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
-            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
-                    event.offset(),
-                    event.offsetEpoch(),
-                    metadata.currentLeader(event.partition())
-            );
-            subscriptions.seekUnvalidated(event.partition(), newPosition);
-            event.future().complete(null);
-        } catch (Exception e) {
-            event.future().completeExceptionally(e);
-        }
-    }
-
     /**
      * This function evaluates the regex that the consumer subscribed to
      * against the list of topic names from metadata, and updates
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CurrentLagEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CurrentLagEvent.java
new file mode 100644
index 00000000000..95bd23fde67
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CurrentLagEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public class CurrentLagEvent extends CompletableApplicationEvent<OptionalLong> 
{
+
+    private final TopicPartition partition;
+
+    private final IsolationLevel isolationLevel;
+
+    public CurrentLagEvent(final TopicPartition partition, final 
IsolationLevel isolationLevel, final long deadlineMs) {
+        super(Type.CURRENT_LAG, deadlineMs);
+        this.partition = Objects.requireNonNull(partition);
+        this.isolationLevel = Objects.requireNonNull(isolationLevel);
+    }
+
+    public TopicPartition partition() {
+        return partition;
+    }
+
+    public IsolationLevel isolationLevel() {
+        return isolationLevel;
+    }
+
+    @Override
+    public String toStringBase() {
+        return super.toStringBase() + ", partition=" + partition + ", 
isolationLevel=" + isolationLevel;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PausePartitionsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PausePartitionsEvent.java
new file mode 100644
index 00000000000..14c729e1d4a
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PausePartitionsEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class PausePartitionsEvent extends CompletableApplicationEvent<Void> {
+
+    private final Collection<TopicPartition> partitions;
+
+    public PausePartitionsEvent(final Collection<TopicPartition> partitions, 
final long deadlineMs) {
+        super(Type.PAUSE_PARTITIONS, deadlineMs);
+        this.partitions = Collections.unmodifiableCollection(partitions);
+    }
+
+    public Collection<TopicPartition> partitions() {
+        return partitions;
+    }
+
+    @Override
+    public String toStringBase() {
+        return super.toStringBase() + ", partitions=" + partitions;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResumePartitionsEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResumePartitionsEvent.java
new file mode 100644
index 00000000000..02a49a057ad
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResumePartitionsEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class ResumePartitionsEvent extends CompletableApplicationEvent<Void> {
+
+    private final Collection<TopicPartition> partitions;
+
+    public ResumePartitionsEvent(final Collection<TopicPartition> partitions, 
final long deadlineMs) {
+        super(Type.RESUME_PARTITIONS, deadlineMs);
+        this.partitions = Collections.unmodifiableCollection(partitions);
+    }
+
+    public Collection<TopicPartition> partitions() {
+        return partitions;
+    }
+
+    @Override
+    public String toStringBase() {
+        return super.toStringBase() + ", partitions=" + partitions;
+    }
+}

Reply via email to