This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f8eb4294d67 KAFKA-16191: Clean up of consumer client internal events 
(#15438)
f8eb4294d67 is described below

commit f8eb4294d67b37e854aa14fb989d5e074df82ac2
Author: Kirk True <[email protected]>
AuthorDate: Thu Feb 29 02:22:23 2024 -0800

    KAFKA-16191: Clean up of consumer client internal events (#15438)
    
    There are a few minor issues with the event sub-classes in the
    org.apache.kafka.clients.consumer.internals.events package that should be 
cleaned up:
    
    - Update the names of subclasses to remove "Application" or "Background"
    - Make toString() final in the base classes and clean up the 
implementations of toStringBase()
    - Fix minor whitespace inconsistencies
    - Make variable/method names consistent
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  89 +++++++-------
 .../internals/CoordinatorRequestManager.java       |   6 +-
 .../internals/HeartbeatRequestManager.java         |   4 +-
 .../consumer/internals/MembershipManagerImpl.java  |   4 +-
 .../consumer/internals/OffsetsRequestManager.java  |   4 +-
 ...tEvent.java => AbstractTopicMetadataEvent.java} |  24 ++--
 ...tableEvent.java => AllTopicsMetadataEvent.java} |   9 +-
 .../internals/events/ApplicationEvent.java         |  35 ++++--
 .../internals/events/ApplicationEventHandler.java  |   6 +-
 .../events/ApplicationEventProcessor.java          | 123 +++++++++++--------
 ...cationEvent.java => AssignmentChangeEvent.java} |  33 +----
 ...ApplicationEvent.java => AsyncCommitEvent.java} |  15 +--
 .../consumer/internals/events/BackgroundEvent.java |  30 +++--
 .../internals/events/BackgroundEventHandler.java   |   2 +-
 ...ommitApplicationEvent.java => CommitEvent.java} |  21 +---
 ...plicationEvent.java => CommitOnCloseEvent.java} |  11 +-
 .../events/CompletableApplicationEvent.java        |  45 +------
 .../events/CompletableBackgroundEvent.java         |  28 +----
 .../internals/events/CompletableEvent.java         |   1 -
 ...merRebalanceListenerCallbackCompletedEvent.java |  31 +----
 ...nsumerRebalanceListenerCallbackNeededEvent.java |  30 +----
 .../internals/events/ErrorBackgroundEvent.java     |  59 ---------
 ...nCloseApplicationEvent.java => ErrorEvent.java} |  23 ++--
 ...nEvent.java => FetchCommittedOffsetsEvent.java} |  31 +----
 .../internals/events/GroupMetadataUpdateEvent.java |  32 +----
 ...pplicationEvent.java => LeaveOnCloseEvent.java} |  12 +-
 ...ApplicationEvent.java => ListOffsetsEvent.java} |  33 +----
 .../NewTopicsMetadataUpdateRequestEvent.java       |   7 --
 .../internals/events/PollApplicationEvent.java     |  57 ---------
 ...adataUpdateRequestEvent.java => PollEvent.java} |  21 ++--
 ...licationEvent.java => ResetPositionsEvent.java} |   4 +-
 ...tionEvent.java => SubscriptionChangeEvent.java} |   4 +-
 ...tApplicationEvent.java => SyncCommitEvent.java} |  17 ++-
 .../events/TopicMetadataApplicationEvent.java      |  78 ------------
 ...plicationEvent.java => TopicMetadataEvent.java} |  21 ++--
 ...ApplicationEvent.java => UnsubscribeEvent.java} |   5 +-
 ...ationEvent.java => ValidatePositionsEvent.java} |   4 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java | 134 ++++++++++-----------
 .../internals/ConsumerNetworkThreadTest.java       |  54 ++++-----
 .../internals/CoordinatorRequestManagerTest.java   |   6 +-
 .../internals/HeartbeatRequestManagerTest.java     |  22 +---
 .../internals/MembershipManagerImplTest.java       |   8 +-
 .../internals/OffsetsRequestManagerTest.java       |   6 +-
 .../events/ApplicationEventProcessorTest.java      |   4 +-
 44 files changed, 405 insertions(+), 788 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d4b461b0140..d810c5f053b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -36,32 +36,33 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import 
org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
-import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
-import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
+import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
 import org.apache.kafka.common.Cluster;
@@ -179,7 +180,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
         /**
          * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
-         * It is possible that {@link 
org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an 
error}
+         * It is possible that {@link ErrorEvent an error}
          * could occur when processing the events. In such cases, the 
processor will take a reference to the first
          * error, continue to process the remaining events, and then throw the 
first error that occurred.
          */
@@ -209,7 +210,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         public void process(final BackgroundEvent event) {
             switch (event.type()) {
                 case ERROR:
-                    process((ErrorBackgroundEvent) event);
+                    process((ErrorEvent) event);
                     break;
 
                 case GROUP_METADATA_UPDATE:
@@ -226,7 +227,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             }
         }
 
