This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f8eb4294d67 KAFKA-16191: Clean up of consumer client internal events
(#15438)
f8eb4294d67 is described below
commit f8eb4294d67b37e854aa14fb989d5e074df82ac2
Author: Kirk True <[email protected]>
AuthorDate: Thu Feb 29 02:22:23 2024 -0800
KAFKA-16191: Clean up of consumer client internal events (#15438)
There are a few minor issues with the event sub-classes in the
org.apache.kafka.clients.consumer.internals.events package that should be
cleaned up:
- Update the names of subclasses to remove "Application" or "Background"
- Make toString() final in the base classes and clean up the
implementations of toStringBase()
- Fix minor whitespace inconsistencies
- Make variable/method names consistent
Reviewer: Bruno Cadonna <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 89 +++++++-------
.../internals/CoordinatorRequestManager.java | 6 +-
.../internals/HeartbeatRequestManager.java | 4 +-
.../consumer/internals/MembershipManagerImpl.java | 4 +-
.../consumer/internals/OffsetsRequestManager.java | 4 +-
...tEvent.java => AbstractTopicMetadataEvent.java} | 24 ++--
...tableEvent.java => AllTopicsMetadataEvent.java} | 9 +-
.../internals/events/ApplicationEvent.java | 35 ++++--
.../internals/events/ApplicationEventHandler.java | 6 +-
.../events/ApplicationEventProcessor.java | 123 +++++++++++--------
...cationEvent.java => AssignmentChangeEvent.java} | 33 +----
...ApplicationEvent.java => AsyncCommitEvent.java} | 15 +--
.../consumer/internals/events/BackgroundEvent.java | 30 +++--
.../internals/events/BackgroundEventHandler.java | 2 +-
...ommitApplicationEvent.java => CommitEvent.java} | 21 +---
...plicationEvent.java => CommitOnCloseEvent.java} | 11 +-
.../events/CompletableApplicationEvent.java | 45 +------
.../events/CompletableBackgroundEvent.java | 28 +----
.../internals/events/CompletableEvent.java | 1 -
...merRebalanceListenerCallbackCompletedEvent.java | 31 +----
...nsumerRebalanceListenerCallbackNeededEvent.java | 30 +----
.../internals/events/ErrorBackgroundEvent.java | 59 ---------
...nCloseApplicationEvent.java => ErrorEvent.java} | 23 ++--
...nEvent.java => FetchCommittedOffsetsEvent.java} | 31 +----
.../internals/events/GroupMetadataUpdateEvent.java | 32 +----
...pplicationEvent.java => LeaveOnCloseEvent.java} | 12 +-
...ApplicationEvent.java => ListOffsetsEvent.java} | 33 +----
.../NewTopicsMetadataUpdateRequestEvent.java | 7 --
.../internals/events/PollApplicationEvent.java | 57 ---------
...adataUpdateRequestEvent.java => PollEvent.java} | 21 ++--
...licationEvent.java => ResetPositionsEvent.java} | 4 +-
...tionEvent.java => SubscriptionChangeEvent.java} | 4 +-
...tApplicationEvent.java => SyncCommitEvent.java} | 17 ++-
.../events/TopicMetadataApplicationEvent.java | 78 ------------
...plicationEvent.java => TopicMetadataEvent.java} | 21 ++--
...ApplicationEvent.java => UnsubscribeEvent.java} | 5 +-
...ationEvent.java => ValidatePositionsEvent.java} | 4 +-
.../consumer/internals/AsyncKafkaConsumerTest.java | 134 ++++++++++-----------
.../internals/ConsumerNetworkThreadTest.java | 54 ++++-----
.../internals/CoordinatorRequestManagerTest.java | 6 +-
.../internals/HeartbeatRequestManagerTest.java | 22 +---
.../internals/MembershipManagerImplTest.java | 8 +-
.../internals/OffsetsRequestManagerTest.java | 6 +-
.../events/ApplicationEventProcessorTest.java | 4 +-
44 files changed, 405 insertions(+), 788 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index d4b461b0140..d810c5f053b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -36,32 +36,33 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import
org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
-import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
-import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
-import
org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
+import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.Cluster;
@@ -179,7 +180,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
/**
* Process the events—if any—that were produced by the {@link
ConsumerNetworkThread network thread}.
- * It is possible that {@link
org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent an
error}
+ * It is possible that {@link ErrorEvent an error}
* could occur when processing the events. In such cases, the
processor will take a reference to the first
* error, continue to process the remaining events, and then throw the
first error that occurred.
*/
@@ -209,7 +210,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public void process(final BackgroundEvent event) {
switch (event.type()) {
case ERROR:
- process((ErrorBackgroundEvent) event);
+ process((ErrorEvent) event);
break;
case GROUP_METADATA_UPDATE:
@@ -226,7 +227,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
- private void process(final ErrorBackgroundEvent event) {
+ private void process(final ErrorEvent event) {
throw event.error();
}
@@ -703,7 +704,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
do {
// Make sure to let the background thread know that we are
still polling.
- applicationEventHandler.add(new
PollApplicationEvent(timer.currentTimeMs()));
+ applicationEventHandler.add(new
PollEvent(timer.currentTimeMs()));
// We must not allow wake-ups between polling for fetches and
returning the records.
// If the polled fetches are not empty the consumed position
has already been updated in the polling
@@ -768,7 +769,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
- AsyncCommitApplicationEvent asyncCommitEvent = new
AsyncCommitApplicationEvent(offsets);
+ AsyncCommitEvent asyncCommitEvent = new AsyncCommitEvent(offsets);
CompletableFuture<Void> future = commit(asyncCommitEvent);
future.whenComplete((r, t) -> {
@@ -790,7 +791,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
- private CompletableFuture<Void> commit(final CommitApplicationEvent
commitEvent) {
+ private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
maybeInvokeCommitCallbacks();
maybeThrowFencedInstanceException();
maybeThrowInvalidGroupIdException();
@@ -936,7 +937,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
return Collections.emptyMap();
}
- final FetchCommittedOffsetsApplicationEvent event = new
FetchCommittedOffsetsApplicationEvent(
+ final FetchCommittedOffsetsEvent event = new
FetchCommittedOffsetsEvent(
partitions,
timeout.toMillis());
wakeupTrigger.setActiveTask(event.future());
@@ -988,12 +989,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
throw new TimeoutException();
}
- final TopicMetadataApplicationEvent topicMetadataApplicationEvent =
- new TopicMetadataApplicationEvent(topic,
timeout.toMillis());
-
wakeupTrigger.setActiveTask(topicMetadataApplicationEvent.future());
+ final TopicMetadataEvent topicMetadataEvent = new
TopicMetadataEvent(topic, timeout.toMillis());
+ wakeupTrigger.setActiveTask(topicMetadataEvent.future());
try {
Map<String, List<PartitionInfo>> topicMetadata =
-
applicationEventHandler.addAndGet(topicMetadataApplicationEvent,
time.timer(timeout));
+ applicationEventHandler.addAndGet(topicMetadataEvent,
time.timer(timeout));
return topicMetadata.getOrDefault(topic,
Collections.emptyList());
} finally {
@@ -1017,11 +1017,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
throw new TimeoutException();
}
- final TopicMetadataApplicationEvent topicMetadataApplicationEvent =
- new TopicMetadataApplicationEvent(timeout.toMillis());
-
wakeupTrigger.setActiveTask(topicMetadataApplicationEvent.future());
+ final AllTopicsMetadataEvent topicMetadataEvent = new
AllTopicsMetadataEvent(timeout.toMillis());
+ wakeupTrigger.setActiveTask(topicMetadataEvent.future());
try {
- return
applicationEventHandler.addAndGet(topicMetadataApplicationEvent,
time.timer(timeout));
+ return applicationEventHandler.addAndGet(topicMetadataEvent,
time.timer(timeout));
} finally {
wakeupTrigger.clearTask();
}
@@ -1089,7 +1088,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
if (timestampsToSearch.isEmpty()) {
return Collections.emptyMap();
}
- final ListOffsetsApplicationEvent listOffsetsEvent = new
ListOffsetsApplicationEvent(
+ final ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
timestampsToSearch,
true);
@@ -1139,7 +1138,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
Map<TopicPartition, Long> timestampToSearch = partitions
.stream()
.collect(Collectors.toMap(Function.identity(), tp ->
timestamp));
- ListOffsetsApplicationEvent listOffsetsEvent = new
ListOffsetsApplicationEvent(
+ ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
timestampToSearch,
false);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
applicationEventHandler.addAndGet(
@@ -1267,11 +1266,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
if (!groupMetadata.isPresent())
return;
maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
- applicationEventHandler.add(new CommitOnCloseApplicationEvent());
+ applicationEventHandler.add(new CommitOnCloseEvent());
completeQuietly(
() -> {
maybeRevokePartitions();
- applicationEventHandler.addAndGet(new
LeaveOnCloseApplicationEvent(), timer);
+ applicationEventHandler.addAndGet(new LeaveOnCloseEvent(),
timer);
},
"Failed to send leaveGroup heartbeat with a timeout(ms)=" +
timer.timeoutMs(), firstException);
}
@@ -1349,7 +1348,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
long commitStart = time.nanoseconds();
try {
Timer requestTimer = time.timer(timeout.toMillis());
- SyncCommitApplicationEvent syncCommitEvent = new
SyncCommitApplicationEvent(offsets, timeout.toMillis());
+ SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets,
timeout.toMillis());
CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
wakeupTrigger.setActiveTask(commitFuture);
ConsumerUtils.getResult(commitFuture, requestTimer);
@@ -1429,7 +1428,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// be no following rebalance.
//
// See the ApplicationEventProcessor.process() method that handles
this event for more detail.
- applicationEventHandler.add(new
AssignmentChangeApplicationEvent(subscriptions.allConsumed(),
time.milliseconds()));
+ applicationEventHandler.add(new
AssignmentChangeEvent(subscriptions.allConsumed(), time.milliseconds()));
log.info("Assigned to partition(s): {}", join(partitions, ", "));
if (subscriptions.assignFromUser(new HashSet<>(partitions)))
@@ -1463,13 +1462,13 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
try {
fetchBuffer.retainAll(Collections.emptySet());
if (groupMetadata.isPresent()) {
- UnsubscribeApplicationEvent unsubscribeApplicationEvent = new
UnsubscribeApplicationEvent();
- applicationEventHandler.add(unsubscribeApplicationEvent);
+ UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent();
+ applicationEventHandler.add(unsubscribeEvent);
log.info("Unsubscribing all topics or patterns and assigned
partitions");
Timer timer = time.timer(Long.MAX_VALUE);
try {
- processBackgroundEvents(backgroundEventProcessor,
unsubscribeApplicationEvent.future(), timer);
+ processBackgroundEvents(backgroundEventProcessor,
unsubscribeEvent.future(), timer);
log.info("Unsubscribed all topics or patterns and assigned
partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event
to complete");
@@ -1567,7 +1566,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// Validate positions using the partition leader end offsets, to
detect if any partition
// has been truncated due to a leader change. This will trigger an
OffsetForLeaderEpoch
// request, retrieve the partition end offsets, and validate the
current position against it.
- applicationEventHandler.addAndGet(new
ValidatePositionsApplicationEvent(), timer);
+ applicationEventHandler.addAndGet(new ValidatePositionsEvent(),
timer);
cachedSubscriptionHasAllFetchPositions =
subscriptions.hasAllFetchPositions();
if (cachedSubscriptionHasAllFetchPositions) return true;
@@ -1590,7 +1589,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// which are awaiting reset. This will trigger a ListOffset
request, retrieve the
// partition offsets according to the strategy (ex. earliest,
latest), and update the
// positions.
- applicationEventHandler.addAndGet(new
ResetPositionsApplicationEvent(), timer);
+ applicationEventHandler.addAndGet(new ResetPositionsEvent(),
timer);
return true;
} catch (TimeoutException e) {
return false;
@@ -1620,8 +1619,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
log.debug("Refreshing committed offsets for partitions {}",
initializingPartitions);
try {
- final FetchCommittedOffsetsApplicationEvent event =
- new FetchCommittedOffsetsApplicationEvent(
+ final FetchCommittedOffsetsEvent event =
+ new FetchCommittedOffsetsEvent(
initializingPartitions,
timer.remainingMs());
final Map<TopicPartition, OffsetAndMetadata> offsets =
applicationEventHandler.addAndGet(event, timer);
@@ -1770,7 +1769,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// Trigger subscribe event to effectively join the group if
not already part of it,
// or just send the new subscription to the broker.
- applicationEventHandler.add(new
SubscriptionChangeApplicationEvent());
+ applicationEventHandler.add(new SubscriptionChangeEvent());
}
} finally {
release();
@@ -1797,7 +1796,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* <p/>
*
* As an example, take {@link #unsubscribe()}. To start unsubscribing, the
application thread enqueues an
- * {@link UnsubscribeApplicationEvent} on the application event queue.
That event will eventually trigger the
+ * {@link UnsubscribeEvent} on the application event queue. That event
will eventually trigger the
* rebalancing logic in the background thread. Critically, as part of this
rebalancing work, the
* {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}
callback needs to be invoked. However,
* this callback must be executed on the application thread. To achieve
this, the background thread enqueues a
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index d6a72812a52..a6cc28fb0f4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -17,7 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -175,12 +175,12 @@ public class CoordinatorRequestManager implements
RequestManager {
if (exception == Errors.GROUP_AUTHORIZATION_FAILED.exception()) {
log.debug("FindCoordinator request failed due to authorization
error {}", exception.getMessage());
KafkaException groupAuthorizationException =
GroupAuthorizationException.forGroupId(this.groupId);
- backgroundEventHandler.add(new
ErrorBackgroundEvent(groupAuthorizationException));
+ backgroundEventHandler.add(new
ErrorEvent(groupAuthorizationException));
return;
}
log.warn("FindCoordinator request failed due to fatal exception",
exception);
- backgroundEventHandler.add(new ErrorBackgroundEvent(exception));
+ backgroundEventHandler.add(new ErrorEvent(exception));
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index d551dbe2508..826774a6a64 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.Uuid;
@@ -446,7 +446,7 @@ public class HeartbeatRequestManager implements
RequestManager {
}
private void handleFatalFailure(Throwable error) {
- backgroundEventHandler.add(new ErrorBackgroundEvent(error));
+ backgroundEventHandler.add(new ErrorEvent(error));
membershipManager.transitionToFatal();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 6f3947eea46..e74c7e30a2c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -26,7 +26,7 @@ import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
@@ -255,7 +255,7 @@ public class MembershipManagerImpl implements
MembershipManager {
/**
* Serves as the conduit by which we can report events to the application
thread. This is needed as we send
* {@link ConsumerRebalanceListenerCallbackNeededEvent callbacks} and, if
needed,
- * {@link ErrorBackgroundEvent errors} to the application thread.
+ * {@link ErrorEvent errors} to the application thread.
*/
private final BackgroundEventHandler backgroundEventHandler;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 34f4b30c44d..c5156e9e0b9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -25,7 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
import
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.IsolationLevel;
@@ -199,7 +199,7 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
try {
offsetResetTimestamps =
offsetFetcherUtils.getOffsetResetTimestamp();
} catch (Exception e) {
- backgroundEventHandler.add(new ErrorBackgroundEvent(e));
+ backgroundEventHandler.add(new ErrorEvent(e));
return CompletableFuture.completedFuture(null);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
similarity index 61%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
index c06a3a717dc..31c21817d85 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java
@@ -16,16 +16,26 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
+import org.apache.kafka.common.PartitionInfo;
- public NewTopicsMetadataUpdateRequestEvent() {
- super(Type.NEW_TOPICS_METADATA_UPDATE);
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractTopicMetadataEvent extends
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
+
+ private final long timeoutMs;
+
+ protected AbstractTopicMetadataEvent(final Type type, final long
timeoutMs) {
+ super(type);
+ this.timeoutMs = timeoutMs;
+ }
+
+ public long timeoutMs() {
+ return timeoutMs;
}
@Override
- public String toString() {
- return "NewTopicsMetadataUpdateRequestEvent{" +
- toStringBase() +
- '}';
+ public String toStringBase() {
+ return super.toStringBase() + ", timeoutMs=" + timeoutMs;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
similarity index 82%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
index 8fdcc20fa83..154703aaee1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AllTopicsMetadataEvent.java
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-import java.util.concurrent.CompletableFuture;
-
-public interface CompletableEvent<T> {
-
- CompletableFuture<T> future();
+public class AllTopicsMetadataEvent extends AbstractTopicMetadataEvent {
+ public AllTopicsMetadataEvent(final long timeoutMs) {
+ super(Type.ALL_TOPICS_METADATA, timeoutMs);
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index ac7ccc56c55..2897117da8b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -16,52 +16,65 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
+import org.apache.kafka.common.Uuid;
+
import java.util.Objects;
/**
- * This is the abstract definition of the events created by the KafkaConsumer
API
+ * This is the abstract definition of the events created by the {@link
AsyncKafkaConsumer} on the user's
+ * application thread.
*/
public abstract class ApplicationEvent {
public enum Type {
COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS,
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
- LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA,
SUBSCRIPTION_CHANGE,
+ LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA,
ALL_TOPICS_METADATA, SUBSCRIPTION_CHANGE,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
COMMIT_ON_CLOSE, LEAVE_ON_CLOSE
}
private final Type type;
+ /**
+ * This identifies a particular event. It is used to disambiguate events
via {@link #hashCode()} and
+ * {@link #equals(Object)} and can be used in log messages when debugging.
+ */
+ private final Uuid id;
+
protected ApplicationEvent(Type type) {
this.type = Objects.requireNonNull(type);
+ this.id = Uuid.randomUuid();
}
public Type type() {
return type;
}
+ public Uuid id() {
+ return id;
+ }
+
@Override
- public boolean equals(Object o) {
+ public final boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
-
ApplicationEvent that = (ApplicationEvent) o;
-
- return type == that.type;
+ return type == that.type && id.equals(that.id);
}
@Override
- public int hashCode() {
- return type.hashCode();
+ public final int hashCode() {
+ return Objects.hash(type, id);
}
protected String toStringBase() {
- return "type=" + type;
+ return "type=" + type + ", id=" + id;
}
@Override
- public String toString() {
- return "ApplicationEvent{" +
+ public final String toString() {
+ return getClass().getSimpleName() + "{" +
toStringBase() +
'}';
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index 7535edf5970..eac1cc3d62f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.common.internals.IdempotentCloser;
@@ -99,10 +100,9 @@ public class ApplicationEventHandler implements Closeable {
*
* <p/>
*
- * See {@link CompletableApplicationEvent#get(Timer)} and {@link
Future#get(long, TimeUnit)} for more details.
+ * See {@link ConsumerUtils#getResult(Future, Timer)} and {@link
Future#get(long, TimeUnit)} for more details.
*
* @param event A {@link CompletableApplicationEvent} created by the
polling thread
- * @param timer Timer for which to wait for the event to complete
* @return Value that is the result of the event
* @param <T> Type of return value of the event
*/
@@ -110,7 +110,7 @@ public class ApplicationEventHandler implements Closeable {
Objects.requireNonNull(event, "CompletableApplicationEvent provided to
addAndGet must be non-null");
Objects.requireNonNull(timer, "Timer provided to addAndGet must be
non-null");
add(event);
- return event.get(timer);
+ return ConsumerUtils.getResult(event.future(), timer);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 9e48b4de6da..c86aa8815f2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
@@ -34,6 +35,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import java.util.function.Supplier;
/**
@@ -65,23 +67,24 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
return process((event, error) -> error.ifPresent(e -> log.warn("Error
processing event {}", e.getMessage(), e)));
}
+ @SuppressWarnings({"CyclomaticComplexity"})
@Override
public void process(ApplicationEvent event) {
switch (event.type()) {
case COMMIT_ASYNC:
- process((AsyncCommitApplicationEvent) event);
+ process((AsyncCommitEvent) event);
return;
case COMMIT_SYNC:
- process((SyncCommitApplicationEvent) event);
+ process((SyncCommitEvent) event);
return;
case POLL:
- process((PollApplicationEvent) event);
+ process((PollEvent) event);
return;
case FETCH_COMMITTED_OFFSETS:
- process((FetchCommittedOffsetsApplicationEvent) event);
+ process((FetchCommittedOffsetsEvent) event);
return;
case NEW_TOPICS_METADATA_UPDATE:
@@ -89,31 +92,35 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
return;
case ASSIGNMENT_CHANGE:
- process((AssignmentChangeApplicationEvent) event);
+ process((AssignmentChangeEvent) event);
return;
case TOPIC_METADATA:
- process((TopicMetadataApplicationEvent) event);
+ process((TopicMetadataEvent) event);
+ return;
+
+ case ALL_TOPICS_METADATA:
+ process((AllTopicsMetadataEvent) event);
return;
case LIST_OFFSETS:
- process((ListOffsetsApplicationEvent) event);
+ process((ListOffsetsEvent) event);
return;
case RESET_POSITIONS:
- process((ResetPositionsApplicationEvent) event);
+ process((ResetPositionsEvent) event);
return;
case VALIDATE_POSITIONS:
- process((ValidatePositionsApplicationEvent) event);
+ process((ValidatePositionsEvent) event);
return;
case SUBSCRIPTION_CHANGE:
- process((SubscriptionChangeApplicationEvent) event);
+ process((SubscriptionChangeEvent) event);
return;
case UNSUBSCRIBE:
- process((UnsubscribeApplicationEvent) event);
+ process((UnsubscribeEvent) event);
return;
case CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED:
@@ -121,11 +128,11 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
return;
case COMMIT_ON_CLOSE:
- process((CommitOnCloseApplicationEvent) event);
+ process((CommitOnCloseEvent) event);
return;
case LEAVE_ON_CLOSE:
- process((LeaveOnCloseApplicationEvent) event);
+ process((LeaveOnCloseEvent) event);
return;
default:
@@ -133,7 +140,7 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
}
}
- private void process(final PollApplicationEvent event) {
+ private void process(final PollEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
return;
}
@@ -142,20 +149,28 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
requestManagers.heartbeatRequestManager.ifPresent(hrm ->
hrm.resetPollTimer(event.pollTimeMs()));
}
- private void process(final AsyncCommitApplicationEvent event) {
+ private void process(final AsyncCommitEvent event) {
+ if (!requestManagers.commitRequestManager.isPresent()) {
+ return;
+ }
+
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Void> commitResult =
manager.commitAsync(event.offsets());
- event.chain(commitResult);
+ CompletableFuture<Void> future = manager.commitAsync(event.offsets());
+ future.whenComplete(complete(event.future()));
}
- private void process(final SyncCommitApplicationEvent event) {
+ private void process(final SyncCommitEvent event) {
+ if (!requestManagers.commitRequestManager.isPresent()) {
+ return;
+ }
+
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
long expirationTimeoutMs =
getExpirationTimeForTimeout(event.retryTimeoutMs());
- CompletableFuture<Void> commitResult =
manager.commitSync(event.offsets(), expirationTimeoutMs);
- event.chain(commitResult);
+ CompletableFuture<Void> future = manager.commitSync(event.offsets(),
expirationTimeoutMs);
+ future.whenComplete(complete(event.future()));
}
- private void process(final FetchCommittedOffsetsApplicationEvent event) {
+ private void process(final FetchCommittedOffsetsEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
event.future().completeExceptionally(new KafkaException("Unable to
fetch committed " +
"offset because the CommittedRequestManager is not
available. Check if group.id was set correctly"));
@@ -163,19 +178,19 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
}
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
long expirationTimeMs = getExpirationTimeForTimeout(event.timeout());
- event.chain(manager.fetchOffsets(event.partitions(),
expirationTimeMs));
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.fetchOffsets(event.partitions(), expirationTimeMs);
+ future.whenComplete(complete(event.future()));
}
private void process(final NewTopicsMetadataUpdateRequestEvent ignored) {
metadata.requestUpdateForNewTopics();
}
-
/**
* Commit all consumed if auto-commit is enabled. Note this will trigger
an async commit,
* that will not be retried if the commit request fails.
*/
- private void process(final AssignmentChangeApplicationEvent event) {
+ private void process(final AssignmentChangeEvent event) {
if (!requestManagers.commitRequestManager.isPresent()) {
return;
}
@@ -184,11 +199,11 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
manager.maybeAutoCommitAsync();
}
- private void process(final ListOffsetsApplicationEvent event) {
+ private void process(final ListOffsetsEvent event) {
final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>>
future =
requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(),
event.requireTimestamps());
- event.chain(future);
+ future.whenComplete(complete(event.future()));
}
/**
@@ -196,7 +211,7 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
* consumer join the group if it is not part of it yet, or send the
updated subscription if
* it is already a member.
*/
- private void process(final SubscriptionChangeApplicationEvent ignored) {
+ private void process(final SubscriptionChangeEvent ignored) {
if (!requestManagers.heartbeatRequestManager.isPresent()) {
log.warn("Group membership manager not present when processing a
subscribe event");
return;
@@ -213,38 +228,39 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
* execution for releasing the assignment completes, and the
request to leave
* the group is sent out.
*/
- private void process(final UnsubscribeApplicationEvent event) {
+ private void process(final UnsubscribeEvent event) {
if (!requestManagers.heartbeatRequestManager.isPresent()) {
KafkaException error = new KafkaException("Group membership
manager not present when processing an unsubscribe event");
event.future().completeExceptionally(error);
return;
}
MembershipManager membershipManager =
requestManagers.heartbeatRequestManager.get().membershipManager();
- CompletableFuture<Void> result = membershipManager.leaveGroup();
- event.chain(result);
+ CompletableFuture<Void> future = membershipManager.leaveGroup();
+ future.whenComplete(complete(event.future()));
}
- private void process(final ResetPositionsApplicationEvent event) {
- CompletableFuture<Void> result =
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
- event.chain(result);
+ private void process(final ResetPositionsEvent event) {
+ CompletableFuture<Void> future =
requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
+ future.whenComplete(complete(event.future()));
}
- private void process(final ValidatePositionsApplicationEvent event) {
- CompletableFuture<Void> result =
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
- event.chain(result);
+ private void process(final ValidatePositionsEvent event) {
+ CompletableFuture<Void> future =
requestManagers.offsetsRequestManager.validatePositionsIfNeeded();
+ future.whenComplete(complete(event.future()));
}
- private void process(final TopicMetadataApplicationEvent event) {
- final CompletableFuture<Map<String, List<PartitionInfo>>> future;
-
- long expirationTimeMs =
getExpirationTimeForTimeout(event.getTimeoutMs());
- if (event.isAllTopics()) {
- future =
requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(expirationTimeMs);
- } else {
- future =
requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(),
expirationTimeMs);
- }
+ private void process(final TopicMetadataEvent event) {
+ final long expirationTimeMs =
getExpirationTimeForTimeout(event.timeoutMs());
+ final CompletableFuture<Map<String, List<PartitionInfo>>> future =
+
requestManagers.topicMetadataRequestManager.requestTopicMetadata(event.topic(),
expirationTimeMs);
+ future.whenComplete(complete(event.future()));
+ }
- event.chain(future);
+ private void process(final AllTopicsMetadataEvent event) {
+ final long expirationTimeMs =
getExpirationTimeForTimeout(event.timeoutMs());
+ final CompletableFuture<Map<String, List<PartitionInfo>>> future =
+
requestManagers.topicMetadataRequestManager.requestAllTopicsMetadata(expirationTimeMs);
+ future.whenComplete(complete(event.future()));
}
private void process(final ConsumerRebalanceListenerCallbackCompletedEvent
event) {
@@ -259,14 +275,14 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
manager.consumerRebalanceListenerCallbackCompleted(event);
}
- private void process(final CommitOnCloseApplicationEvent event) {
+ private void process(final CommitOnCloseEvent event) {
if (!requestManagers.commitRequestManager.isPresent())
return;
log.debug("Signal CommitRequestManager closing");
requestManagers.commitRequestManager.get().signalClose();
}
- private void process(final LeaveOnCloseApplicationEvent event) {
+ private void process(final LeaveOnCloseEvent event) {
if (!requestManagers.heartbeatRequestManager.isPresent()) {
event.future().complete(null);
return;
@@ -277,7 +293,7 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
log.debug("Leaving group before closing");
CompletableFuture<Void> future = membershipManager.leaveGroup();
// The future will be completed on heartbeat sent
- event.chain(future);
+ future.whenComplete(complete(event.future()));
}
/**
@@ -293,6 +309,15 @@ public class ApplicationEventProcessor extends
EventProcessor<ApplicationEvent>
return expiration;
}
+ private <T> BiConsumer<? super T, ? super Throwable> complete(final
CompletableFuture<T> b) {
+ return (value, exception) -> {
+ if (exception != null)
+ b.completeExceptionally(exception);
+ else
+ b.complete(value);
+ };
+ }
+
/**
* Creates a {@link Supplier} for deferred creation during invocation by
* {@link ConsumerNetworkThread}.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
similarity index 56%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
index ccf7199f260..c9efa2e9dff 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AssignmentChangeEvent.java
@@ -22,13 +22,12 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
-public class AssignmentChangeApplicationEvent extends ApplicationEvent {
+public class AssignmentChangeEvent extends ApplicationEvent {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final long currentTimeMs;
- public AssignmentChangeApplicationEvent(final Map<TopicPartition,
OffsetAndMetadata> offsets,
- final long currentTimeMs) {
+ public AssignmentChangeEvent(final Map<TopicPartition, OffsetAndMetadata>
offsets, final long currentTimeMs) {
super(Type.ASSIGNMENT_CHANGE);
this.offsets = Collections.unmodifiableMap(offsets);
this.currentTimeMs = currentTimeMs;
@@ -43,31 +42,7 @@ public class AssignmentChangeApplicationEvent extends
ApplicationEvent {
}
@Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- AssignmentChangeApplicationEvent that =
(AssignmentChangeApplicationEvent) o;
-
- if (currentTimeMs != that.currentTimeMs) return false;
- return offsets.equals(that.offsets);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + offsets.hashCode();
- result = 31 * result + (int) (currentTimeMs ^ (currentTimeMs >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "AssignmentChangeApplicationEvent{" +
- toStringBase() +
- ", offsets=" + offsets +
- ", currentTimeMs=" + currentTimeMs +
- '}';
+ protected String toStringBase() {
+ return super.toStringBase() + ", offsets=" + offsets + ",
currentTimeMs=" + currentTimeMs;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
similarity index 73%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
index 7a939ce3cfd..2f03fdfb1e5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitEvent.java
@@ -18,22 +18,15 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+
import java.util.Map;
/**
* Event to commit offsets without waiting for a response, so the request
won't be retried.
*/
-public class AsyncCommitApplicationEvent extends CommitApplicationEvent {
-
- public AsyncCommitApplicationEvent(final Map<TopicPartition,
OffsetAndMetadata> offsets) {
- super(offsets, Type.COMMIT_ASYNC);
- }
+public class AsyncCommitEvent extends CommitEvent {
- @Override
- public String toString() {
- return "AsyncCommitApplicationEvent{" +
- toStringBase() +
- ", offsets=" + offsets() +
- '}';
+ public AsyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata>
offsets) {
+ super(Type.COMMIT_ASYNC, offsets);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index e5d522201ef..9bc3fbebc30 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import org.apache.kafka.common.Uuid;
import java.util.Objects;
@@ -31,36 +32,45 @@ public abstract class BackgroundEvent {
private final Type type;
- public BackgroundEvent(Type type) {
+ /**
+ * This identifies a particular event. It is used to disambiguate events
via {@link #hashCode()} and
+ * {@link #equals(Object)} and can be used in log messages when debugging.
+ */
+ private final Uuid id;
+
+ protected BackgroundEvent(Type type) {
this.type = Objects.requireNonNull(type);
+ this.id = Uuid.randomUuid();
}
public Type type() {
return type;
}
+ public Uuid id() {
+ return id;
+ }
+
@Override
- public boolean equals(Object o) {
+ public final boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
-
BackgroundEvent that = (BackgroundEvent) o;
-
- return type == that.type;
+ return type == that.type && id.equals(that.id);
}
@Override
- public int hashCode() {
- return type.hashCode();
+ public final int hashCode() {
+ return Objects.hash(type, id);
}
protected String toStringBase() {
- return "type=" + type;
+ return "type=" + type + ", id=" + id;
}
@Override
- public String toString() {
- return "BackgroundEvent{" +
+ public final String toString() {
+ return getClass().getSimpleName() + "{" +
toStringBase() +
'}';
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
index 103493d2531..48421484f1d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
@@ -26,7 +26,7 @@ import java.util.Queue;
/**
* An event handler that receives {@link BackgroundEvent background events}
from the
* {@link ConsumerNetworkThread network thread} which are then made available
to the application thread
- * via the {@link BackgroundEventProcessor}.
+ * via an {@link EventProcessor}.
*/
public class BackgroundEventHandler {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
similarity index 71%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
index 69d969d7b0f..253d27e2573 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java
@@ -22,14 +22,14 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
-public abstract class CommitApplicationEvent extends
CompletableApplicationEvent<Void> {
+public abstract class CommitEvent extends CompletableApplicationEvent<Void> {
/**
* Offsets to commit per partition.
*/
private final Map<TopicPartition, OffsetAndMetadata> offsets;
- public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata>
offsets, Type type) {
+ protected CommitEvent(final Type type, final Map<TopicPartition,
OffsetAndMetadata> offsets) {
super(type);
this.offsets = Collections.unmodifiableMap(offsets);
@@ -45,20 +45,7 @@ public abstract class CommitApplicationEvent extends
CompletableApplicationEvent
}
@Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- CommitApplicationEvent that = (CommitApplicationEvent) o;
-
- return offsets.equals(that.offsets);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + offsets.hashCode();
- return result;
+ protected String toStringBase() {
+ return super.toStringBase() + ", offsets=" + offsets;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseEvent.java
similarity index 76%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseEvent.java
index 4cc07e945f9..7d2e29fcedb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseEvent.java
@@ -16,16 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-public class CommitOnCloseApplicationEvent extends ApplicationEvent {
+public class CommitOnCloseEvent extends ApplicationEvent {
- public CommitOnCloseApplicationEvent() {
+ public CommitOnCloseEvent() {
super(Type.COMMIT_ON_CLOSE);
}
-
- @Override
- public String toString() {
- return "CommitOnCloseApplicationEvent{" +
- toStringBase() +
- '}';
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index 365c620e0c0..a62c3aaa4c4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
-import org.apache.kafka.common.utils.Timer;
-
import java.util.concurrent.CompletableFuture;
/**
@@ -31,56 +28,18 @@ public abstract class CompletableApplicationEvent<T>
extends ApplicationEvent im
private final CompletableFuture<T> future;
- protected CompletableApplicationEvent(Type type) {
+ protected CompletableApplicationEvent(final Type type) {
super(type);
this.future = new CompletableFuture<>();
}
+ @Override
public CompletableFuture<T> future() {
return future;
}
- public T get(Timer timer) {
- return ConsumerUtils.getResult(future, timer);
- }
-
- public void chain(final CompletableFuture<T> providedFuture) {
- providedFuture.whenComplete((value, exception) -> {
- if (exception != null) {
- this.future.completeExceptionally(exception);
- } else {
- this.future.complete(value);
- }
- });
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- CompletableApplicationEvent<?> that = (CompletableApplicationEvent<?>)
o;
-
- return future.equals(that.future);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + future.hashCode();
- return result;
- }
-
@Override
protected String toStringBase() {
return super.toStringBase() + ", future=" + future;
}
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- toStringBase() +
- '}';
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java
index 640ee6103af..1a58515a5cb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableBackgroundEvent.java
@@ -28,42 +28,18 @@ public abstract class CompletableBackgroundEvent<T> extends
BackgroundEvent impl
private final CompletableFuture<T> future;
- protected CompletableBackgroundEvent(Type type) {
+ protected CompletableBackgroundEvent(final Type type) {
super(type);
this.future = new CompletableFuture<>();
}
+ @Override
public CompletableFuture<T> future() {
return future;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- CompletableBackgroundEvent<?> that = (CompletableBackgroundEvent<?>) o;
-
- return future.equals(that.future);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + future.hashCode();
- return result;
- }
-
@Override
protected String toStringBase() {
return super.toStringBase() + ", future=" + future;
}
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- toStringBase() +
- '}';
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
index 8fdcc20fa83..97559d8cb9b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEvent.java
@@ -21,5 +21,4 @@ import java.util.concurrent.CompletableFuture;
public interface CompletableEvent<T> {
CompletableFuture<T> future();
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
index b260c6154ea..a10e98df1d0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackCompletedEvent.java
@@ -34,9 +34,9 @@ public class ConsumerRebalanceListenerCallbackCompletedEvent
extends Application
private final CompletableFuture<Void> future;
private final Optional<KafkaException> error;
- public
ConsumerRebalanceListenerCallbackCompletedEvent(ConsumerRebalanceListenerMethodName
methodName,
-
CompletableFuture<Void> future,
-
Optional<KafkaException> error) {
+ public ConsumerRebalanceListenerCallbackCompletedEvent(final
ConsumerRebalanceListenerMethodName methodName,
+ final
CompletableFuture<Void> future,
+ final
Optional<KafkaException> error) {
super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED);
this.methodName = Objects.requireNonNull(methodName);
this.future = Objects.requireNonNull(future);
@@ -55,24 +55,6 @@ public class ConsumerRebalanceListenerCallbackCompletedEvent
extends Application
return error;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- ConsumerRebalanceListenerCallbackCompletedEvent that =
(ConsumerRebalanceListenerCallbackCompletedEvent) o;
-
- return methodName == that.methodName &&
- future.equals(that.future) &&
- error.equals(that.error);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(methodName, future, error);
- }
-
@Override
protected String toStringBase() {
return super.toStringBase() +
@@ -80,11 +62,4 @@ public class ConsumerRebalanceListenerCallbackCompletedEvent
extends Application
", future=" + future +
", error=" + error;
}
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- toStringBase() +
- '}';
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
index 7b17c034abd..6ce833580c8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRebalanceListenerCallbackNeededEvent.java
@@ -37,8 +37,8 @@ public class ConsumerRebalanceListenerCallbackNeededEvent
extends CompletableBac
private final ConsumerRebalanceListenerMethodName methodName;
private final SortedSet<TopicPartition> partitions;
- public
ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName
methodName,
-
SortedSet<TopicPartition> partitions) {
+ public ConsumerRebalanceListenerCallbackNeededEvent(final
ConsumerRebalanceListenerMethodName methodName,
+ final
SortedSet<TopicPartition> partitions) {
super(Type.CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED);
this.methodName = Objects.requireNonNull(methodName);
this.partitions = Collections.unmodifiableSortedSet(partitions);
@@ -52,36 +52,10 @@ public class ConsumerRebalanceListenerCallbackNeededEvent
extends CompletableBac
return partitions;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- ConsumerRebalanceListenerCallbackNeededEvent that =
(ConsumerRebalanceListenerCallbackNeededEvent) o;
-
- return methodName == that.methodName &&
partitions.equals(that.partitions);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + methodName.hashCode();
- result = 31 * result + partitions.hashCode();
- return result;
- }
-
@Override
protected String toStringBase() {
return super.toStringBase() +
", methodName=" + methodName +
", partitions=" + partitions;
}
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- toStringBase() +
- '}';
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
deleted file mode 100644
index 2945f22986b..00000000000
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorBackgroundEvent.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.common.KafkaException;
-
-public class ErrorBackgroundEvent extends BackgroundEvent {
-
- private final RuntimeException error;
-
- public ErrorBackgroundEvent(Throwable t) {
- super(Type.ERROR);
- this.error = t instanceof RuntimeException ? (RuntimeException) t :
new KafkaException(t);
- }
-
- public RuntimeException error() {
- return error;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- ErrorBackgroundEvent that = (ErrorBackgroundEvent) o;
-
- return error.equals(that.error);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + error.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "ErrorBackgroundEvent{" +
- toStringBase() +
- ", error=" + error +
- '}';
- }
-}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorEvent.java
similarity index 65%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorEvent.java
index 4cc07e945f9..5e6d8223823 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ErrorEvent.java
@@ -16,16 +16,23 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-public class CommitOnCloseApplicationEvent extends ApplicationEvent {
+import org.apache.kafka.common.KafkaException;
- public CommitOnCloseApplicationEvent() {
- super(Type.COMMIT_ON_CLOSE);
+public class ErrorEvent extends BackgroundEvent {
+
+ private final RuntimeException error;
+
+ public ErrorEvent(Throwable t) {
+ super(Type.ERROR);
+ this.error = t instanceof RuntimeException ? (RuntimeException) t :
new KafkaException(t);
+ }
+
+ public RuntimeException error() {
+ return error;
}
@Override
- public String toString() {
- return "CommitOnCloseApplicationEvent{" +
- toStringBase() +
- '}';
+ public String toStringBase() {
+ return super.toStringBase() + ", error=" + error;
}
-}
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
similarity index 60%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
index 34b2d97705c..7cf56b990b0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/FetchCommittedOffsetsEvent.java
@@ -23,7 +23,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
-public class FetchCommittedOffsetsApplicationEvent extends
CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
+public class FetchCommittedOffsetsEvent extends
CompletableApplicationEvent<Map<TopicPartition, OffsetAndMetadata>> {
/**
* Partitions to retrieve committed offsets for.
@@ -35,8 +35,7 @@ public class FetchCommittedOffsetsApplicationEvent extends
CompletableApplicatio
*/
private final long timeoutMs;
- public FetchCommittedOffsetsApplicationEvent(final Set<TopicPartition>
partitions,
- final long timeoutMs) {
+ public FetchCommittedOffsetsEvent(final Set<TopicPartition> partitions,
final long timeoutMs) {
super(Type.FETCH_COMMITTED_OFFSETS);
this.partitions = Collections.unmodifiableSet(partitions);
this.timeoutMs = timeoutMs;
@@ -51,29 +50,7 @@ public class FetchCommittedOffsetsApplicationEvent extends
CompletableApplicatio
}
@Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- FetchCommittedOffsetsApplicationEvent that =
(FetchCommittedOffsetsApplicationEvent) o;
-
- return partitions.equals(that.partitions);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + partitions.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "{" +
- toStringBase() +
- ", partitions=" + partitions +
- ", timeout=" + timeoutMs + "ms" +
- '}';
+ public String toStringBase() {
+ return super.toStringBase() + ", partitions=" + partitions + ",
partitions=" + partitions;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
index 120e6717242..001f5498183 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/GroupMetadataUpdateEvent.java
@@ -19,8 +19,6 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
-import java.util.Objects;
-
/**
* This event is sent by the {@link ConsumerNetworkThread consumer's network
thread} to the application thread
* so that when the user calls the {@link Consumer#groupMetadata()} API, the
information is up-to-date. The
@@ -29,11 +27,10 @@ import java.util.Objects;
*/
public class GroupMetadataUpdateEvent extends BackgroundEvent {
- final private int memberEpoch;
- final private String memberId;
+ private final int memberEpoch;
+ private final String memberId;
- public GroupMetadataUpdateEvent(final int memberEpoch,
- final String memberId) {
+ public GroupMetadataUpdateEvent(final int memberEpoch, final String
memberId) {
super(Type.GROUP_METADATA_UPDATE);
this.memberEpoch = memberEpoch;
this.memberId = memberId;
@@ -47,33 +44,10 @@ public class GroupMetadataUpdateEvent extends
BackgroundEvent {
return memberId;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
- GroupMetadataUpdateEvent that = (GroupMetadataUpdateEvent) o;
- return memberEpoch == that.memberEpoch &&
- Objects.equals(memberId, that.memberId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), memberEpoch, memberId);
- }
-
@Override
public String toStringBase() {
return super.toStringBase() +
", memberEpoch=" + memberEpoch +
", memberId='" + memberId + '\'';
}
-
- @Override
- public String toString() {
- return "GroupMetadataUpdateEvent{" +
- toStringBase() +
- '}';
- }
-
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
similarity index 76%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
index ee0b6ffa61c..5ee19a7cc02 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveOnCloseEvent.java
@@ -16,15 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-public class LeaveOnCloseApplicationEvent extends
CompletableApplicationEvent<Void> {
- public LeaveOnCloseApplicationEvent() {
- super(Type.LEAVE_ON_CLOSE);
- }
+public class LeaveOnCloseEvent extends CompletableApplicationEvent<Void> {
- @Override
- public String toString() {
- return "LeaveOnCloseApplicationEvent{" +
- toStringBase() +
- '}';
+ public LeaveOnCloseEvent() {
+ super(Type.LEAVE_ON_CLOSE);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
similarity index 69%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
index 2466d062726..fd3b321173f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java
@@ -31,12 +31,12 @@ import java.util.Map;
* {@link OffsetAndTimestamp} found (offset of the first message whose
timestamp is greater than
* or equals to the target timestamp)
*/
-public class ListOffsetsApplicationEvent extends
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> {
+public class ListOffsetsEvent extends
CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> {
private final Map<TopicPartition, Long> timestampsToSearch;
private final boolean requireTimestamps;
- public ListOffsetsApplicationEvent(Map<TopicPartition, Long>
timestampToSearch, boolean requireTimestamps) {
+ public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch,
final boolean requireTimestamps) {
super(Type.LIST_OFFSETS);
this.timestampsToSearch =
Collections.unmodifiableMap(timestampToSearch);
this.requireTimestamps = requireTimestamps;
@@ -64,31 +64,10 @@ public class ListOffsetsApplicationEvent extends
CompletableApplicationEvent<Map
}
@Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- ListOffsetsApplicationEvent that = (ListOffsetsApplicationEvent) o;
-
- if (requireTimestamps != that.requireTimestamps) return false;
- return timestampsToSearch.equals(that.timestampsToSearch);
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + timestampsToSearch.hashCode();
- result = 31 * result + (requireTimestamps ? 1 : 0);
- return result;
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + " {" +
- toStringBase() +
- ", timestampsToSearch=" + timestampsToSearch + ", " +
- "requireTimestamps=" + requireTimestamps + '}';
+ public String toStringBase() {
+ return super.toStringBase() +
+ ", timestampsToSearch=" + timestampsToSearch +
+ ", requireTimestamps=" + requireTimestamps;
}
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
index c06a3a717dc..b06bd456f5f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
@@ -21,11 +21,4 @@ public class NewTopicsMetadataUpdateRequestEvent extends
ApplicationEvent {
public NewTopicsMetadataUpdateRequestEvent() {
super(Type.NEW_TOPICS_METADATA_UPDATE);
}
-
- @Override
- public String toString() {
- return "NewTopicsMetadataUpdateRequestEvent{" +
- toStringBase() +
- '}';
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
deleted file mode 100644
index b958f0ec417..00000000000
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollApplicationEvent.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-public class PollApplicationEvent extends ApplicationEvent {
-
- private final long pollTimeMs;
-
- public PollApplicationEvent(final long pollTimeMs) {
- super(Type.POLL);
- this.pollTimeMs = pollTimeMs;
- }
-
- public long pollTimeMs() {
- return pollTimeMs;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- PollApplicationEvent that = (PollApplicationEvent) o;
-
- return pollTimeMs == that.pollTimeMs;
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (int) (pollTimeMs ^ (pollTimeMs >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "PollApplicationEvent{" +
- toStringBase() +
- ", pollTimeMs=" + pollTimeMs +
- '}';
- }
-}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
similarity index 70%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
index c06a3a717dc..96614c06e9b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/NewTopicsMetadataUpdateRequestEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java
@@ -16,16 +16,21 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-public class NewTopicsMetadataUpdateRequestEvent extends ApplicationEvent {
+public class PollEvent extends ApplicationEvent {
- public NewTopicsMetadataUpdateRequestEvent() {
- super(Type.NEW_TOPICS_METADATA_UPDATE);
+ private final long pollTimeMs;
+
+ public PollEvent(final long pollTimeMs) {
+ super(Type.POLL);
+ this.pollTimeMs = pollTimeMs;
+ }
+
+ public long pollTimeMs() {
+ return pollTimeMs;
}
@Override
- public String toString() {
- return "NewTopicsMetadataUpdateRequestEvent{" +
- toStringBase() +
- '}';
+ public String toStringBase() {
+ return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
}
-}
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
similarity index 89%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
index 5d9b07f9de0..06f6ebbb68a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ResetPositionsEvent.java
@@ -22,9 +22,9 @@ package org.apache.kafka.clients.consumer.internals.events;
* asynchronous event that generates ListOffsets requests, and completes by
updating in-memory
* positions when responses are received.
*/
-public class ResetPositionsApplicationEvent extends
CompletableApplicationEvent<Void> {
+public class ResetPositionsEvent extends CompletableApplicationEvent<Void> {
- public ResetPositionsApplicationEvent() {
+ public ResetPositionsEvent() {
super(Type.RESET_POSITIONS);
}
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
similarity index 90%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
index 73fd15fb144..ad5fd34c06f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SubscriptionChangeEvent.java
@@ -22,9 +22,9 @@ package org.apache.kafka.clients.consumer.internals.events;
* calls the subscribe API. This will make the consumer join a consumer group
if not part of it
* yet, or just send the updated subscription to the broker if it's already a
member of the group.
*/
-public class SubscriptionChangeApplicationEvent extends ApplicationEvent {
+public class SubscriptionChangeEvent extends ApplicationEvent {
- public SubscriptionChangeApplicationEvent() {
+ public SubscriptionChangeEvent() {
super(Type.SUBSCRIPTION_CHANGE);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
similarity index 73%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
index 43dfee6ab18..7e00e0da596 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitEvent.java
@@ -18,22 +18,23 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+
import java.util.Map;
/**
* Event to commit offsets waiting for a response and retrying on expected
retriable errors until
* the timer expires.
*/
-public class SyncCommitApplicationEvent extends CommitApplicationEvent {
+public class SyncCommitEvent extends CommitEvent {
/**
* Time to wait for a response, retrying on retriable errors.
*/
private final long retryTimeoutMs;
- public SyncCommitApplicationEvent(final Map<TopicPartition,
OffsetAndMetadata> offsets,
- final long retryTimeoutMs) {
- super(offsets, Type.COMMIT_SYNC);
+ public SyncCommitEvent(final Map<TopicPartition, OffsetAndMetadata>
offsets,
+ final long retryTimeoutMs) {
+ super(Type.COMMIT_SYNC, offsets);
this.retryTimeoutMs = retryTimeoutMs;
}
@@ -42,11 +43,7 @@ public class SyncCommitApplicationEvent extends
CommitApplicationEvent {
}
@Override
- public String toString() {
- return "SyncCommitApplicationEvent{" +
- toStringBase() +
- ", offsets=" + offsets() +
- ", retryTimeout=" + retryTimeoutMs + "ms" +
- '}';
+ public String toStringBase() {
+ return super.toStringBase() + ", offsets=" + offsets() + ",
retryTimeoutMs=" + retryTimeoutMs;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
deleted file mode 100644
index dd6f842cc26..00000000000
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataApplicationEvent.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals.events;
-
-import org.apache.kafka.common.PartitionInfo;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-public class TopicMetadataApplicationEvent extends
CompletableApplicationEvent<Map<String, List<PartitionInfo>>> {
- private final String topic;
- private final boolean allTopics;
- private final long timeoutMs;
-
- public TopicMetadataApplicationEvent(final long timeoutMs) {
- super(Type.TOPIC_METADATA);
- this.topic = null;
- this.allTopics = true;
- this.timeoutMs = timeoutMs;
- }
-
- public TopicMetadataApplicationEvent(final String topic, final long
timeoutMs) {
- super(Type.TOPIC_METADATA);
- this.topic = topic;
- this.allTopics = false;
- this.timeoutMs = timeoutMs;
- }
-
- public String topic() {
- return topic;
- }
-
- public boolean isAllTopics() {
- return allTopics;
- }
-
- public long getTimeoutMs() {
- return timeoutMs;
- }
- @Override
- public String toString() {
- return getClass().getSimpleName() + " {" + toStringBase() +
- ", topic=" + topic +
- ", allTopics=" + allTopics +
- ", timeoutMs=" + timeoutMs + "}";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof TopicMetadataApplicationEvent)) return false;
- if (!super.equals(o)) return false;
-
- TopicMetadataApplicationEvent that = (TopicMetadataApplicationEvent) o;
-
- return topic.equals(that.topic) && (allTopics == that.allTopics) &&
(timeoutMs == that.timeoutMs);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), topic, allTopics, timeoutMs);
- }
-}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
similarity index 66%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
index 4cc07e945f9..ebbb2a6c468 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitOnCloseApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/TopicMetadataEvent.java
@@ -16,16 +16,23 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
-public class CommitOnCloseApplicationEvent extends ApplicationEvent {
+import java.util.Objects;
- public CommitOnCloseApplicationEvent() {
- super(Type.COMMIT_ON_CLOSE);
+public class TopicMetadataEvent extends AbstractTopicMetadataEvent {
+
+ private final String topic;
+
+ public TopicMetadataEvent(final String topic, final long timeoutMs) {
+ super(Type.TOPIC_METADATA, timeoutMs);
+ this.topic = Objects.requireNonNull(topic);
+ }
+
+ public String topic() {
+ return topic;
}
@Override
- public String toString() {
- return "CommitOnCloseApplicationEvent{" +
- toStringBase() +
- '}';
+ public String toStringBase() {
+ return super.toStringBase() + ", topic=" + topic;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
similarity index 91%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
index a1ccb896fdf..07af36e5feb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeEvent.java
@@ -24,8 +24,9 @@ package org.apache.kafka.clients.consumer.internals.events;
* complete and the heartbeat to leave the group is sent out (minimal effort
to send the
* leave group heartbeat, without waiting for any response or considering
timeouts).
*/
-public class UnsubscribeApplicationEvent extends
CompletableApplicationEvent<Void> {
- public UnsubscribeApplicationEvent() {
+public class UnsubscribeEvent extends CompletableApplicationEvent<Void> {
+
+ public UnsubscribeEvent() {
super(Type.UNSUBSCRIBE);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
similarity index 89%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
index 3b093e0b683..efa358b4c78 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ValidatePositionsEvent.java
@@ -22,9 +22,9 @@ package org.apache.kafka.clients.consumer.internals.events;
* detected. This is an asynchronous event that generates OffsetForLeaderEpoch
requests, and
* completes by validating in-memory positions against the offsets received in
the responses.
*/
-public class ValidatePositionsApplicationEvent extends
CompletableApplicationEvent<Void> {
+public class ValidatePositionsEvent extends CompletableApplicationEvent<Void> {
- public ValidatePositionsApplicationEvent() {
+ public ValidatePositionsEvent() {
super(Type.VALIDATE_POSITIONS);
}
}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 976677dec86..2d6899612f0 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -31,26 +31,26 @@ import
org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
-import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
-import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
-import
org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseEvent;
+import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -256,9 +256,9 @@ public class AsyncKafkaConsumerTest {
consumer.commitAsync(offsets, null);
- final ArgumentCaptor<AsyncCommitApplicationEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitApplicationEvent.class);
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
verify(applicationEventHandler).add(commitEventCaptor.capture());
- final AsyncCommitApplicationEvent commitEvent =
commitEventCaptor.getValue();
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
assertEquals(offsets, commitEvent.offsets());
assertDoesNotThrow(() -> commitEvent.future().complete(null));
assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
@@ -310,9 +310,9 @@ public class AsyncKafkaConsumerTest {
assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
- final ArgumentCaptor<AsyncCommitApplicationEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitApplicationEvent.class);
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
verify(applicationEventHandler).add(commitEventCaptor.capture());
- final AsyncCommitApplicationEvent commitEvent =
commitEventCaptor.getValue();
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () ->
consumer.commitAsync());
@@ -325,7 +325,7 @@ public class AsyncKafkaConsumerTest {
completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);
assertEquals(topicPartitionOffsets,
consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
-
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
any());
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class),
any());
final Metric metric = consumer.metrics()
.get(consumer.metricsRegistry().metricName("committed-time-ns-total",
"consumer-metrics"));
assertTrue((double) metric.metricValue() > 0);
@@ -347,7 +347,7 @@ public class AsyncKafkaConsumerTest {
verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
-
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
any());
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class),
any());
}
@Test
@@ -355,9 +355,9 @@ public class AsyncKafkaConsumerTest {
consumer = newConsumer();
Map<TopicPartition, OffsetAndMetadata> offsets =
mockTopicPartitionOffset();
when(applicationEventHandler.addAndGet(
- any(FetchCommittedOffsetsApplicationEvent.class),
any())).thenAnswer(invocation -> {
+ any(FetchCommittedOffsetsEvent.class),
any())).thenAnswer(invocation -> {
CompletableApplicationEvent<?> event =
invocation.getArgument(0);
- assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class,
event);
+ assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
throw new KafkaException("Test exception");
});
@@ -530,7 +530,7 @@ public class AsyncKafkaConsumerTest {
verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
+
verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
}
@Test
@@ -564,7 +564,7 @@ public class AsyncKafkaConsumerTest {
verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
+
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
}
@Test
@@ -598,8 +598,8 @@ public class AsyncKafkaConsumerTest {
consumer = newConsumer();
doReturn(null).when(applicationEventHandler).addAndGet(any(), any());
consumer.close();
-
verify(applicationEventHandler).addAndGet(any(LeaveOnCloseApplicationEvent.class),
any());
-
verify(applicationEventHandler).add(any(CommitOnCloseApplicationEvent.class));
+
verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class), any());
+ verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
}
@Test
@@ -641,7 +641,7 @@ public class AsyncKafkaConsumerTest {
subscriptions.assignFromSubscribed(singleton(tp));
doThrow(new
KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp)));
assertThrows(KafkaException.class, () ->
consumer.close(Duration.ZERO));
- verify(applicationEventHandler,
never()).addAndGet(any(LeaveOnCloseApplicationEvent.class), any());
+ verify(applicationEventHandler,
never()).addAndGet(any(LeaveOnCloseEvent.class), any());
verify(listener).onPartitionsRevoked(eq(singleton(tp)));
assertEquals(emptySet(), subscriptions.assignedPartitions());
}
@@ -677,7 +677,7 @@ public class AsyncKafkaConsumerTest {
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
subscriptions.seek(new TopicPartition("topic", 0), 100);
consumer.maybeAutoCommitSync(true, time.timer(100), null);
-
verify(applicationEventHandler).add(any(SyncCommitApplicationEvent.class));
+ verify(applicationEventHandler).add(any(SyncCommitEvent.class));
}
@Test
@@ -695,7 +695,7 @@ public class AsyncKafkaConsumerTest {
subscriptions.assignFromSubscribed(singleton(new
TopicPartition("topic", 0)));
subscriptions.seek(new TopicPartition("topic", 0), 100);
consumer.maybeAutoCommitSync(false, time.timer(100), null);
- verify(applicationEventHandler,
never()).add(any(SyncCommitApplicationEvent.class));
+ verify(applicationEventHandler,
never()).add(any(SyncCommitEvent.class));
}
private void assertMockCommitCallbackInvoked(final Executable task,
@@ -729,7 +729,7 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(tp));
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().contains(tp));
-
verify(applicationEventHandler).add(any(AssignmentChangeApplicationEvent.class));
+ verify(applicationEventHandler).add(any(AssignmentChangeEvent.class));
verify(applicationEventHandler).add(any(NewTopicsMetadataUpdateRequestEvent.class));
}
@@ -781,7 +781,7 @@ public class AsyncKafkaConsumerTest {
Map<TopicPartition, Long> expectedOffsets =
expectedOffsetsAndTimestamp.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().offset()));
assertEquals(expectedOffsets, result);
-
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
ArgumentMatchers.isA(Timer.class));
}
@@ -792,13 +792,13 @@ public class AsyncKafkaConsumerTest {
Throwable eventProcessingFailure = new KafkaException("Unexpected
failure " +
"processing List Offsets event");
doThrow(eventProcessingFailure).when(applicationEventHandler).addAndGet(
- any(ListOffsetsApplicationEvent.class),
+ any(ListOffsetsEvent.class),
any());
Throwable consumerError = assertThrows(KafkaException.class,
() -> consumer.beginningOffsets(partitions,
Duration.ofMillis(1)));
assertEquals(eventProcessingFailure, consumerError);
-
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
ArgumentMatchers.isA(Timer.class));
}
@@ -810,7 +810,7 @@ public class AsyncKafkaConsumerTest {
() -> consumer.beginningOffsets(
Collections.singletonList(new TopicPartition("t1", 0)),
Duration.ofMillis(1)));
-
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
ArgumentMatchers.isA(Timer.class));
}
@@ -850,7 +850,7 @@ public class AsyncKafkaConsumerTest {
Map<TopicPartition, OffsetAndTimestamp> result =
assertDoesNotThrow(() ->
consumer.offsetsForTimes(timestampToSearch, Duration.ofMillis(1)));
assertEquals(expectedResult, result);
-
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
ArgumentMatchers.isA(Timer.class));
}
@@ -869,7 +869,7 @@ public class AsyncKafkaConsumerTest {
assertDoesNotThrow(() ->
consumer.offsetsForTimes(timestampToSearch,
Duration.ZERO));
assertEquals(expectedResult, result);
- verify(applicationEventHandler,
never()).addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class),
+ verify(applicationEventHandler,
never()).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class),
ArgumentMatchers.isA(Timer.class));
}
@@ -880,12 +880,12 @@ public class AsyncKafkaConsumerTest {
doAnswer(invocation -> {
CompletableApplicationEvent<?> event = invocation.getArgument(0);
Timer timer = invocation.getArgument(1);
- assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class,
event);
+ assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
assertTrue(event.future().isCompletedExceptionally());
return ConsumerUtils.getResult(event.future(), timer);
})
.when(applicationEventHandler)
- .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class),
any(Timer.class));
+ .addAndGet(any(FetchCommittedOffsetsEvent.class),
any(Timer.class));
consumer.wakeup();
assertThrows(WakeupException.class, () ->
consumer.committed(offsets.keySet()));
@@ -1011,7 +1011,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(singletonList(topic));
assertEquals(singleton(topic), consumer.subscription());
assertTrue(consumer.assignment().isEmpty());
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(SubscriptionChangeApplicationEvent.class));
+
verify(applicationEventHandler).add(ArgumentMatchers.isA(SubscriptionChangeEvent.class));
}
@Test
@@ -1023,7 +1023,7 @@ public class AsyncKafkaConsumerTest {
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().isEmpty());
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
+
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}
@Test
@@ -1034,7 +1034,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(Collections.emptyList());
assertTrue(consumer.subscription().isEmpty());
assertTrue(consumer.assignment().isEmpty());
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
+
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}
@Test
@@ -1356,8 +1356,8 @@ public class AsyncKafkaConsumerTest {
consumer = newConsumer(config);
final KafkaException expectedException = new KafkaException("Nobody
expects the Spanish Inquisition");
- final ErrorBackgroundEvent errorBackgroundEvent = new
ErrorBackgroundEvent(expectedException);
- backgroundEventQueue.add(errorBackgroundEvent);
+ final ErrorEvent errorEvent = new ErrorEvent(expectedException);
+ backgroundEventQueue.add(errorEvent);
consumer.assign(singletonList(new TopicPartition("topic", 0)));
final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
@@ -1371,11 +1371,11 @@ public class AsyncKafkaConsumerTest {
consumer = newConsumer(config);
final KafkaException expectedException1 = new KafkaException("Nobody
expects the Spanish Inquisition");
- final ErrorBackgroundEvent errorBackgroundEvent1 = new
ErrorBackgroundEvent(expectedException1);
- backgroundEventQueue.add(errorBackgroundEvent1);
+ final ErrorEvent errorEvent1 = new ErrorEvent(expectedException1);
+ backgroundEventQueue.add(errorEvent1);
final KafkaException expectedException2 = new KafkaException("Spam,
Spam, Spam");
- final ErrorBackgroundEvent errorBackgroundEvent2 = new
ErrorBackgroundEvent(expectedException2);
- backgroundEventQueue.add(errorBackgroundEvent2);
+ final ErrorEvent errorEvent2 = new ErrorEvent(expectedException2);
+ backgroundEventQueue.add(errorEvent2);
consumer.assign(singletonList(new TopicPartition("topic", 0)));
final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
@@ -1472,7 +1472,7 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(singletonList("topic1"));
consumer.poll(Duration.ofMillis(100));
- verify(applicationEventHandler).add(any(PollApplicationEvent.class));
+ verify(applicationEventHandler).add(any(PollEvent.class));
}
private void testInvalidGroupId(final String groupId) {
@@ -1510,20 +1510,20 @@ public class AsyncKafkaConsumerTest {
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, atLeast(1))
-
.addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class),
ArgumentMatchers.isA(Timer.class));
+ .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class),
ArgumentMatchers.isA(Timer.class));
if (committedOffsetsEnabled) {
// Verify there was an FetchCommittedOffsets event and no
ResetPositions event
verify(applicationEventHandler, atLeast(1))
-
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
ArgumentMatchers.isA(Timer.class));
+
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class),
ArgumentMatchers.isA(Timer.class));
verify(applicationEventHandler, never())
-
.addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class),
ArgumentMatchers.isA(Timer.class));
+ .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class),
ArgumentMatchers.isA(Timer.class));
} else {
// Verify there was not any FetchCommittedOffsets event but there
should be a ResetPositions
verify(applicationEventHandler, never())
-
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
ArgumentMatchers.isA(Timer.class));
+
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class),
ArgumentMatchers.isA(Timer.class));
verify(applicationEventHandler, atLeast(1))
-
.addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class),
ArgumentMatchers.isA(Timer.class));
+ .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class),
ArgumentMatchers.isA(Timer.class));
}
}
@@ -1538,11 +1538,11 @@ public class AsyncKafkaConsumerTest {
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, atLeast(1))
-
.addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class),
ArgumentMatchers.isA(Timer.class));
+ .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class),
ArgumentMatchers.isA(Timer.class));
verify(applicationEventHandler, atLeast(1))
-
.addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
ArgumentMatchers.isA(Timer.class));
+ .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class),
ArgumentMatchers.isA(Timer.class));
verify(applicationEventHandler, atLeast(1))
-
.addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class),
ArgumentMatchers.isA(Timer.class));
+ .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class),
ArgumentMatchers.isA(Timer.class));
}
@Test
@@ -1697,54 +1697,54 @@ public class AsyncKafkaConsumerTest {
private void completeCommitAsyncApplicationEventExceptionally(Exception
ex) {
doAnswer(invocation -> {
- AsyncCommitApplicationEvent event = invocation.getArgument(0);
+ AsyncCommitEvent event = invocation.getArgument(0);
event.future().completeExceptionally(ex);
return null;
-
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
}
private void completeCommitSyncApplicationEventExceptionally(Exception ex)
{
doAnswer(invocation -> {
- SyncCommitApplicationEvent event = invocation.getArgument(0);
+ SyncCommitEvent event = invocation.getArgument(0);
event.future().completeExceptionally(ex);
return null;
-
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
}
private void completeCommitAsyncApplicationEventSuccessfully() {
doAnswer(invocation -> {
- AsyncCommitApplicationEvent event = invocation.getArgument(0);
+ AsyncCommitEvent event = invocation.getArgument(0);
event.future().complete(null);
return null;
-
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
}
private void completeCommitSyncApplicationEventSuccessfully() {
doAnswer(invocation -> {
- SyncCommitApplicationEvent event = invocation.getArgument(0);
+ SyncCommitEvent event = invocation.getArgument(0);
event.future().complete(null);
return null;
-
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
}
private void
completeFetchedCommittedOffsetApplicationEventSuccessfully(final
Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
doReturn(committedOffsets)
.when(applicationEventHandler)
- .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class),
any(Timer.class));
+ .addAndGet(any(FetchCommittedOffsetsEvent.class),
any(Timer.class));
}
private void
completeFetchedCommittedOffsetApplicationEventExceptionally(Exception ex) {
doThrow(ex)
.when(applicationEventHandler)
- .addAndGet(any(FetchCommittedOffsetsApplicationEvent.class),
any(Timer.class));
+ .addAndGet(any(FetchCommittedOffsetsEvent.class),
any(Timer.class));
}
private void completeUnsubscribeApplicationEventSuccessfully() {
doAnswer(invocation -> {
- UnsubscribeApplicationEvent event = invocation.getArgument(0);
+ UnsubscribeEvent event = invocation.getArgument(0);
event.future().complete(null);
return null;
-
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
}
private void forceCommitCallbackInvocation() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index a491df417de..cbd56d8b5e2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -20,16 +20,16 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
-import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
+import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
-import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
+import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
@@ -137,7 +137,7 @@ public class ConsumerNetworkThreadTest {
@Test
public void testApplicationEvent() {
- ApplicationEvent e = new PollApplicationEvent(100);
+ ApplicationEvent e = new PollEvent(100);
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
verify(applicationEventProcessor, times(1)).process(e);
@@ -153,36 +153,36 @@ public class ConsumerNetworkThreadTest {
@Test
public void testAsyncCommitEvent() {
- ApplicationEvent e = new AsyncCommitApplicationEvent(new HashMap<>());
+ ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AsyncCommitApplicationEvent.class));
+ verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
}
@Test
public void testSyncCommitEvent() {
- ApplicationEvent e = new SyncCommitApplicationEvent(new HashMap<>(),
100L);
+ ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 100L);
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(SyncCommitApplicationEvent.class));
+ verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
}
@Test
public void testListOffsetsEventIsProcessed() {
Map<TopicPartition, Long> timestamps = Collections.singletonMap(new
TopicPartition("topic1", 1), 5L);
- ApplicationEvent e = new ListOffsetsApplicationEvent(timestamps, true);
+ ApplicationEvent e = new ListOffsetsEvent(timestamps, true);
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ListOffsetsApplicationEvent.class));
+ verify(applicationEventProcessor).process(any(ListOffsetsEvent.class));
assertTrue(applicationEventsQueue.isEmpty());
}
@Test
public void testResetPositionsEventIsProcessed() {
- ResetPositionsApplicationEvent e = new
ResetPositionsApplicationEvent();
+ ResetPositionsEvent e = new ResetPositionsEvent();
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+
verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
assertTrue(applicationEventsQueue.isEmpty());
}
@@ -190,19 +190,19 @@ public class ConsumerNetworkThreadTest {
public void testResetPositionsProcessFailureIsIgnored() {
doThrow(new
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
- ResetPositionsApplicationEvent event = new
ResetPositionsApplicationEvent();
+ ResetPositionsEvent event = new ResetPositionsEvent();
applicationEventsQueue.add(event);
assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
-
verify(applicationEventProcessor).process(any(ResetPositionsApplicationEvent.class));
+
verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
}
@Test
public void testValidatePositionsEventIsProcessed() {
- ValidatePositionsApplicationEvent e = new
ValidatePositionsApplicationEvent();
+ ValidatePositionsEvent e = new ValidatePositionsEvent();
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsApplicationEvent.class));
+
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
assertTrue(applicationEventsQueue.isEmpty());
}
@@ -211,11 +211,11 @@ public class ConsumerNetworkThreadTest {
HashMap<TopicPartition, OffsetAndMetadata> offset =
mockTopicPartitionOffset();
final long currentTimeMs = time.milliseconds();
- ApplicationEvent e = new AssignmentChangeApplicationEvent(offset,
currentTimeMs);
+ ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeApplicationEvent.class));
+
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
verify(networkClient, times(1)).poll(anyLong(), anyLong());
verify(commitRequestManager,
times(1)).updateAutoCommitTimer(currentTimeMs);
// Assignment change should generate an async commit (not retried).
@@ -224,9 +224,9 @@ public class ConsumerNetworkThreadTest {
@Test
void testFetchTopicMetadata() {
- applicationEventsQueue.add(new TopicMetadataApplicationEvent("topic",
Long.MAX_VALUE));
+ applicationEventsQueue.add(new TopicMetadataEvent("topic",
Long.MAX_VALUE));
consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataApplicationEvent.class));
+
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
}
@Test
@@ -283,8 +283,8 @@ public class ConsumerNetworkThreadTest {
coordinatorRequestManager.markCoordinatorUnknown("test",
time.milliseconds());
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"group-id", node));
prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
- CompletableApplicationEvent<Void> event1 = spy(new
AsyncCommitApplicationEvent(Collections.emptyMap()));
- ApplicationEvent event2 = new
AsyncCommitApplicationEvent(Collections.emptyMap());
+ CompletableApplicationEvent<Void> event1 = spy(new
AsyncCommitEvent(Collections.emptyMap()));
+ ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
CompletableFuture<Void> future = new CompletableFuture<>();
when(event1.future()).thenReturn(future);
applicationEventsQueue.add(event1);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index d7ad1b55738..afee3fff396 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -114,10 +114,10 @@ public class CoordinatorRequestManagerTest {
expectFindCoordinatorRequest(coordinatorManager,
Errors.GROUP_AUTHORIZATION_FAILED);
verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
- if (!(backgroundEvent instanceof ErrorBackgroundEvent))
+ if (!(backgroundEvent instanceof ErrorEvent))
return false;
- RuntimeException exception = ((ErrorBackgroundEvent)
backgroundEvent).error();
+ RuntimeException exception = ((ErrorEvent)
backgroundEvent).error();
if (!(exception instanceof GroupAuthorizationException))
return false;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 4d4492bcb47..90cfb90bb1b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -328,12 +328,8 @@ public class HeartbeatRequestManagerTest {
@Test
public void testConsumerGroupMetadataFirstUpdate() {
final GroupMetadataUpdateEvent groupMetadataUpdateEvent =
makeFirstGroupMetadataUpdate(memberId, memberEpoch);
-
- final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new
GroupMetadataUpdateEvent(
- memberEpoch,
- memberId
- );
- assertEquals(expectedGroupMetadataUpdateEvent,
groupMetadataUpdateEvent);
+ assertEquals(memberEpoch, groupMetadataUpdateEvent.memberEpoch());
+ assertEquals(memberId, groupMetadataUpdateEvent.memberId());
}
@Test
@@ -370,11 +366,8 @@ public class HeartbeatRequestManagerTest {
final BackgroundEvent eventWithUpdatedMemberEpoch =
backgroundEventQueue.poll();
assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE,
eventWithUpdatedMemberEpoch.type());
final GroupMetadataUpdateEvent groupMetadataUpdateEvent =
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
- final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new
GroupMetadataUpdateEvent(
- updatedMemberEpoch,
- memberId
- );
- assertEquals(expectedGroupMetadataUpdateEvent,
groupMetadataUpdateEvent);
+ assertEquals(updatedMemberEpoch,
groupMetadataUpdateEvent.memberEpoch());
+ assertEquals(memberId, groupMetadataUpdateEvent.memberId());
}
@Test
@@ -398,11 +391,8 @@ public class HeartbeatRequestManagerTest {
final BackgroundEvent eventWithUpdatedMemberEpoch =
backgroundEventQueue.poll();
assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE,
eventWithUpdatedMemberEpoch.type());
final GroupMetadataUpdateEvent groupMetadataUpdateEvent =
(GroupMetadataUpdateEvent) eventWithUpdatedMemberEpoch;
- final GroupMetadataUpdateEvent expectedGroupMetadataUpdateEvent = new
GroupMetadataUpdateEvent(
- memberEpoch,
- updatedMemberId
- );
- assertEquals(expectedGroupMetadataUpdateEvent,
groupMetadataUpdateEvent);
+ assertEquals(memberEpoch, groupMetadataUpdateEvent.memberEpoch());
+ assertEquals(updatedMemberId, groupMetadataUpdateEvent.memberId());
}
private GroupMetadataUpdateEvent makeFirstGroupMetadataUpdate(final String
memberId, final int memberEpoch) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 8a0fcf85758..49c5819be7b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -1890,10 +1890,10 @@ public class MembershipManagerImplTest {
}
private ConsumerRebalanceListenerCallbackCompletedEvent
performCallback(MembershipManagerImpl membershipManager,
- ConsumerRebalanceListenerInvoker invoker,
- ConsumerRebalanceListenerMethodName
expectedMethodName,
- SortedSet<TopicPartition> expectedPartitions,
- boolean complete) {
+
ConsumerRebalanceListenerInvoker invoker,
+
ConsumerRebalanceListenerMethodName expectedMethodName,
+
SortedSet<TopicPartition> expectedPartitions,
+
boolean complete) {
// We expect only our enqueued event in the background queue.
assertEquals(1, backgroundEventQueue.size());
assertNotNull(backgroundEventQueue.peek());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 5ca034d6360..59ed7d440e2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.IsolationLevel;
@@ -553,8 +553,8 @@ public class OffsetsRequestManagerTest {
assertNotNull(event);
// Check that the event itself is of the expected type
- assertInstanceOf(ErrorBackgroundEvent.class, event);
- ErrorBackgroundEvent errorEvent = (ErrorBackgroundEvent) event;
+ assertInstanceOf(ErrorEvent.class, event);
+ ErrorEvent errorEvent = (ErrorEvent) event;
assertNotNull(errorEvent.error());
// Check that the error held in the event is of the expected type
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 8ea8cb7a729..f3e2557ae94 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -92,7 +92,7 @@ public class ApplicationEventProcessorTest {
public void testPrepClosingCommitEvents() {
List<NetworkClientDelegate.UnsentRequest> results =
mockCommitResults();
doReturn(new NetworkClientDelegate.PollResult(100,
results)).when(commitRequestManager).pollOnClose();
- processor.process(new CommitOnCloseApplicationEvent());
+ processor.process(new CommitOnCloseEvent());
verify(commitRequestManager).signalClose();
}
@@ -107,7 +107,7 @@ public class ApplicationEventProcessorTest {
@Test
public void testPrepClosingLeaveGroupEvent() {
- LeaveOnCloseApplicationEvent event = new
LeaveOnCloseApplicationEvent();
+ LeaveOnCloseEvent event = new LeaveOnCloseEvent();
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
when(membershipManager.leaveGroup()).thenReturn(CompletableFuture.completedFuture(null));
processor.process(event);