This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4362ab70901 KAFKA-17947: Update currentLag(), pause(), and resume() to
update SubscriptionState in background thread (#17699)
4362ab70901 is described below
commit 4362ab70901a1069ff396e11a8507c66ce1dc641
Author: Kirk True <[email protected]>
AuthorDate: Wed Dec 4 18:31:44 2024 -0800
KAFKA-17947: Update currentLag(), pause(), and resume() to update
SubscriptionState in background thread (#17699)
Reviewers: Lianet Magrans <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 116 +++++++++++----------
.../internals/events/ApplicationEvent.java | 1 +
.../events/ApplicationEventProcessor.java | 112 +++++++++++++++++---
.../consumer/internals/events/CurrentLagEvent.java | 50 +++++++++
.../internals/events/PausePartitionsEvent.java | 42 ++++++++
.../internals/events/ResumePartitionsEvent.java | 42 ++++++++
6 files changed, 294 insertions(+), 69 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 32e5fe32f48..c75d0e263fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -51,13 +51,16 @@ import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
+import org.apache.kafka.clients.consumer.internals.events.CurrentLagEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
+import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ResumePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
@@ -243,7 +246,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final ConsumerMetadata metadata;
private final Metrics metrics;
private final long retryBackoffMs;
- private final int defaultApiTimeoutMs;
+ private final Duration defaultApiTimeoutMs;
private final boolean autoCommitEnabled;
private volatile boolean closed = false;
// Init value is needed to avoid NPE in case of exception raised in the
constructor
@@ -313,7 +316,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.log = logContext.logger(getClass());
log.debug("Initializing the Kafka consumer");
- this.defaultApiTimeoutMs =
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+ this.defaultApiTimeoutMs =
Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
this.time = time;
List<MetricsReporter> reporters =
CommonClientConfigs.metricsReporters(clientId, config);
this.clientTelemetryReporter =
CommonClientConfigs.telemetryReporter(clientId, config);
@@ -455,7 +458,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.groupMetadata.set(initializeGroupMetadata(groupId,
Optional.empty()));
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
- this.defaultApiTimeoutMs = defaultApiTimeoutMs;
+ this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics,
"consumer");
@@ -483,7 +486,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.metrics = new Metrics(time);
this.metadata = metadata;
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
- this.defaultApiTimeoutMs =
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+ this.defaultApiTimeoutMs =
Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer);
this.clientTelemetryReporter = Optional.empty();
@@ -763,7 +766,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
*/
@Override
public void commitSync() {
- commitSync(Duration.ofMillis(defaultApiTimeoutMs));
+ commitSync(defaultApiTimeoutMs);
}
/**
@@ -828,9 +831,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
log.info("Seeking to offset {} for partition {}", offset,
partition);
- Timer timer = time.timer(defaultApiTimeoutMs);
SeekUnvalidatedEvent seekUnvalidatedEventEvent = new
SeekUnvalidatedEvent(
- calculateDeadlineMs(timer), partition, offset,
Optional.empty());
+ defaultApiTimeoutDeadlineMs(),
+ partition,
+ offset,
+ Optional.empty()
+ );
applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
} finally {
release();
@@ -853,10 +859,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
log.info("Seeking to offset {} for partition {}", offset,
partition);
}
- Timer timer = time.timer(defaultApiTimeoutMs);
- SeekUnvalidatedEvent seekUnvalidatedEventEvent = new
SeekUnvalidatedEvent(
- calculateDeadlineMs(timer), partition,
offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch());
- applicationEventHandler.addAndGet(seekUnvalidatedEventEvent);
+ applicationEventHandler.addAndGet(new SeekUnvalidatedEvent(
+ defaultApiTimeoutDeadlineMs(),
+ partition,
+ offsetAndMetadata.offset(),
+ offsetAndMetadata.leaderEpoch()
+ ));
} finally {
release();
}
@@ -878,9 +886,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
acquireAndEnsureOpen();
try {
- Timer timer = time.timer(defaultApiTimeoutMs);
- ResetOffsetEvent event = new ResetOffsetEvent(partitions,
offsetResetStrategy, calculateDeadlineMs(timer));
- applicationEventHandler.addAndGet(event);
+ applicationEventHandler.addAndGet(new ResetOffsetEvent(
+ partitions,
+ offsetResetStrategy,
+ defaultApiTimeoutDeadlineMs())
+ );
} finally {
release();
}
@@ -888,7 +898,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public long position(TopicPartition partition) {
- return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
+ return position(partition, defaultApiTimeoutMs);
}
@Override
@@ -918,7 +928,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public Map<TopicPartition, OffsetAndMetadata> committed(final
Set<TopicPartition> partitions) {
- return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+ return committed(partitions, defaultApiTimeoutMs);
}
@Override
@@ -965,7 +975,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
+ return partitionsFor(topic, defaultApiTimeoutMs);
}
@Override
@@ -998,7 +1008,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public Map<String, List<PartitionInfo>> listTopics() {
- return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
+ return listTopics(defaultApiTimeoutMs);
}
@Override
@@ -1035,10 +1045,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public void pause(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
- log.debug("Pausing partitions {}", partitions);
- for (TopicPartition partition : partitions) {
- subscriptions.pause(partition);
- }
+ Objects.requireNonNull(partitions, "The partitions to pause must
be nonnull");
+
+ if (!partitions.isEmpty())
+ applicationEventHandler.addAndGet(new
PausePartitionsEvent(partitions, defaultApiTimeoutDeadlineMs()));
} finally {
release();
}
@@ -1048,10 +1058,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public void resume(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
- log.debug("Resuming partitions {}", partitions);
- for (TopicPartition partition : partitions) {
- subscriptions.resume(partition);
- }
+ Objects.requireNonNull(partitions, "The partitions to resume must
be nonnull");
+
+ if (!partitions.isEmpty())
+ applicationEventHandler.addAndGet(new
ResumePartitionsEvent(partitions, defaultApiTimeoutDeadlineMs()));
} finally {
release();
}
@@ -1059,7 +1069,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
- return offsetsForTimes(timestampsToSearch,
Duration.ofMillis(defaultApiTimeoutMs));
+ return offsetsForTimes(timestampsToSearch, defaultApiTimeoutMs);
}
@Override
@@ -1107,7 +1117,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public Map<TopicPartition, Long>
beginningOffsets(Collection<TopicPartition> partitions) {
- return beginningOffsets(partitions,
Duration.ofMillis(defaultApiTimeoutMs));
+ return beginningOffsets(partitions, defaultApiTimeoutMs);
}
@Override
@@ -1117,7 +1127,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition>
partitions) {
- return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
+ return endOffsets(partitions, defaultApiTimeoutMs);
}
@Override
@@ -1173,25 +1183,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public OptionalLong currentLag(TopicPartition topicPartition) {
acquireAndEnsureOpen();
try {
- final Long lag = subscriptions.partitionLag(topicPartition,
isolationLevel);
-
- // if the log end offset is not known and hence cannot return lag
and there is
- // no in-flight list offset requested yet,
- // issue a list offset request for that partition so that next time
- // we may get the answer; we do not need to wait for the return
value
- // since we would not try to poll the network client synchronously
- if (lag == null) {
- if (subscriptions.partitionEndOffset(topicPartition,
isolationLevel) == null &&
-
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
- log.info("Requesting the log end offset for {} in order to
compute lag", topicPartition);
- subscriptions.requestPartitionEndOffset(topicPartition);
- endOffsets(Collections.singleton(topicPartition),
Duration.ofMillis(0));
- }
-
- return OptionalLong.empty();
- }
-
- return OptionalLong.of(lag);
+ return applicationEventHandler.addAndGet(new CurrentLagEvent(
+ topicPartition,
+ isolationLevel,
+ defaultApiTimeoutDeadlineMs()
+ ));
} finally {
release();
}
@@ -1435,7 +1431,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
- commitSync(Optional.of(offsets),
Duration.ofMillis(defaultApiTimeoutMs));
+ commitSync(Optional.of(offsets), defaultApiTimeoutMs);
}
@Override
@@ -1556,9 +1552,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// be no following rebalance.
//
// See the ApplicationEventProcessor.process() method that handles
this event for more detail.
- Timer timer = time.timer(defaultApiTimeoutMs);
- AssignmentChangeEvent assignmentChangeEvent = new
AssignmentChangeEvent(timer.currentTimeMs(), calculateDeadlineMs(timer),
partitions);
- applicationEventHandler.addAndGet(assignmentChangeEvent);
+ applicationEventHandler.addAndGet(new AssignmentChangeEvent(
+ time.milliseconds(),
+ defaultApiTimeoutDeadlineMs(),
+ partitions
+ ));
} finally {
release();
}
@@ -1866,7 +1864,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
"null" : "empty"));
log.info("Subscribed to pattern: '{}'", pattern);
applicationEventHandler.addAndGet(new
TopicPatternSubscriptionChangeEvent(
- pattern, listener,
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
+ pattern,
+ listener,
+ defaultApiTimeoutDeadlineMs()
+ ));
} finally {
release();
}
@@ -1928,7 +1929,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
fetchBuffer.retainAll(currentTopicPartitions);
log.info("Subscribed to topic(s): {}", String.join(", ",
topics));
applicationEventHandler.addAndGet(new
TopicSubscriptionChangeEvent(
- new HashSet<>(topics), listener,
calculateDeadlineMs(time.timer(defaultApiTimeoutMs))));
+ new HashSet<>(topics),
+ listener,
+ defaultApiTimeoutDeadlineMs()
+ ));
}
} finally {
release();
@@ -2095,4 +2099,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
SubscriptionState subscriptions() {
return subscriptions;
}
+
+ private long defaultApiTimeoutDeadlineMs() {
+ return calculateDeadlineMs(time, defaultApiTimeoutMs);
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 406ca55b79b..c30c0c2edde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -35,6 +35,7 @@ public abstract class ApplicationEvent {
UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
+ PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 916810609b3..9c119e28b7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -27,18 +27,22 @@ import
org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
@@ -180,6 +184,18 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
process((SeekUnvalidatedEvent) event);
return;
+ case PAUSE_PARTITIONS:
+ process((PausePartitionsEvent) event);
+ return;
+
+ case RESUME_PARTITIONS:
+ process((ResumePartitionsEvent) event);
+ return;
+
+ case CURRENT_LAG:
+ process((CurrentLagEvent) event);
+ return;
+
default:
log.warn("Application event type {} was not expected",
event.type());
}
@@ -544,6 +560,87 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
manager.setAcknowledgementCommitCallbackRegistered(event.isCallbackRegistered());
}
+ private void process(final SeekUnvalidatedEvent event) {
+ try {
+ event.offsetEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
+ SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
+ event.offset(),
+ event.offsetEpoch(),
+ metadata.currentLeader(event.partition())
+ );
+ subscriptions.seekUnvalidated(event.partition(), newPosition);
+ event.future().complete(null);
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
+
+ private void process(final PausePartitionsEvent event) {
+ try {
+ Collection<TopicPartition> partitions = event.partitions();
+ log.debug("Pausing partitions {}", partitions);
+
+ for (TopicPartition partition : partitions) {
+ subscriptions.pause(partition);
+ }
+
+ event.future().complete(null);
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
+
+ private void process(final ResumePartitionsEvent event) {
+ try {
+ Collection<TopicPartition> partitions = event.partitions();
+ log.debug("Resuming partitions {}", partitions);
+
+ for (TopicPartition partition : partitions) {
+ subscriptions.resume(partition);
+ }
+
+ event.future().complete(null);
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
+
+ private void process(final CurrentLagEvent event) {
+ try {
+ final TopicPartition topicPartition = event.partition();
+ final IsolationLevel isolationLevel = event.isolationLevel();
+ final Long lag = subscriptions.partitionLag(topicPartition,
isolationLevel);
+
+ final OptionalLong lagOpt;
+ if (lag == null) {
+ if (subscriptions.partitionEndOffset(topicPartition,
isolationLevel) == null &&
+
!subscriptions.partitionEndOffsetRequested(topicPartition)) {
+ // If the log end offset is unknown and there isn't
already an in-flight list offset
+ // request, issue one with the goal that the lag will be
available the next time the
+ // user calls currentLag().
+ log.info("Requesting the log end offset for {} in order to
compute lag", topicPartition);
+ subscriptions.requestPartitionEndOffset(topicPartition);
+
+ // Emulates the Consumer.endOffsets() logic...
+ Map<TopicPartition, Long> timestampToSearch =
Collections.singletonMap(
+ topicPartition,
+ ListOffsetsRequest.LATEST_TIMESTAMP
+ );
+
+
requestManagers.offsetsRequestManager.fetchOffsets(timestampToSearch, false);
+ }
+
+ lagOpt = OptionalLong.empty();
+ } else {
+ lagOpt = OptionalLong.of(lag);
+ }
+
+ event.future().complete(lagOpt);
+ } catch (Exception e) {
+ event.future().completeExceptionally(e);
+ }
+ }
+
private <T> BiConsumer<? super T, ? super Throwable> complete(final
CompletableFuture<T> b) {
return (value, exception) -> {
if (exception != null)
@@ -575,21 +672,6 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
};
}
- private void process(final SeekUnvalidatedEvent event) {
- try {
- event.offsetEpoch().ifPresent(epoch ->
metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
- SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
- event.offset(),
- event.offsetEpoch(),
- metadata.currentLeader(event.partition())
- );
- subscriptions.seekUnvalidated(event.partition(), newPosition);
- event.future().complete(null);
- } catch (Exception e) {
- event.future().completeExceptionally(e);
- }
- }
-
/**
* This function evaluates the regex that the consumer subscribed to
* against the list of topic names from metadata, and updates
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CurrentLagEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CurrentLagEvent.java
new file mode 100644
index 00000000000..95bd23fde67
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CurrentLagEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+
+public class CurrentLagEvent extends CompletableApplicationEvent<OptionalLong>
{
+
+ private final TopicPartition partition;
+
+ private final IsolationLevel isolationLevel;
+
+ public CurrentLagEvent(final TopicPartition partition, final
IsolationLevel isolationLevel, final long deadlineMs) {
+ super(Type.CURRENT_LAG, deadlineMs);
+ this.partition = Objects.requireNonNull(partition);
+ this.isolationLevel = Objects.requireNonNull(isolationLevel);
+ }
+
+ public TopicPartition partition() {
+ return partition;
+ }
+
+ public IsolationLevel isolationLevel() {
+ return isolationLevel;
+ }
+
+ @Override
+ public String toStringBase() {
+ return super.toStringBase() + ", partition=" + partition + ",
isolationLevel=" + isolationLevel;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PausePartitionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PausePartitionsEvent.java
new file mode 100644
index 00000000000..14c729e1d4a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PausePartitionsEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class PausePartitionsEvent extends CompletableApplicationEvent<Void> {
+
+ private final Collection<TopicPartition> partitions;
+
+ public PausePartitionsEvent(final Collection<TopicPartition> partitions,
final long deadlineMs) {
+ super(Type.PAUSE_PARTITIONS, deadlineMs);
+ this.partitions = Collections.unmodifiableCollection(partitions);
+ }
+
+ public Collection<TopicPartition> partitions() {
+ return partitions;
+ }
+
+ @Override
+ public String toStringBase() {
+ return super.toStringBase() + ", partitions=" + partitions;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResumePartitionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResumePartitionsEvent.java
new file mode 100644
index 00000000000..02a49a057ad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResumePartitionsEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+
+public class ResumePartitionsEvent extends CompletableApplicationEvent<Void> {
+
+ private final Collection<TopicPartition> partitions;
+
+ public ResumePartitionsEvent(final Collection<TopicPartition> partitions,
final long deadlineMs) {
+ super(Type.RESUME_PARTITIONS, deadlineMs);
+ this.partitions = Collections.unmodifiableCollection(partitions);
+ }
+
+ public Collection<TopicPartition> partitions() {
+ return partitions;
+ }
+
+ @Override
+ public String toStringBase() {
+ return super.toStringBase() + ", partitions=" + partitions;
+ }
+}