-        private void process(final ErrorBackgroundEvent event) {
+        private void process(final ErrorEvent event) {
             throw event.error();
         }
 
@@ -703,7 +704,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             do {
 
                 // Make sure to let the background thread know that we are 
still polling.
-                applicationEventHandler.add(new 
PollApplicationEvent(timer.currentTimeMs()));
+                applicationEventHandler.add(new 
PollEvent(timer.currentTimeMs()));
 
                 // We must not allow wake-ups between polling for fetches and 
returning the records.
                 // If the polled fetches are not empty the consumed position 
has already been updated in the polling
@@ -768,7 +769,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, 
OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
-            AsyncCommitApplicationEvent asyncCommitEvent = new 
AsyncCommitApplicationEvent(offsets);
+            AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
             CompletableFuture<Void> future = commit(asyncCommitEvent);
             future.whenComplete((r, t) -> {
 
@@ -790,7 +791,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
-    private CompletableFuture<Void> commit(final CommitApplicationEvent 
commitEvent) {
+    private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
         maybeInvokeCommitCallbacks();
         maybeThrowFencedInstanceException();
         maybeThrowInvalidGroupIdException();
@@ -936,7 +937,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 return Collections.emptyMap();
             }
 
-            final FetchCommittedOffsetsApplicationEvent event = new 
FetchCommittedOffsetsApplicationEvent(
+            final FetchCommittedOffsetsEvent event = new 
FetchCommittedOffsetsEvent(
                 partitions,
                 timeout.toMillis());
             wakeupTrigger.setActiveTask(event.future());
@@ -988,12 +989,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 throw new TimeoutException();
             }
 
-            final TopicMetadataApplicationEvent topicMetadataApplicationEvent =
-                    new TopicMetadataApplicationEvent(topic, 
timeout.toMillis());
-            
wakeupTrigger.setActiveTask(topicMetadataApplicationEvent.future());
+            final TopicMetadataEvent topicMetadataEvent = new 
TopicMetadataEvent(topic, timeout.toMillis());
+            wakeupTrigger.setActiveTask(topicMetadataEvent.future());
             try {
                 Map<String, List<PartitionInfo>> topicMetadata =
-                        
applicationEventHandler.addAndGet(topicMetadataApplicationEvent, 
time.timer(timeout));
+                        applicationEventHandler.addAndGet(topicMetadataEvent, 
time.timer(timeout));
 
                 return topicMetadata.getOrDefault(topic, 
Collections.emptyList());
             } finally {
@@ -1017,11 +1017,10 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 throw new TimeoutException();
             }
 
-            final TopicMetadataApplicationEvent topicMetadataApplicationEvent =
-                    new TopicMetadataApplicationEvent(timeout.toMillis());
-            
wakeupTrigger.setActiveTask(topicMetadataApplicationEvent.future());
+            final AllTopicsMetadataEvent topicMetadataEvent = new 
AllTopicsMetadataEvent(timeout.toMillis());
+            wakeupTrigger.setActiveTask(topicMetadataEvent.future());
             try {
-                return 
applicationEventHandler.addAndGet(topicMetadataApplicationEvent, 
time.timer(timeout));
+                return applicationEventHandler.addAndGet(topicMetadataEvent, 
time.timer(timeout));
             } finally {
                 wakeupTrigger.clearTask();
             }
@@ -1089,7 +1088,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             if (timestampsToSearch.isEmpty()) {
                 return Collections.emptyMap();
             }
-            final ListOffsetsApplicationEvent listOffsetsEvent = new 
ListOffsetsApplicationEvent(
+            final ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
                 timestampsToSearch,
                 true);
 
@@ -1139,7 +1138,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             Map<TopicPartition, Long> timestampToSearch = partitions
                 .stream()
                 .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
-            ListOffsetsApplicationEvent listOffsetsEvent = new 
ListOffsetsApplicationEvent(
+            ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
                 timestampToSearch,
                 false);
             Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
@@ -1267,11 +1266,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         if (!groupMetadata.isPresent())
             return;
         maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
-        applicationEventHandler.add(new CommitOnCloseApplicationEvent());
+        applicationEventHandler.add(new CommitOnCloseEvent());
         completeQuietly(
             () -> {
                 maybeRevokePartitions();
-                applicationEventHandler.addAndGet(new 
LeaveOnCloseApplicationEvent(), timer);
+                applicationEventHandler.addAndGet(new LeaveOnCloseEvent(), 
timer);
             },
             "Failed to send leaveGroup heartbeat with a timeout(ms)=" + 
timer.timeoutMs(), firstException);
     }
@@ -1349,7 +1348,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         long commitStart = time.nanoseconds();
         try {
             Timer requestTimer = time.timer(timeout.toMillis());
-            SyncCommitApplicationEvent syncCommitEvent = new 
SyncCommitApplicationEvent(offsets, timeout.toMillis());
+            SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, 
timeout.toMillis());
             CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
             wakeupTrigger.setActiveTask(commitFuture);
             ConsumerUtils.getResult(commitFuture, requestTimer);
@@ -1429,7 +1428,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // be no following rebalance.
             //
             // See the ApplicationEventProcessor.process() method that handles 
this event for more detail.
-            applicationEventHandler.add(new 
AssignmentChangeApplicationEvent(subscriptions.allConsumed(), 
time.milliseconds()));
+            applicationEventHandler.add(new 
AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
 
             log.info("Assigned to partition(s): {}", join(partitions, ", "));
             if (subscriptions.assignFromUser(new HashSet<>(partitions)))
@@ -1463,13 +1462,13 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         try {
             fetchBuffer.retainAll(Collections.emptySet());
             if (groupMetadata.isPresent()) {
-                UnsubscribeApplicationEvent unsubscribeApplicationEvent = new 
UnsubscribeApplicationEvent();
-                applicationEventHandler.add(unsubscribeApplicationEvent);
+                UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent();
+                applicationEventHandler.add(unsubscribeEvent);
                 log.info("Unsubscribing all topics or patterns and assigned 
partitions");
                 Timer timer = time.timer(Long.MAX_VALUE);
 
                 try {
-                    processBackgroundEvents(backgroundEventProcessor, 
unsubscribeApplicationEvent.future(), timer);
+                    processBackgroundEvents(backgroundEventProcessor, 
unsubscribeEvent.future(), timer);
                     log.info("Unsubscribed all topics or patterns and assigned 
partitions");
                 } catch (TimeoutException e) {
                     log.error("Failed while waiting for the unsubscribe event 
to complete");
@@ -1567,7 +1566,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // Validate positions using the partition leader end offsets, to 
detect if any partition
             // has been truncated due to a leader change. This will trigger an 
OffsetForLeaderEpoch
             // request, retrieve the partition end offsets, and validate the 
current position against it.
-            applicationEventHandler.addAndGet(new 
ValidatePositionsApplicationEvent(), timer);
+            applicationEventHandler.addAndGet(new ValidatePositionsEvent(), 
timer);
 
             cachedSubscriptionHasAllFetchPositions = 
subscriptions.hasAllFetchPositions();
             if (cachedSubscriptionHasAllFetchPositions) return true;
@@ -1590,7 +1589,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // which are awaiting reset. This will trigger a ListOffset 
request, retrieve the
             // partition offsets according to the strategy (ex. earliest, 
latest), and update the
             // positions.
-            applicationEventHandler.addAndGet(new 
ResetPositionsApplicationEvent(), timer);
+            applicationEventHandler.addAndGet(new ResetPositionsEvent(), 
timer);
             return true;
         } catch (TimeoutException e) {
             return false;
@@ -1620,8 +1619,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
         log.debug("Refreshing committed offsets for partitions {}", 
initializingPartitions);
         try {
-            final FetchCommittedOffsetsApplicationEvent event =
-                new FetchCommittedOffsetsApplicationEvent(
+            final FetchCommittedOffsetsEvent event =
+                new FetchCommittedOffsetsEvent(
                     initializingPartitions,
                     timer.remainingMs());
             final Map<TopicPartition, OffsetAndMetadata> offsets = 
applicationEventHandler.addAndGet(event, timer);
@@ -1770,7 +1769,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
                 // Trigger subscribe event to effectively join the group if 
not already part of it,
                 // or just send the new subscription to the broker.
-                applicationEventHandler.add(new 
SubscriptionChangeApplicationEvent());
+                applicationEventHandler.add(new SubscriptionChangeEvent());
             }
         } finally {
             release();
@@ -1797,7 +1796,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * <p/>
      *
      * As an example, take {@link #unsubscribe()}. To start unsubscribing, the 
application thread enqueues an
-     * {@link UnsubscribeApplicationEvent} on the application event queue. 
That event will eventually trigger the
+     * {@link UnsubscribeEvent} on the application event queue. That event 
will eventually trigger the
      * rebalancing logic in the background thread. Critically, as part of this 
rebalancing work, the
      * {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} 
callback needs to be invoked. However,
      * this callback must be executed on the application thread. To achieve 
this, the background thread enqueues a
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index d6a72812a52..a6cc28fb0f4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -175,12 +175,12 @@ public class CoordinatorRequestManager implements 
RequestManager {
         if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
             log.debug("FindCoordinator request failed due to authorization 
error {}", exception.getMessage());
             KafkaException groupAuthorizationException = 
GroupAuthorizationException.forGroupId(this.groupId);
-            backgroundEventHandler.add(new 
ErrorBackgroundEvent(groupAuthorizationException));
+            backgroundEventHandler.add(new 
ErrorEvent(groupAuthorizationException));
             return;
         }
 
         log.warn("FindCoordinator request failed due to fatal exception", 
exception);
-        backgroundEventHandler.add(new ErrorBackgroundEvent(exception));
+        backgroundEventHandler.add(new ErrorEvent(exception));
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index d551dbe2508..826774a6a64 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
 import org.apache.kafka.common.Uuid;
@@ -446,7 +446,7 @@ public class HeartbeatRequestManager implements 
RequestManager {
     }
 
     private void handleFatalFailure(Throwable error) {
-        backgroundEventHandler.add(new ErrorBackgroundEvent(error));
+        backgroundEventHandler.add(new ErrorEvent(error));
         membershipManager.transitionToFatal();
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 6f3947eea46..e74c7e30a2c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -26,7 +26,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
@@ -255,7 +255,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
     /**
      * Serves as the conduit by which we can report events to the application 
thread. This is needed as we send
      * {@link ConsumerRebalanceListenerCallbackNeededEvent callbacks} and, if 
needed,
-     * {@link ErrorBackgroundEvent errors} to the application thread.
+     * {@link ErrorEvent errors} to the application thread.
      */
     private final BackgroundEventHandler backgroundEventHandler;
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 34f4b30c44d..c5156e9e0b9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -25,7 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
 import 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.ClusterResourceListener;
 import org.apache.kafka.common.IsolationLevel;
@@ -199,7 +199,7 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
         try {
             offsetResetTimestamps = 
offsetFetcherUtils.getOffsetResetTimestamp();
         } catch (Exception e) {
-            backgroundEventHandler.add(new ErrorBackgroundEvent(e));
+            backgroundEventHandler.add(new ErrorEvent(e));
             return CompletableFuture.completedFuture(null);
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
similarity index 61%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
index c06a3a717dc..31c21817d85 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
@@ -16,16 +16,26 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
+import org.apache.kafka.common.PartitionInfo;
 
-    public NewTopicsMetadataUpdateRequestEvent() {
-        super(Type.NEW_TOPICS_METADATA_UPDATE);
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractTopicMetadataEvent extends 
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
+
+    private final long timeoutMs;
+
+    protected AbstractTopicMetadataEvent(final Type type, final long 
timeoutMs) {
+        super(type);
+        this.timeoutMs = timeoutMs;
+    }
+
+    public long timeoutMs() {
+        return timeoutMs;
     }
 
     @Override
-    public String toString() {
-        return "NewTopicsMetadataUpdateRequestEvent{" +
-                toStringBase() +
-                '}';
+    public String toStringBase() {
+        return super.toStringBase() + ", timeoutMs=" + timeoutMs;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
similarity index 82%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
index 8fdcc20fa83..154703aaee1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
@@ -16,10 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import java.util.concurrent.CompletableFuture;
-
-public interface CompletableEvent<T> {
-
-    CompletableFuture<T> future();
+public class AllTopicsMetadataEvent extends AbstractTopicMetadataEvent {
 
+    public AllTopicsMetadataEvent(final long timeoutMs) {
+        super(Type.ALL_TOPICS_METADATA, timeoutMs);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index ac7ccc56c55..2897117da8b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -16,52 +16,65 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.Uuid;
+
 import java.util.Objects;
 
 /**
- * This is the abstract definition of the events created by the KafkaConsumer 
API
+ * This is the abstract definition of the events created by the {@link 
AsyncKafkaConsumer} on the user's
+ * application thread.
  */
 public abstract class ApplicationEvent {
 
     public enum Type {
         COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, 
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
-        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
SUBSCRIPTION_CHANGE,
+        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
         UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
         COMMIT_ON_CLOSE, LEAVE_ON_CLOSE
     }
 
     private final Type type;
 
+    /**
+     * This identifies a particular event. It is used to disambiguate events 
via {@link #hashCode()} and
+     * {@link #equals(Object)} and can be used in log messages when debugging.
+     */
+    private final Uuid id;
+
     protected ApplicationEvent(Type type) {
         this.type = Objects.requireNonNull(type);
+        this.id = Uuid.randomUuid();
     }
 
     public Type type() {
         return type;
     }
 
+    public Uuid id() {
+        return id;
+    }
+
     @Override
-    public boolean equals(Object o) {
+    public final boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-
         ApplicationEvent that = (ApplicationEvent) o;
-
-        return type == that.type;
+        return type == that.type && id.equals(that.id);
     }
 
     @Override
-    public int hashCode() {
-        return type.hashCode();
+    public final int hashCode() {
+        return Objects.hash(type, id);
     }
 
     protected String toStringBase() {
-        return "type=" + type;
+        return "type=" + type + ", id=" + id;
     }
 
     @Override
-    public String toString() {
-        return "ApplicationEvent{" +
+    public final String toString() {
+        return getClass().getSimpleName() + "{" +
                 toStringBase() +
                 '}';
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index 7535edf5970..eac1cc3d62f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.common.internals.IdempotentCloser;
@@ -99,10 +100,9 @@ public class ApplicationEventHandler implements Closeable {
      *
      * <p/>
      *
-     * See {@link CompletableApplicationEvent#get(Timer)} and {@link 
Future#get(long, TimeUnit)} for more details.
+     * See {@link ConsumerUtils#getResult(Future, Timer)} and {@link 
Future#get(long, TimeUnit)} for more details.
      *
      * @param event A {@link CompletableApplicationEvent} created by the 
polling thread
-     * @param timer Timer for which to wait for the event to complete
      * @return      Value that is the result of the event
      * @param <T>   Type of return value of the event
      */
@@ -110,7 +110,7 @@ public class ApplicationEventHandler implements Closeable {
         Objects.requireNonNull(event, "CompletableApplicationEvent provided to 
addAndGet must be non-null");
         Objects.requireNonNull(timer, "Timer provided to addAndGet must be 
non-null");
         add(event);
-        return event.get(timer);
+        return ConsumerUtils.getResult(event.future(), timer);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 9e48b4de6da..c86aa8815f2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.CachedSupplier;
 import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
@@ -34,6 +35,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
 /**
@@ -65,23 +67,24 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         return process((event, error) -> error.ifPresent(e -> log.warn("Error 
processing event {}", e.getMessage(), e)));
     }
 
+    @SuppressWarnings({"CyclomaticComplexity"})
     @Override
     public void process(ApplicationEvent event) {
         switch (event.type()) {
             case COMMIT_ASYNC:
-                process((AsyncCommitApplicationEvent) event);
+                process((AsyncCommitEvent) event);
                 return;
 
             case COMMIT_SYNC:
-                process((SyncCommitApplicationEvent) event);
+                process((SyncCommitEvent) event);
                 return;
 
             case POLL:
-                process((PollApplicationEvent) event);
+                process((PollEvent) event);
                 return;
 
             case FETCH_COMMITTED_OFFSETS:
-                process((FetchCommittedOffsetsApplicationEvent) event);
+                process((FetchCommittedOffsetsEvent) event);
                 return;
 
             case NEW_TOPICS_METADATA_UPDATE:
@@ -89,31 +92,35 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
                 return;
 
             case ASSIGNMENT_CHANGE:
-                process((AssignmentChangeApplicationEvent) event);
+                process((AssignmentChangeEvent) event);
                 return;
 
             case TOPIC_METADATA:
-                process((TopicMetadataApplicationEvent) event);
+                process((TopicMetadataEvent) event);
+                return;
+
+            case ALL_TOPICS_METADATA:
+                process((AllTopicsMetadataEvent) event);
                 return;
 
             case LIST_OFFSETS:
-                process((ListOffsetsApplicationEvent) event);
+                process((ListOffsetsEvent) event);
                 return;
 
             case RESET_POSITIONS:
-                process((ResetPositionsApplicationEvent) event);
+                process((ResetPositionsEvent) event);
                 return;
 
             case VALIDATE_POSITIONS:
-                process((ValidatePositionsApplicationEvent) event);
+                process((ValidatePositionsEvent) event);
                 return;
 
             case SUBSCRIPTION_CHANGE:
-                process((SubscriptionChangeApplicationEvent) event);
+                process((SubscriptionChangeEvent) event);
                 return;
 
             case UNSUBSCRIBE:
-                process((UnsubscribeApplicationEvent) event);
+                process((UnsubscribeEvent) event);
                 return;
 
             case CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED:
@@ -121,11 +128,11 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
                 return;
 
             case COMMIT_ON_CLOSE:
-                process((CommitOnCloseApplicationEvent) event);
+                process((CommitOnCloseEvent) event);
                 return;
 
             case LEAVE_ON_CLOSE:
-                process((LeaveOnCloseApplicationEvent) event);
+                process((LeaveOnCloseEvent) event);
                 return;
 
             default:
@@ -133,7 +140,7 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         }
     }
 
-    private void process(final PollApplicationEvent event) {
+    private void process(final PollEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
             return;
         }
@@ -142,20 +149,28 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         requestManagers.heartbeatRequestManager.ifPresent(hrm -> 
hrm.resetPollTimer(event.pollTimeMs()));
     }
 
-    private void process(final AsyncCommitApplicationEvent event) {
+    private void process(final AsyncCommitEvent event) {
+        if (!requestManagers.commitRequestManager.isPresent()) {
+            return;
+        }
+
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        CompletableFuture<Void> commitResult = 
manager.commitAsync(event.offsets());
-        event.chain(commitResult);
+        CompletableFuture<Void> future = manager.commitAsync(event.offsets());
+        future.whenComplete(complete(event.future()));
     }
 
-    private void process(final SyncCommitApplicationEvent event) {
+    private void process(final SyncCommitEvent event) {
+        if (!requestManagers.commitRequestManager.isPresent()) {
+            return;
+        }
+
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
         long expirationTimeoutMs = 
getExpirationTimeForTimeout(event.retryTimeoutMs());
-        CompletableFuture<Void> commitResult = 
manager.commitSync(event.offsets(), expirationTimeoutMs);
-        event.chain(commitResult);
+        CompletableFuture<Void> future = manager.commitSync(event.offsets(), 
expirationTimeoutMs);
+        future.whenComplete(complete(event.future()));
     }
 
-    private void process(final FetchCommittedOffsetsApplicationEvent event) {
+    private void process(final FetchCommittedOffsetsEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
             event.future().completeExceptionally(new KafkaException("Unable to 
fetch committed " +
                     "offset because the CommittedRequestManager is not 
available. Check if group.id was set correctly"));
@@ -163,19 +178,19 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         }
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
         long expirationTimeMs = getExpirationTimeForTimeout(event.timeout());
-        event.chain(manager.fetchOffsets(event.partitions(), 
expirationTimeMs));
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.fetchOffsets(event.partitions(), expirationTimeMs);
+        future.whenComplete(complete(event.future()));
     }
 
     private void process(final NewTopicsMetadataUpdateRequestEvent ignored) {
         metadata.requestUpdateForNewTopics();
     }
 
-
     /**
      * Commit all consumed if auto-commit is enabled. Note this will trigger 
an async commit,
      * that will not be retried if the commit request fails.
      */
-    private void process(final AssignmentChangeApplicationEvent event) {
+    private void process(final AssignmentChangeEvent event) {
         if (!requestManagers.commitRequestManager.isPresent()) {
             return;
         }
@@ -184,11 +199,11 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         manager.maybeAutoCommitAsync();
     }
 
-    private void process(final ListOffsetsApplicationEvent event) {
+    private void process(final ListOffsetsEvent event) {
         final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> 
future =
                 
requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(),
                         event.requireTimestamps());
-        event.chain(future);
+        future.whenComplete(complete(event.future()));
     }
 
     /**
@@ -196,7 +211,7 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
      * consumer join the group if it is not part of it yet, or send the 
updated subscription if
      * it is already a member.
      */
-    private void process(final SubscriptionChangeApplicationEvent ignored) {
+    private void process(final SubscriptionChangeEvent ignored) {
         if (!requestManagers.heartbeatRequestManager.isPresent()) {
             log.warn("Group membership manager not present when processing a 
subscribe event");
             return;
@@ -213,38 +228,39 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
      *              execution for releasing the assignment completes, and the 
request to leave
      *              the group is sent out.
      */
-    private void process(final UnsubscribeApplicationEvent event) {
+    private void process(final UnsubscribeEvent event) {
         if (!requestManagers.heartbeatRequestManager.isPresent()) {
             KafkaException error = new KafkaException("Group membership 
manager not present when processing an unsubscribe event");
             event.future().completeExceptionally(error);
             return;
         }
         MembershipManager membershipManager = 
requestManagers.heartbeatRequestManager.get().membershipManager();
-        CompletableFuture<Void> result = membershipManager.leaveGroup();
-        event.chain(result);
+        CompletableFuture<Void> future = membershipManager.leaveGroup();
+        future.whenComplete(complete(event.future()));
     }
 
-    private void process(final ResetPositionsApplicationEvent event) {
-        CompletableFuture<Void> result = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
-        event.chain(result);
+    private void process(final ResetPositionsEvent event) {
+        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+        future.whenComplete(complete(event.future()));
     }
 
-    private void process(final ValidatePositionsApplicationEvent event) {
-        CompletableFuture<Void> result = 
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
-        event.chain(result);
+    private void process(final ValidatePositionsEvent event) {
+        CompletableFuture<Void> future = 
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+        future.whenComplete(complete(event.future()));
     }
 
-    private void process(final TopicMetadataApplicationEvent event) {
-        final CompletableFuture<Map<String, List<PartitionInfo>>> future;
-
-        long expirationTimeMs = 
getExpirationTimeForTimeout(event.getTimeoutMs());
-        if (event.isAllTopics()) {
-            future = 
requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(expirationTimeMs);
-        } else {
-            future = 
requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(), 
expirationTimeMs);
-        }
+    private void process(final TopicMetadataEvent event) {
+        final long expirationTimeMs = 
getExpirationTimeForTimeout(event.timeoutMs());
+        final CompletableFuture<Map<String, List<PartitionInfo>>> future =
+                
requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(), 
expirationTimeMs);
+        future.whenComplete(complete(event.future()));
+    }
 
-        event.chain(future);
+    private void process(final AllTopicsMetadataEvent event) {
+        final long expirationTimeMs = 
getExpirationTimeForTimeout(event.timeoutMs());
+        final CompletableFuture<Map<String, List<PartitionInfo>>> future =
+                
requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(expirationTimeMs);
+        future.whenComplete(complete(event.future()));
     }
 
     private void process(final ConsumerRebalanceListenerCallbackCompletedEvent 
event) {
@@ -259,14 +275,14 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         manager.consumerRebalanceListenerCallbackCompleted(event);
     }
 
-    private void process(final CommitOnCloseApplicationEvent event) {
+    private void process(final CommitOnCloseEvent event) {
         if (!requestManagers.commitRequestManager.isPresent())
             return;
         log.debug("Signal CommitRequestManager closing");
         requestManagers.commitRequestManager.get().signalClose();
     }
 
-    private void process(final LeaveOnCloseApplicationEvent event) {
+    private void process(final LeaveOnCloseEvent event) {
         if (!requestManagers.heartbeatRequestManager.isPresent()) {
             event.future().complete(null);
             return;
@@ -277,7 +293,7 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         log.debug("Leaving group before closing");
         CompletableFuture<Void> future = membershipManager.leaveGroup();
         // The future will be completed on heartbeat sent
-        event.chain(future);
+        future.whenComplete(complete(event.future()));
     }
 
     /**
@@ -293,6 +309,15 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         return expiration;
     }
 
+    private <T> BiConsumer<? super T, ? super Throwable> complete(final 
CompletableFuture<T> b) {
+        return (value, exception) -> {
+            if (exception != null)
+                b.completeExceptionally(exception);
+            else
+                b.complete(value);
+        };
+    }
+
     /**
      * Creates a {@link Supplier} for deferred creation during invocation by
      * {@link ConsumerNetworkThread}.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
similarity index 56%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
index ccf7199f260..c9efa2e9dff 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
@@ -22,13 +22,12 @@ import org.apache.kafka.common.TopicPartition;
 import java.util.Collections;
 import java.util.Map;
 
-public class AssignmentChangeApplicationEvent extends ApplicationEvent {
+public class AssignmentChangeEvent extends ApplicationEvent {
 
     private final Map<TopicPartition, OffsetAndMetadata> offsets;
     private final long currentTimeMs;
 
-    public AssignmentChangeApplicationEvent(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                            final long currentTimeMs) {
+    public AssignmentChangeEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets, final long currentTimeMs) {
         super(Type.ASSIGNMENT_CHANGE);
         this.offsets = Collections.unmodifiableMap(offsets);
         this.currentTimeMs = currentTimeMs;
@@ -43,31 +42,7 @@ public class AssignmentChangeApplicationEvent extends 
ApplicationEvent {
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        AssignmentChangeApplicationEvent that = 
(AssignmentChangeApplicationEvent) o;
-
-        if (currentTimeMs != that.currentTimeMs) return false;
-        return offsets.equals(that.offsets);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + offsets.hashCode();
-        result = 31 * result + (int) (currentTimeMs ^ (currentTimeMs >>> 32));
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "AssignmentChangeApplicationEvent{" +
-                toStringBase() +
-                ", offsets=" + offsets +
-                ", currentTimeMs=" + currentTimeMs +
-                '}';
+    protected String toStringBase() {
+        return super.toStringBase() + ", offsets=" + offsets + ", 
currentTimeMs=" + currentTimeMs;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
similarity index 73%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
index 7a939ce3cfd..2f03fdfb1e5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
@@ -18,22 +18,15 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+
 import java.util.Map;
 
 /**
  * Event to commit offsets without waiting for a response, so the request 
won't be retried.
  */
-public class AsyncCommitApplicationEvent extends CommitApplicationEvent {
-
-    public AsyncCommitApplicationEvent(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
-        super(offsets, Type.COMMIT_ASYNC);
-    }
+public class AsyncCommitEvent extends CommitEvent {
 
-    @Override
-    public String toString() {
-        return "AsyncCommitApplicationEvent{" +
-            toStringBase() +
-            ", offsets=" + offsets() +
-            '}';
+    public AsyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
+        super(Type.COMMIT_ASYNC, offsets);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index e5d522201ef..9bc3fbebc30 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.common.Uuid;
 
 import java.util.Objects;
 
@@ -31,36 +32,45 @@ public abstract class BackgroundEvent {
 
     private final Type type;
 
-    public BackgroundEvent(Type type) {
+    /**
+     * This identifies a particular event. It is used to disambiguate events 
via {@link #hashCode()} and
+     * {@link #equals(Object)} and can be used in log messages when debugging.
+     */
+    private final Uuid id;
+
+    protected BackgroundEvent(Type type) {
         this.type = Objects.requireNonNull(type);
+        this.id = Uuid.randomUuid();
     }
 
     public Type type() {
         return type;
     }
 
+    public Uuid id() {
+        return id;
+    }
+
     @Override
-    public boolean equals(Object o) {
+    public final boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-
         BackgroundEvent that = (BackgroundEvent) o;
-
-        return type == that.type;
+        return type == that.type && id.equals(that.id);
     }
 
     @Override
-    public int hashCode() {
-        return type.hashCode();
+    public final int hashCode() {
+        return Objects.hash(type, id);
     }
 
     protected String toStringBase() {
-        return "type=" + type;
+        return "type=" + type + ", id=" + id;
     }
 
     @Override
-    public String toString() {
-        return "BackgroundEvent{" +
+    public final String toString() {
+        return getClass().getSimpleName() + "{" +
                 toStringBase() +
                 '}';
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
index 103493d2531..48421484f1d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
@@ -26,7 +26,7 @@ import java.util.Queue;
 /**
  * An event handler that receives {@link BackgroundEvent background events} 
from the
  * {@link ConsumerNetworkThread network thread} which are then made available 
to the application thread
- * via the {@link BackgroundEventProcessor}.
+ * via an {@link EventProcessor}.
  */
 
 public class BackgroundEventHandler {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
similarity index 71%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index 69d969d7b0f..253d27e2573 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -22,14 +22,14 @@ import org.apache.kafka.common.TopicPartition;
 import java.util.Collections;
 import java.util.Map;
 
-public abstract class CommitApplicationEvent extends 
CompletableApplicationEvent<Void> {
+public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
 
     /**
      * Offsets to commit per partition.
      */
     private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-    public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets, Type type) {
+    protected CommitEvent(final Type type, final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
         super(type);
         this.offsets = Collections.unmodifiableMap(offsets);
 
@@ -45,20 +45,7 @@ public abstract class CommitApplicationEvent extends 
CompletableApplicationEvent
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        CommitApplicationEvent that = (CommitApplicationEvent) o;
-
-        return offsets.equals(that.offsets);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + offsets.hashCode();
-        return result;
+    protected String toStringBase() {
+        return super.toStringBase() + ", offsets=" + offsets;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseEvent.java
similarity index 76%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseEvent.java
index 4cc07e945f9..7d2e29fcedb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseEvent.java
@@ -16,16 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-public class CommitOnCloseApplicationEvent extends ApplicationEvent {
+public class CommitOnCloseEvent extends ApplicationEvent {
 
-    public CommitOnCloseApplicationEvent() {
+    public CommitOnCloseEvent() {
         super(Type.COMMIT_ON_CLOSE);
     }
-
-    @Override
-    public String toString() {
-        return "CommitOnCloseApplicationEvent{" +
-                toStringBase() +
-                '}';
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index 365c620e0c0..a62c3aaa4c4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.utils.Timer;
-
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -31,56 +28,18 @@ public abstract class CompletableApplicationEvent<T> 
extends ApplicationEvent im
 
     private final CompletableFuture<T> future;
 
-    protected CompletableApplicationEvent(Type type) {
+    protected CompletableApplicationEvent(final Type type) {
         super(type);
         this.future = new CompletableFuture<>();
     }
 
+    @Override
     public CompletableFuture<T> future() {
         return future;
     }
 
-    public T get(Timer timer) {
-        return ConsumerUtils.getResult(future, timer);
-    }
-
-    public void chain(final CompletableFuture<T> providedFuture) {
-        providedFuture.whenComplete((value, exception) -> {
-            if (exception != null) {
-                this.future.completeExceptionally(exception);
-            } else {
-                this.future.complete(value);
-            }
-        });
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        CompletableApplicationEvent<?> that = (CompletableApplicationEvent<?>) 
o;
-
-        return future.equals(that.future);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + future.hashCode();
-        return result;
-    }
-
     @Override
     protected String toStringBase() {
         return super.toStringBase() + ", future=" + future;
     }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "{" +
-                toStringBase() +
-                '}';
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java
index 640ee6103af..1a58515a5cb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java
@@ -28,42 +28,18 @@ public abstract class CompletableBackgroundEvent<T> extends 
BackgroundEvent impl
 
     private final CompletableFuture<T> future;
 
-    protected CompletableBackgroundEvent(Type type) {
+    protected CompletableBackgroundEvent(final Type type) {
         super(type);
         this.future = new CompletableFuture<>();
     }
 
+    @Override
     public CompletableFuture<T> future() {
         return future;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        CompletableBackgroundEvent<?> that = (CompletableBackgroundEvent<?>) o;
-
-        return future.equals(that.future);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + future.hashCode();
-        return result;
-    }
-
     @Override
     protected String toStringBase() {
         return super.toStringBase() + ", future=" + future;
     }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "{" +
-                toStringBase() +
-                '}';
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
index 8fdcc20fa83..97559d8cb9b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
@@ -21,5 +21,4 @@ import java.util.concurrent.CompletableFuture;
 public interface CompletableEvent<T> {
 
     CompletableFuture<T> future();
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
index b260c6154ea..a10e98df1d0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
@@ -34,9 +34,9 @@ public class ConsumerRebalanceListenerCallbackCompletedEvent 
extends Application
     private final CompletableFuture<Void> future;
     private final Optional<KafkaException> error;
 
-    public 
ConsumerRebalanceListenerCallbackCompletedEvent(ConsumerRebalanceListenerMethodName
 methodName,
-                                                           
CompletableFuture<Void> future,
-                                                           
Optional<KafkaException> error) {
+    public ConsumerRebalanceListenerCallbackCompletedEvent(final 
ConsumerRebalanceListenerMethodName methodName,
+                                                           final 
CompletableFuture<Void> future,
+                                                           final 
Optional<KafkaException> error) {
         super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED);
         this.methodName = Objects.requireNonNull(methodName);
         this.future = Objects.requireNonNull(future);
@@ -55,24 +55,6 @@ public class ConsumerRebalanceListenerCallbackCompletedEvent 
extends Application
         return error;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        ConsumerRebalanceListenerCallbackCompletedEvent that = 
(ConsumerRebalanceListenerCallbackCompletedEvent) o;
-
-        return methodName == that.methodName &&
-                future.equals(that.future) &&
-                error.equals(that.error);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(methodName, future, error);
-    }
-
     @Override
     protected String toStringBase() {
         return super.toStringBase() +
@@ -80,11 +62,4 @@ public class ConsumerRebalanceListenerCallbackCompletedEvent 
extends Application
                 ", future=" + future +
                 ", error=" + error;
     }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "{" +
-                toStringBase() +
-                '}';
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
index 7b17c034abd..6ce833580c8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
@@ -37,8 +37,8 @@ public class ConsumerRebalanceListenerCallbackNeededEvent 
extends CompletableBac
     private final ConsumerRebalanceListenerMethodName methodName;
     private final SortedSet<TopicPartition> partitions;
 
-    public 
ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName
 methodName,
-                                                        
SortedSet<TopicPartition> partitions) {
+    public ConsumerRebalanceListenerCallbackNeededEvent(final 
ConsumerRebalanceListenerMethodName methodName,
+                                                        final 
SortedSet<TopicPartition> partitions) {
         super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED);
         this.methodName = Objects.requireNonNull(methodName);
         this.partitions = Collections.unmodifiableSortedSet(partitions);
@@ -52,36 +52,10 @@ public class ConsumerRebalanceListenerCallbackNeededEvent 
extends CompletableBac
         return partitions;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        ConsumerRebalanceListenerCallbackNeededEvent that = 
(ConsumerRebalanceListenerCallbackNeededEvent) o;
-
-        return methodName == that.methodName && 
partitions.equals(that.partitions);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + methodName.hashCode();
-        result = 31 * result + partitions.hashCode();
-        return result;
-    }
-
     @Override
     protected String toStringBase() {
         return super.toStringBase() +
                 ", methodName=" + methodName +
                 ", partitions=" + partitions;
     }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "{" +
-                toStringBase() +
-                '}';
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
deleted file mode 100644
index 2945f22986b..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.common.KafkaException;
-
-public class ErrorBackgroundEvent extends BackgroundEvent {
-
-    private final RuntimeException error;
-
-    public ErrorBackgroundEvent(Throwable t) {
-        super(Type.ERROR);
-        this.error = t instanceof RuntimeException ? (RuntimeException) t : 
new KafkaException(t);
-    }
-
-    public RuntimeException error() {
-        return error;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        ErrorBackgroundEvent that = (ErrorBackgroundEvent) o;
-
-        return error.equals(that.error);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + error.hashCode();
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "ErrorBackgroundEvent{" +
-            toStringBase() +
-            ", error=" + error +
-            '}';
-    }
-}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorEvent.java
similarity index 65%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorEvent.java
index 4cc07e945f9..5e6d8223823 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorEvent.java
@@ -16,16 +16,23 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-public class CommitOnCloseApplicationEvent extends ApplicationEvent {
+import org.apache.kafka.common.KafkaException;
 
-    public CommitOnCloseApplicationEvent() {
-        super(Type.COMMIT_ON_CLOSE);
+public class ErrorEvent extends BackgroundEvent {
+
+    private final RuntimeException error;
+
+    public ErrorEvent(final Throwable t) {
+        super(Type.ERROR);
+        this.error = t instanceof RuntimeException ? (RuntimeException) t : 
new KafkaException(t);
+    }
+
+    public RuntimeException error() {
+        return error;
     }
 
     @Override
-    public String toString() {
-        return "CommitOnCloseApplicationEvent{" +
-                toStringBase() +
-                '}';
+    public String toStringBase() {
+        return super.toStringBase() + ", error=" + error;
     }
-}
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
similarity index 60%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
index 34b2d97705c..7cf56b990b0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
-public class FetchCommittedOffsetsApplicationEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
+public class FetchCommittedOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
 
     /**
      * Partitions to retrieve committed offsets for.
@@ -35,8 +35,7 @@ public class FetchCommittedOffsetsApplicationEvent extends 
CompletableApplicatio
      */
     private final long timeoutMs;
 
-    public FetchCommittedOffsetsApplicationEvent(final Set<TopicPartition> 
partitions,
-                                                 final long timeoutMs) {
+    public FetchCommittedOffsetsEvent(final Set<TopicPartition> partitions, 
final long timeoutMs) {
         super(Type.FETCH_COMMITTED_OFFSETS);
         this.partitions = Collections.unmodifiableSet(partitions);
         this.timeoutMs = timeoutMs;
@@ -51,29 +50,7 @@ public class FetchCommittedOffsetsApplicationEvent extends 
CompletableApplicatio
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        FetchCommittedOffsetsApplicationEvent that = 
(FetchCommittedOffsetsApplicationEvent) o;
-
-        return partitions.equals(that.partitions);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + partitions.hashCode();
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "{" +
-                toStringBase() +
-                ", partitions=" + partitions +
-                ", timeout=" + timeoutMs + "ms" +
-                '}';
+    public String toStringBase() {
+        return super.toStringBase() + ", partitions=" + partitions + ", timeoutMs=" + timeoutMs;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
index 120e6717242..001f5498183 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
@@ -19,8 +19,6 @@ package org.apache.kafka.clients.consumer.internals.events;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
 
-import java.util.Objects;
-
 /**
  * This event is sent by the {@link ConsumerNetworkThread consumer's network 
thread} to the application thread
  * so that when the user calls the {@link Consumer#groupMetadata()} API, the 
information is up-to-date. The
@@ -29,11 +27,10 @@ import java.util.Objects;
  */
 public class GroupMetadataUpdateEvent extends BackgroundEvent {
 
-    final private int memberEpoch;
-    final private String memberId;
+    private final int memberEpoch;
+    private final String memberId;
 
-    public GroupMetadataUpdateEvent(final int memberEpoch,
-                                    final String memberId) {
+    public GroupMetadataUpdateEvent(final int memberEpoch, final String 
memberId) {
         super(Type.GROUP_METADATA_UPDATE);
         this.memberEpoch = memberEpoch;
         this.memberId = memberId;
@@ -47,33 +44,10 @@ public class GroupMetadataUpdateEvent extends 
BackgroundEvent {
         return memberId;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-        GroupMetadataUpdateEvent that = (GroupMetadataUpdateEvent) o;
-        return memberEpoch == that.memberEpoch &&
-            Objects.equals(memberId, that.memberId);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(super.hashCode(), memberEpoch, memberId);
-    }
-
     @Override
     public String toStringBase() {
         return super.toStringBase() +
             ", memberEpoch=" + memberEpoch +
             ", memberId='" + memberId + '\'';
     }
-
-    @Override
-    public String toString() {
-        return "GroupMetadataUpdateEvent{" +
-            toStringBase() +
-            '}';
-    }
-
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
similarity index 76%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
index ee0b6ffa61c..5ee19a7cc02 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
@@ -16,15 +16,9 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-public class LeaveOnCloseApplicationEvent extends 
CompletableApplicationEvent<Void> {
-    public LeaveOnCloseApplicationEvent() {
-        super(Type.LEAVE_ON_CLOSE);
-    }
+public class LeaveOnCloseEvent extends CompletableApplicationEvent<Void> {
 
-    @Override
-    public String toString() {
-        return "LeaveOnCloseApplicationEvent{" +
-            toStringBase() +
-            '}';
+    public LeaveOnCloseEvent() {
+        super(Type.LEAVE_ON_CLOSE);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
similarity index 69%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
index 2466d062726..fd3b321173f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
@@ -31,12 +31,12 @@ import java.util.Map;
  * {@link OffsetAndTimestamp} found (offset of the first message whose 
timestamp is greater than
  * or equals to the target timestamp)
  */
-public class ListOffsetsApplicationEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> {
+public class ListOffsetsEvent extends 
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> {
 
     private final Map<TopicPartition, Long> timestampsToSearch;
     private final boolean requireTimestamps;
 
-    public ListOffsetsApplicationEvent(Map<TopicPartition, Long> 
timestampToSearch, boolean requireTimestamps) {
+    public ListOffsetsEvent(final Map<TopicPartition, Long> timestampsToSearch, final boolean requireTimestamps) {
         super(Type.LIST_OFFSETS);
-        this.timestampsToSearch = Collections.unmodifiableMap(timestampToSearch);
+        this.timestampsToSearch = Collections.unmodifiableMap(timestampsToSearch);
         this.requireTimestamps = requireTimestamps;
@@ -64,31 +64,10 @@ public class ListOffsetsApplicationEvent extends 
CompletableApplicationEvent<Map
     }
 
     @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        ListOffsetsApplicationEvent that = (ListOffsetsApplicationEvent) o;
-
-        if (requireTimestamps != that.requireTimestamps) return false;
-        return timestampsToSearch.equals(that.timestampsToSearch);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + timestampsToSearch.hashCode();
-        result = 31 * result + (requireTimestamps ? 1 : 0);
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + " {" +
-                toStringBase() +
-                ", timestampsToSearch=" + timestampsToSearch + ", " +
-                "requireTimestamps=" + requireTimestamps + '}';
+    public String toStringBase() {
+        return super.toStringBase() +
+                ", timestampsToSearch=" + timestampsToSearch +
+                ", requireTimestamps=" + requireTimestamps;
     }
 
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
index c06a3a717dc..b06bd456f5f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
@@ -21,11 +21,4 @@ public class NewTopicsMetadataUpdateRequestEvent extends 
ApplicationEvent {
     public NewTopicsMetadataUpdateRequestEvent() {
         super(Type.NEW_TOPICS_METADATA_UPDATE);
     }
-
-    @Override
-    public String toString() {
-        return "NewTopicsMetadataUpdateRequestEvent{" +
-                toStringBase() +
-                '}';
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
deleted file mode 100644
index b958f0ec417..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-public class PollApplicationEvent extends ApplicationEvent {
-
-    private final long pollTimeMs;
-
-    public PollApplicationEvent(final long pollTimeMs) {
-        super(Type.POLL);
-        this.pollTimeMs = pollTimeMs;
-    }
-
-    public long pollTimeMs() {
-        return pollTimeMs;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        if (!super.equals(o)) return false;
-
-        PollApplicationEvent that = (PollApplicationEvent) o;
-
-        return pollTimeMs == that.pollTimeMs;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + (int) (pollTimeMs ^ (pollTimeMs >>> 32));
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "PollApplicationEvent{" +
-                toStringBase() +
-                ", pollTimeMs=" + pollTimeMs +
-                '}';
-    }
-}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
similarity index 70%
copy from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
copy to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
index c06a3a717dc..96614c06e9b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
@@ -16,16 +16,21 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
+public class PollEvent extends ApplicationEvent {
 
-    public NewTopicsMetadataUpdateRequestEvent() {
-        super(Type.NEW_TOPICS_METADATA_UPDATE);
+    private final long pollTimeMs;
+
+    public PollEvent(final long pollTimeMs) {
+        super(Type.POLL);
+        this.pollTimeMs = pollTimeMs;
+    }
+
+    public long pollTimeMs() {
+        return pollTimeMs;
     }
 
     @Override
-    public String toString() {
-        return "NewTopicsMetadataUpdateRequestEvent{" +
-                toStringBase() +
-                '}';
+    public String toStringBase() {
+        return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
     }
-}
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
similarity index 89%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
index 5d9b07f9de0..06f6ebbb68a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
@@ -22,9 +22,9 @@ package org.apache.kafka.clients.consumer.internals.events;
  * asynchronous event that generates ListOffsets requests, and completes by 
updating in-memory
  * positions when responses are received.
  */
-public class ResetPositionsApplicationEvent extends 
CompletableApplicationEvent<Void> {
+public class ResetPositionsEvent extends CompletableApplicationEvent<Void> {
 
-    public ResetPositionsApplicationEvent() {
+    public ResetPositionsEvent() {
         super(Type.RESET_POSITIONS);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
similarity index 90%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
index 73fd15fb144..ad5fd34c06f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
@@ -22,9 +22,9 @@ package org.apache.kafka.clients.consumer.internals.events;
  * calls the subscribe API. This will make the consumer join a consumer group 
if not part of it
  * yet, or just send the updated subscription to the broker if it's already a 
member of the group.
  */
-public class SubscriptionChangeApplicationEvent extends ApplicationEvent {
+public class SubscriptionChangeEvent extends ApplicationEvent {
 
-    public SubscriptionChangeApplicationEvent() {
+    public SubscriptionChangeEvent() {
         super(Type.SUBSCRIPTION_CHANGE);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
similarity index 73%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
index 43dfee6ab18..7e00e0da596 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
@@ -18,22 +18,23 @@ package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+
 import java.util.Map;
 
 /**
  * Event to commit offsets waiting for a response and retrying on expected 
retriable errors until
  * the timer expires.
  */
-public class SyncCommitApplicationEvent extends CommitApplicationEvent {
+public class SyncCommitEvent extends CommitEvent {
 
     /**
      * Time to wait for a response, retrying on retriable errors.
      */
     private final long retryTimeoutMs;
 
-    public SyncCommitApplicationEvent(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                      final long retryTimeoutMs) {
-        super(offsets, Type.COMMIT_SYNC);
+    public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets,
+                           final long retryTimeoutMs) {
+        super(Type.COMMIT_SYNC, offsets);
         this.retryTimeoutMs = retryTimeoutMs;
     }
 
@@ -42,11 +43,7 @@ public class SyncCommitApplicationEvent extends 
CommitApplicationEvent {
     }
 
     @Override
-    public String toString() {
-        return "SyncCommitApplicationEvent{" +
-            toStringBase() +
-            ", offsets=" + offsets() +
-            ", retryTimeout=" + retryTimeoutMs + "ms" +
-            '}';
+    public String toStringBase() {
+        return super.toStringBase() + ", offsets=" + offsets() + ", 
retryTimeoutMs=" + retryTimeoutMs;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
deleted file mode 100644
index dd6f842cc26..00000000000
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.common.PartitionInfo;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-public class TopicMetadataApplicationEvent extends 
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
-    private final String topic;
-    private final boolean allTopics;
-    private final long timeoutMs;
-
-    public TopicMetadataApplicationEvent(final long timeoutMs) {
-        super(Type.TOPIC_METADATA);
-        this.topic = null;
-        this.allTopics = true;
-        this.timeoutMs = timeoutMs;
-    }
-
-    public TopicMetadataApplicationEvent(final String topic, final long 
timeoutMs) {
-        super(Type.TOPIC_METADATA);
-        this.topic = topic;
-        this.allTopics = false;
-        this.timeoutMs = timeoutMs;
-    }
-
-    public String topic() {
-        return topic;
-    }
-
-    public boolean isAllTopics() {
-        return allTopics;
-    }
-
-    public long getTimeoutMs() {
-        return timeoutMs;
-    }
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + " {" + toStringBase() +
-                ", topic=" + topic +
-                ", allTopics=" + allTopics +
-                ", timeoutMs=" + timeoutMs + "}";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (!(o instanceof TopicMetadataApplicationEvent)) return false;
-        if (!super.equals(o)) return false;
-
-        TopicMetadataApplicationEvent that = (TopicMetadataApplicationEvent) o;
-
-        return topic.equals(that.topic) && (allTopics == that.allTopics) && 
(timeoutMs == that.timeoutMs);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(super.hashCode(), topic, allTopics, timeoutMs);
-    }
-}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
similarity index 66%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
index 4cc07e945f9..ebbb2a6c468 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
@@ -16,16 +16,23 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
-public class CommitOnCloseApplicationEvent extends ApplicationEvent {
+import java.util.Objects;
 
-    public CommitOnCloseApplicationEvent() {
-        super(Type.COMMIT_ON_CLOSE);
+public class TopicMetadataEvent extends AbstractTopicMetadataEvent {
+
+    private final String topic;
+
+    public TopicMetadataEvent(final String topic, final long timeoutMs) {
+        super(Type.TOPIC_METADATA, timeoutMs);
+        this.topic = Objects.requireNonNull(topic);
+    }
+
+    public String topic() {
+        return topic;
     }
 
     @Override
-    public String toString() {
-        return "CommitOnCloseApplicationEvent{" +
-                toStringBase() +
-                '}';
+    public String toStringBase() {
+        return super.toStringBase() + ", topic=" + topic;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
similarity index 91%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
index a1ccb896fdf..07af36e5feb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
@@ -24,8 +24,9 @@ package org.apache.kafka.clients.consumer.internals.events;
  * complete and the heartbeat to leave the group is sent out (minimal effort 
to send the
  * leave group heartbeat, without waiting for any response or considering 
timeouts).
  */
-public class UnsubscribeApplicationEvent extends 
CompletableApplicationEvent<Void> {
-    public UnsubscribeApplicationEvent() {
+public class UnsubscribeEvent extends CompletableApplicationEvent<Void> {
+
+    public UnsubscribeEvent() {
         super(Type.UNSUBSCRIBE);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
similarity index 89%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
index 3b093e0b683..efa358b4c78 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
@@ -22,9 +22,9 @@ package org.apache.kafka.clients.consumer.internals.events;
  * detected. This is an asynchronous event that generates OffsetForLeaderEpoch 
requests, and
  * completes by validating in-memory positions against the offsets received in 
the responses.
  */
-public class ValidatePositionsApplicationEvent extends 
CompletableApplicationEvent<Void> {
+public class ValidatePositionsEvent extends CompletableApplicationEvent<Void> {
 
-    public ValidatePositionsApplicationEvent() {
+    public ValidatePositionsEvent() {
         super(Type.VALIDATE_POSITIONS);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 976677dec86..2d6899612f0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -31,26 +31,26 @@ import 
org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
-import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
-import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
+import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -256,9 +256,9 @@ public class AsyncKafkaConsumerTest {
 
         consumer.commitAsync(offsets, null);
 
-        final ArgumentCaptor<AsyncCommitApplicationEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitApplicationEvent.class);
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
         verify(applicationEventHandler).add(commitEventCaptor.capture());
-        final AsyncCommitApplicationEvent commitEvent = 
commitEventCaptor.getValue();
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
         assertEquals(offsets, commitEvent.offsets());
         assertDoesNotThrow(() -> commitEvent.future().complete(null));
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
@@ -310,9 +310,9 @@ public class AsyncKafkaConsumerTest {
 
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
 
-        final ArgumentCaptor<AsyncCommitApplicationEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitApplicationEvent.class);
+        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
         verify(applicationEventHandler).add(commitEventCaptor.capture());
-        final AsyncCommitApplicationEvent commitEvent = 
commitEventCaptor.getValue();
+        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
         
commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
 
         assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> 
consumer.commitAsync());
@@ -325,7 +325,7 @@ public class AsyncKafkaConsumerTest {
         
completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);
 
         assertEquals(topicPartitionOffsets, 
consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
-        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
 any());
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class),
 any());
         final Metric metric = consumer.metrics()
             
.get(consumer.metricsRegistry().metricName("committed-time-ns-total", 
"consumer-metrics"));
         assertTrue((double) metric.metricValue() > 0);
@@ -347,7 +347,7 @@ public class AsyncKafkaConsumerTest {
 
         verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
-        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
 any());
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class),
 any());
     }
 
     @Test
@@ -355,9 +355,9 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer();
         Map<TopicPartition, OffsetAndMetadata> offsets = 
mockTopicPartitionOffset();
         when(applicationEventHandler.addAndGet(
-            any(FetchCommittedOffsetsApplicationEvent.class), 
any())).thenAnswer(invocation -> {
+            any(FetchCommittedOffsetsEvent.class), 
any())).thenAnswer(invocation -> {
                 CompletableApplicationEvent<?> event = 
invocation.getArgument(0);
-                assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, 
event);
+                assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
                 throw new KafkaException("Test exception");
             });
 
@@ -530,7 +530,7 @@ public class AsyncKafkaConsumerTest {
 
         verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-        
verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
     }
 
     @Test
@@ -564,7 +564,7 @@ public class AsyncKafkaConsumerTest {
 
         verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-        
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
     }
 
     @Test
@@ -598,8 +598,8 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer();
         doReturn(null).when(applicationEventHandler).addAndGet(any(), any());
         consumer.close();
-        
verify(applicationEventHandler).addAndGet(any(LeaveOnCloseApplicationEvent.class),
 any());
-        
verify(applicationEventHandler).add(any(CommitOnCloseApplicationEvent.class));
+        
verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class), any());
+        verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
     }
 
     @Test
@@ -641,7 +641,7 @@ public class AsyncKafkaConsumerTest {
         subscriptions.assignFromSubscribed(singleton(tp));
         doThrow(new 
KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp)));
         assertThrows(KafkaException.class, () -> 
consumer.close(Duration.ZERO));
-        verify(applicationEventHandler, 
never()).addAndGet(any(LeaveOnCloseApplicationEvent.class), any());
+        verify(applicationEventHandler, 
never()).addAndGet(any(LeaveOnCloseEvent.class), any());
         verify(listener).onPartitionsRevoked(eq(singleton(tp)));
         assertEquals(emptySet(), subscriptions.assignedPartitions());
     }
@@ -677,7 +677,7 @@ public class AsyncKafkaConsumerTest {
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         consumer.maybeAutoCommitSync(true, time.timer(100), null);
-        
verify(applicationEventHandler).add(any(SyncCommitApplicationEvent.class));
+        verify(applicationEventHandler).add(any(SyncCommitEvent.class));
     }
 
     @Test
@@ -695,7 +695,7 @@ public class AsyncKafkaConsumerTest {
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         consumer.maybeAutoCommitSync(false, time.timer(100), null);
-        verify(applicationEventHandler, 
never()).add(any(SyncCommitApplicationEvent.class));
+        verify(applicationEventHandler, 
never()).add(any(SyncCommitEvent.class));
     }
 
     private void assertMockCommitCallbackInvoked(final Executable task,
@@ -729,7 +729,7 @@ public class AsyncKafkaConsumerTest {
         consumer.assign(singleton(tp));
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().contains(tp));
-        
verify(applicationEventHandler).add(any(AssignmentChangeApplicationEvent.class));
+        verify(applicationEventHandler).add(any(AssignmentChangeEvent.class));
         
verify(applicationEventHandler).add(any(NewTopicsMetadataUpdateRequestEvent.class));
     }
 
@@ -781,7 +781,7 @@ public class AsyncKafkaConsumerTest {
         Map<TopicPartition, Long> expectedOffsets = 
expectedOffsetsAndTimestamp.entrySet().stream()
             .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().offset()));
         assertEquals(expectedOffsets, result);
-        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
             ArgumentMatchers.isA(Timer.class));
     }
 
@@ -792,13 +792,13 @@ public class AsyncKafkaConsumerTest {
         Throwable eventProcessingFailure = new KafkaException("Unexpected 
failure " +
             "processing List Offsets event");
         
doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet(
-            any(ListOffsetsApplicationEvent.class),
+            any(ListOffsetsEvent.class),
             any());
         Throwable consumerError = assertThrows(KafkaException.class,
             () -> consumer.beginningOffsets(partitions,
                 Duration.ofMillis(1)));
         assertEquals(eventProcessingFailure, consumerError);
-        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
             ArgumentMatchers.isA(Timer.class));
     }
 
@@ -810,7 +810,7 @@ public class AsyncKafkaConsumerTest {
             () -> consumer.beginningOffsets(
                 Collections.singletonList(new TopicPartition("t1", 0)),
                 Duration.ofMillis(1)));
-        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
             ArgumentMatchers.isA(Timer.class));
     }
 
@@ -850,7 +850,7 @@ public class AsyncKafkaConsumerTest {
         Map<TopicPartition, OffsetAndTimestamp> result =
                 assertDoesNotThrow(() -> 
consumer.offsetsForTimes(timestampToSearch, Duration.ofMillis(1)));
         assertEquals(expectedResult, result);
-        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
                 ArgumentMatchers.isA(Timer.class));
     }
 
@@ -869,7 +869,7 @@ public class AsyncKafkaConsumerTest {
                 assertDoesNotThrow(() -> 
consumer.offsetsForTimes(timestampToSearch,
                         Duration.ZERO));
         assertEquals(expectedResult, result);
-        verify(applicationEventHandler, 
never()).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+        verify(applicationEventHandler, 
never()).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
             ArgumentMatchers.isA(Timer.class));
     }
 
@@ -880,12 +880,12 @@ public class AsyncKafkaConsumerTest {
         doAnswer(invocation -> {
             CompletableApplicationEvent<?> event = invocation.getArgument(0);
             Timer timer = invocation.getArgument(1);
-            assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, 
event);
+            assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
             assertTrue(event.future().isCompletedExceptionally());
             return ConsumerUtils.getResult(event.future(), timer);
         })
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), 
any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class), 
any(Timer.class));
 
         consumer.wakeup();
         assertThrows(WakeupException.class, () -> 
consumer.committed(offsets.keySet()));
@@ -1011,7 +1011,7 @@ public class AsyncKafkaConsumerTest {
         consumer.subscribe(singletonList(topic));
         assertEquals(singleton(topic), consumer.subscription());
         assertTrue(consumer.assignment().isEmpty());
-        
verify(applicationEventHandler).add(ArgumentMatchers.isA(SubscriptionChangeApplicationEvent.class));
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(SubscriptionChangeEvent.class));
     }
 
     @Test
@@ -1023,7 +1023,7 @@ public class AsyncKafkaConsumerTest {
 
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
-        
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
     }
 
     @Test
@@ -1034,7 +1034,7 @@ public class AsyncKafkaConsumerTest {
         consumer.subscribe(Collections.emptyList());
         assertTrue(consumer.subscription().isEmpty());
         assertTrue(consumer.assignment().isEmpty());
-        
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
     }
 
     @Test
@@ -1356,8 +1356,8 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer(config);
 
         final KafkaException expectedException = new KafkaException("Nobody 
expects the Spanish Inquisition");
-        final ErrorBackgroundEvent errorBackgroundEvent = new 
ErrorBackgroundEvent(expectedException);
-        backgroundEventQueue.add(errorBackgroundEvent);
+        final ErrorEvent errorEvent = new ErrorEvent(expectedException);
+        backgroundEventQueue.add(errorEvent);
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
         final KafkaException exception = assertThrows(KafkaException.class, () 
-> consumer.poll(Duration.ZERO));
 
@@ -1371,11 +1371,11 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer(config);
 
         final KafkaException expectedException1 = new KafkaException("Nobody 
expects the Spanish Inquisition");
-        final ErrorBackgroundEvent errorBackgroundEvent1 = new 
ErrorBackgroundEvent(expectedException1);
-        backgroundEventQueue.add(errorBackgroundEvent1);
+        final ErrorEvent errorEvent1 = new ErrorEvent(expectedException1);
+        backgroundEventQueue.add(errorEvent1);
         final KafkaException expectedException2 = new KafkaException("Spam, 
Spam, Spam");
-        final ErrorBackgroundEvent errorBackgroundEvent2 = new 
ErrorBackgroundEvent(expectedException2);
-        backgroundEventQueue.add(errorBackgroundEvent2);
+        final ErrorEvent errorEvent2 = new ErrorEvent(expectedException2);
+        backgroundEventQueue.add(errorEvent2);
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
         final KafkaException exception = assertThrows(KafkaException.class, () 
-> consumer.poll(Duration.ZERO));
 
@@ -1472,7 +1472,7 @@ public class AsyncKafkaConsumerTest {
 
         consumer.subscribe(singletonList("topic1"));
         consumer.poll(Duration.ofMillis(100));
-        verify(applicationEventHandler).add(any(PollApplicationEvent.class));
+        verify(applicationEventHandler).add(any(PollEvent.class));
     }
 
     private void testInvalidGroupId(final String groupId) {
@@ -1510,20 +1510,20 @@ public class AsyncKafkaConsumerTest {
         consumer.poll(Duration.ZERO);
 
         verify(applicationEventHandler, atLeast(1))
-            
.addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), 
ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class), 
ArgumentMatchers.isA(Timer.class));
 
         if (committedOffsetsEnabled) {
             // Verify there was an FetchCommittedOffsets event and no 
ResetPositions event
             verify(applicationEventHandler, atLeast(1))
-                
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), 
ArgumentMatchers.isA(Timer.class));
+                
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), 
ArgumentMatchers.isA(Timer.class));
             verify(applicationEventHandler, never())
-                
.addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), 
ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), 
ArgumentMatchers.isA(Timer.class));
         } else {
             // Verify there was not any FetchCommittedOffsets event but there 
should be a ResetPositions
             verify(applicationEventHandler, never())
-                
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), 
ArgumentMatchers.isA(Timer.class));
+                
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), 
ArgumentMatchers.isA(Timer.class));
             verify(applicationEventHandler, atLeast(1))
-                
.addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), 
ArgumentMatchers.isA(Timer.class));
+                .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), 
ArgumentMatchers.isA(Timer.class));
         }
     }
 
@@ -1538,11 +1538,11 @@ public class AsyncKafkaConsumerTest {
         consumer.poll(Duration.ZERO);
 
         verify(applicationEventHandler, atLeast(1))
-            
.addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), 
ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class), 
ArgumentMatchers.isA(Timer.class));
         verify(applicationEventHandler, atLeast(1))
-            
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), 
ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class), 
ArgumentMatchers.isA(Timer.class));
         verify(applicationEventHandler, atLeast(1))
-            
.addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), 
ArgumentMatchers.isA(Timer.class));
+            .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class), 
ArgumentMatchers.isA(Timer.class));
     }
 
     @Test
@@ -1697,54 +1697,54 @@ public class AsyncKafkaConsumerTest {
 
     private void completeCommitAsyncApplicationEventExceptionally(Exception 
ex) {
         doAnswer(invocation -> {
-            AsyncCommitApplicationEvent event = invocation.getArgument(0);
+            AsyncCommitEvent event = invocation.getArgument(0);
             event.future().completeExceptionally(ex);
             return null;
-        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
     }
 
     private void completeCommitSyncApplicationEventExceptionally(Exception ex) 
{
         doAnswer(invocation -> {
-            SyncCommitApplicationEvent event = invocation.getArgument(0);
+            SyncCommitEvent event = invocation.getArgument(0);
             event.future().completeExceptionally(ex);
             return null;
-        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
     }
 
     private void completeCommitAsyncApplicationEventSuccessfully() {
         doAnswer(invocation -> {
-            AsyncCommitApplicationEvent event = invocation.getArgument(0);
+            AsyncCommitEvent event = invocation.getArgument(0);
             event.future().complete(null);
             return null;
-        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
     }
 
     private void completeCommitSyncApplicationEventSuccessfully() {
         doAnswer(invocation -> {
-            SyncCommitApplicationEvent event = invocation.getArgument(0);
+            SyncCommitEvent event = invocation.getArgument(0);
             event.future().complete(null);
             return null;
-        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
     }
 
     private void 
completeFetchedCommittedOffsetApplicationEventSuccessfully(final 
Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
         doReturn(committedOffsets)
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), 
any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class), 
any(Timer.class));
     }
 
     private void 
completeFetchedCommittedOffsetApplicationEventExceptionally(Exception ex) {
         doThrow(ex)
             .when(applicationEventHandler)
-            .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class), 
any(Timer.class));
+            .addAndGet(any(FetchCommittedOffsetsEvent.class), 
any(Timer.class));
     }
 
     private void completeUnsubscribeApplicationEventSuccessfully() {
         doAnswer(invocation -> {
-            UnsubscribeApplicationEvent event = invocation.getArgument(0);
+            UnsubscribeEvent event = invocation.getArgument(0);
             event.future().complete(null);
             return null;
-        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
     }
 
     private void forceCommitCallbackInvocation() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index a491df417de..cbd56d8b5e2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -20,16 +20,16 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
-import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
@@ -137,7 +137,7 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testApplicationEvent() {
-        ApplicationEvent e = new PollApplicationEvent(100);
+        ApplicationEvent e = new PollEvent(100);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor, times(1)).process(e);
@@ -153,36 +153,36 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testAsyncCommitEvent() {
-        ApplicationEvent e = new AsyncCommitApplicationEvent(new HashMap<>());
+        ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(AsyncCommitApplicationEvent.class));
+        verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
     }
 
     @Test
     public void testSyncCommitEvent() {
-        ApplicationEvent e = new SyncCommitApplicationEvent(new HashMap<>(), 
100L);
+        ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 100L);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(SyncCommitApplicationEvent.class));
+        verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
     }
 
     @Test
     public void testListOffsetsEventIsProcessed() {
         Map<TopicPartition, Long> timestamps = Collections.singletonMap(new 
TopicPartition("topic1", 1), 5L);
-        ApplicationEvent e = new ListOffsetsApplicationEvent(timestamps, true);
+        ApplicationEvent e = new ListOffsetsEvent(timestamps, true);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(ListOffsetsApplicationEvent.class));
+        verify(applicationEventProcessor).process(any(ListOffsetsEvent.class));
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
     @Test
     public void testResetPositionsEventIsProcessed() {
-        ResetPositionsApplicationEvent e = new 
ResetPositionsApplicationEvent();
+        ResetPositionsEvent e = new ResetPositionsEvent();
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+        
verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
@@ -190,19 +190,19 @@ public class ConsumerNetworkThreadTest {
     public void testResetPositionsProcessFailureIsIgnored() {
         doThrow(new 
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
 
-        ResetPositionsApplicationEvent event = new 
ResetPositionsApplicationEvent();
+        ResetPositionsEvent event = new ResetPositionsEvent();
         applicationEventsQueue.add(event);
         assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
 
-        
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+        
verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
     }
 
     @Test
     public void testValidatePositionsEventIsProcessed() {
-        ValidatePositionsApplicationEvent e = new 
ValidatePositionsApplicationEvent();
+        ValidatePositionsEvent e = new ValidatePositionsEvent();
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class));
+        
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
@@ -211,11 +211,11 @@ public class ConsumerNetworkThreadTest {
         HashMap<TopicPartition, OffsetAndMetadata> offset = 
mockTopicPartitionOffset();
 
         final long currentTimeMs = time.milliseconds();
-        ApplicationEvent e = new AssignmentChangeApplicationEvent(offset, 
currentTimeMs);
+        ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
         applicationEventsQueue.add(e);
 
         consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class));
+        
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
         verify(networkClient, times(1)).poll(anyLong(), anyLong());
         verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
         // Assignment change should generate an async commit (not retried).
@@ -224,9 +224,9 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     void testFetchTopicMetadata() {
-        applicationEventsQueue.add(new TopicMetadataApplicationEvent("topic", 
Long.MAX_VALUE));
+        applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
         consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(TopicMetadataApplicationEvent.class));
+        
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
     }
 
     @Test
@@ -283,8 +283,8 @@ public class ConsumerNetworkThreadTest {
         coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
         
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
         prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-        CompletableApplicationEvent<Void> event1 = spy(new 
AsyncCommitApplicationEvent(Collections.emptyMap()));
-        ApplicationEvent event2 = new 
AsyncCommitApplicationEvent(Collections.emptyMap());
+        CompletableApplicationEvent<Void> event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
+        ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
         CompletableFuture<Void> future = new CompletableFuture<>();
         when(event1.future()).thenReturn(future);
         applicationEventsQueue.add(event1);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index d7ad1b55738..afee3fff396 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -114,10 +114,10 @@ public class CoordinatorRequestManagerTest {
         expectFindCoordinatorRequest(coordinatorManager, 
Errors.GROUP_AUTHORIZATION_FAILED);
 
         verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
-            if (!(backgroundEvent instanceof ErrorBackgroundEvent))
+            if (!(backgroundEvent instanceof ErrorEvent))
                 return false;
 
-            RuntimeException exception = ((ErrorBackgroundEvent) 
backgroundEvent).error();
+            RuntimeException exception = ((ErrorEvent) 
backgroundEvent).error();
 
             if (!(exception instanceof GroupAuthorizationException))
                 return false;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 4d4492bcb47..90cfb90bb1b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -328,12 +328,8 @@ public class HeartbeatRequestManagerTest {
     @Test
     public void testConsumerGroupMetadataFirstUpdate() {
         final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-
-        final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
-            memberEpoch,
-            memberId
-        );
-        assertEquals(expectedGroupMetadataUpdateEvent, 
groupMetadataUpdateEvent);
+        assertEquals(memberEpoch, groupMetadataUpdateEvent.memberEpoch());
+        assertEquals(memberId, groupMetadataUpdateEvent.memberId());
     }
 
     @Test
@@ -370,11 +366,8 @@ public class HeartbeatRequestManagerTest {
         final BackgroundEvent eventWithUpdatedMemberEpoch = 
backgroundEventQueue.poll();
         assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, 
eventWithUpdatedMemberEpoch.type());
         final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
-        final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
-            updatedMemberEpoch,
-            memberId
-        );
-        assertEquals(expectedGroupMetadataUpdateEvent, 
groupMetadataUpdateEvent);
+        assertEquals(updatedMemberEpoch, 
groupMetadataUpdateEvent.memberEpoch());
+        assertEquals(memberId, groupMetadataUpdateEvent.memberId());
     }
 
     @Test
@@ -398,11 +391,8 @@ public class HeartbeatRequestManagerTest {
         final BackgroundEvent eventWithUpdatedMemberEpoch = 
backgroundEventQueue.poll();
         assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, 
eventWithUpdatedMemberEpoch.type());
         final GroupMetadataUpdateEvent groupMetadataUpdateEvent = 
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
-        final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new 
GroupMetadataUpdateEvent(
-            memberEpoch,
-            updatedMemberId
-        );
-        assertEquals(expectedGroupMetadataUpdateEvent, 
groupMetadataUpdateEvent);
+        assertEquals(memberEpoch, groupMetadataUpdateEvent.memberEpoch());
+        assertEquals(updatedMemberId, groupMetadataUpdateEvent.memberId());
     }
 
     private GroupMetadataUpdateEvent makeFirstGroupMetadataUpdate(final String 
memberId, final int memberEpoch) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 8a0fcf85758..49c5819be7b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -1890,10 +1890,10 @@ public class MembershipManagerImplTest {
     }
 
     private ConsumerRebalanceListenerCallbackCompletedEvent 
performCallback(MembershipManagerImpl membershipManager,
-                                 ConsumerRebalanceListenerInvoker invoker,
-                                 ConsumerRebalanceListenerMethodName 
expectedMethodName,
-                                 SortedSet<TopicPartition> expectedPartitions,
-                                 boolean complete) {
+                                                                            
ConsumerRebalanceListenerInvoker invoker,
+                                                                            
ConsumerRebalanceListenerMethodName expectedMethodName,
+                                                                            
SortedSet<TopicPartition> expectedPartitions,
+                                                                            
boolean complete) {
         // We expect only our enqueued event in the background queue.
         assertEquals(1, backgroundEventQueue.size());
         assertNotNull(backgroundEventQueue.peek());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 5ca034d6360..59ed7d440e2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.IsolationLevel;
@@ -553,8 +553,8 @@ public class OffsetsRequestManagerTest {
         assertNotNull(event);
 
         // Check that the event itself is of the expected type
-        assertInstanceOf(ErrorBackgroundEvent.class, event);
-        ErrorBackgroundEvent errorEvent = (ErrorBackgroundEvent) event;
+        assertInstanceOf(ErrorEvent.class, event);
+        ErrorEvent errorEvent = (ErrorEvent) event;
         assertNotNull(errorEvent.error());
 
         // Check that the error held in the event is of the expected type
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 8ea8cb7a729..f3e2557ae94 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -92,7 +92,7 @@ public class ApplicationEventProcessorTest {
     public void testPrepClosingCommitEvents() {
         List<NetworkClientDelegate.UnsentRequest> results = 
mockCommitResults();
         doReturn(new NetworkClientDelegate.PollResult(100, 
results)).when(commitRequestManager).pollOnClose();
-        processor.process(new CommitOnCloseApplicationEvent());
+        processor.process(new CommitOnCloseEvent());
         verify(commitRequestManager).signalClose();
     }
 
@@ -107,7 +107,7 @@ public class ApplicationEventProcessorTest {
 
     @Test
     public void testPrepClosingLeaveGroupEvent() {
-        LeaveOnCloseApplicationEvent event = new 
LeaveOnCloseApplicationEvent();
+        LeaveOnCloseEvent event = new LeaveOnCloseEvent();
         
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
         
when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));
         processor.process(event);


Reply via email to