This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a0b716ec9fd KAFKA-16001: Migrated ConsumerNetworkThreadTest away from
ConsumerTestBuilder (#16140)
a0b716ec9fd is described below
commit a0b716ec9fd518daef4f7164d82ab515b2ad7e64
Author: brenden20 <[email protected]>
AuthorDate: Thu Jun 13 15:35:36 2024 -0500
KAFKA-16001: Migrated ConsumerNetworkThreadTest away from
ConsumerTestBuilder (#16140)
Completely migrates ConsumerNetworkThreadTest away from ConsumerTestBuilder
and replaces all usages of spy objects with mocks. Removes
testEnsureMetadataUpdateOnPoll() since it was doing integration testing. Also
adds new tests to get more complete test coverage of ConsumerNetworkThread.
Reviewers: Kirk True <[email protected]>, Lianet Magrans
<[email protected]>, Philip Nee <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../internals/ConsumerNetworkThreadTest.java | 364 +++++++--------------
1 file changed, 123 insertions(+), 241 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 160825a3088..e02c983ed37 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -16,13 +16,11 @@
*/
package org.apache.kafka.clients.consumer.internals;
-import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
-import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
@@ -32,17 +30,9 @@ import
org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.message.FindCoordinatorRequestData;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorRequest;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -50,20 +40,22 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
+import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Stream;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
import static
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerNetworkThreadTest {
-
- private ConsumerTestBuilder testBuilder;
- private Time time;
- private ConsumerMetadata metadata;
- private NetworkClientDelegate networkClient;
- private BlockingQueue<ApplicationEvent> applicationEventsQueue;
- private ApplicationEventProcessor applicationEventProcessor;
- private OffsetsRequestManager offsetsRequestManager;
- private CommitRequestManager commitRequestManager;
- private CoordinatorRequestManager coordinatorRequestManager;
- private ConsumerNetworkThread consumerNetworkThread;
- private final CompletableEventReaper applicationEventReaper =
mock(CompletableEventReaper.class);
- private MockClient client;
-
- @BeforeEach
- public void setup() {
- testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
- time = testBuilder.time;
- metadata = testBuilder.metadata;
- networkClient = testBuilder.networkClientDelegate;
- client = testBuilder.client;
- applicationEventsQueue = testBuilder.applicationEventQueue;
- applicationEventProcessor = testBuilder.applicationEventProcessor;
- commitRequestManager =
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
- offsetsRequestManager = testBuilder.offsetsRequestManager;
- coordinatorRequestManager =
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
- consumerNetworkThread = new ConsumerNetworkThread(
- testBuilder.logContext,
+ private final Time time;
+ private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+ private final ApplicationEventProcessor applicationEventProcessor;
+ private final OffsetsRequestManager offsetsRequestManager;
+ private final HeartbeatRequestManager heartbeatRequestManager;
+ private final CoordinatorRequestManager coordinatorRequestManager;
+ private final ConsumerNetworkThread consumerNetworkThread;
+ private final NetworkClientDelegate networkClientDelegate;
+ private final RequestManagers requestManagers;
+ private final CompletableEventReaper applicationEventReaper;
+
+ ConsumerNetworkThreadTest() {
+ this.networkClientDelegate = mock(NetworkClientDelegate.class);
+ this.requestManagers = mock(RequestManagers.class);
+ this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+ this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+ this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+ this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+ this.applicationEventReaper = mock(CompletableEventReaper.class);
+ this.time = new MockTime();
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ LogContext logContext = new LogContext();
+
+ this.consumerNetworkThread = new ConsumerNetworkThread(
+ logContext,
time,
- testBuilder.applicationEventQueue,
+ applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
- () -> testBuilder.networkClientDelegate,
- () -> testBuilder.requestManagers
+ () -> networkClientDelegate,
+ () -> requestManagers
);
+ }
+
+ @BeforeEach
+ public void setup() {
consumerNetworkThread.initializeResources();
}
@AfterEach
public void tearDown() {
- if (testBuilder != null) {
- testBuilder.close();
- consumerNetworkThread.close(Duration.ZERO);
- }
+ if (consumerNetworkThread != null)
+ consumerNetworkThread.close();
+ }
+
+ @Test
+ public void testEnsureCloseStopsRunningThread() {
+ assertTrue(consumerNetworkThread.isRunning(),
+ "ConsumerNetworkThread should start running when created");
+
+ consumerNetworkThread.close();
+ assertFalse(consumerNetworkThread.isRunning(),
+ "close() should make consumerNetworkThread.running false by
calling closeInternal(Duration timeout)");
+ }
+
+ @ParameterizedTest
+ @ValueSource(longs = {ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS - 1,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS + 1})
+ public void testConsumerNetworkThreadPollTimeComputations(long
exampleTime) {
+ List<Optional<? extends RequestManager>> list = new ArrayList<>();
+ list.add(Optional.of(coordinatorRequestManager));
+ list.add(Optional.of(heartbeatRequestManager));
+
+ when(requestManagers.entries()).thenReturn(list);
+
+ NetworkClientDelegate.PollResult pollResult = new
NetworkClientDelegate.PollResult(exampleTime);
+ NetworkClientDelegate.PollResult pollResult1 = new
NetworkClientDelegate.PollResult(exampleTime + 100);
+
+ long t = time.milliseconds();
+ when(coordinatorRequestManager.poll(t)).thenReturn(pollResult);
+
when(coordinatorRequestManager.maximumTimeToWait(t)).thenReturn(exampleTime);
+ when(heartbeatRequestManager.poll(t)).thenReturn(pollResult1);
+
when(heartbeatRequestManager.maximumTimeToWait(t)).thenReturn(exampleTime +
100);
+
when(networkClientDelegate.addAll(pollResult)).thenReturn(pollResult.timeUntilNextPollMs);
+
when(networkClientDelegate.addAll(pollResult1)).thenReturn(pollResult1.timeUntilNextPollMs);
+ consumerNetworkThread.runOnce();
+
+ verify(networkClientDelegate).poll(Math.min(exampleTime,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS), time.milliseconds());
+ assertEquals(consumerNetworkThread.maximumTimeToWait(), exampleTime);
}
@Test
public void testStartupAndTearDown() throws InterruptedException {
- // The consumer is closed in
ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder.close()
- // which is called from tearDown().
consumerNetworkThread.start();
- TestCondition isStarted = () -> consumerNetworkThread.isRunning();
+ TestCondition isStarted = consumerNetworkThread::isRunning;
TestCondition isClosed = () -> !(consumerNetworkThread.isRunning() ||
consumerNetworkThread.isAlive());
// There's a nonzero amount of time between starting the thread and
having it
@@ -148,35 +169,32 @@ public class ConsumerNetworkThreadTest {
}
@Test
- public void testApplicationEvent() {
- ApplicationEvent e = new PollEvent(100);
- applicationEventsQueue.add(e);
+ public void testRequestsTransferFromManagersToClientOnThreadRun() {
+ List<Optional<? extends RequestManager>> list = new ArrayList<>();
+ list.add(Optional.of(coordinatorRequestManager));
+ list.add(Optional.of(heartbeatRequestManager));
+ list.add(Optional.of(offsetsRequestManager));
+
+ when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
consumerNetworkThread.runOnce();
- verify(applicationEventProcessor, times(1)).process(e);
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).poll(anyLong())));
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).maximumTimeToWait(anyLong())));
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+ verify(networkClientDelegate).poll(anyLong(), anyLong());
}
- @Test
- public void testMetadataUpdateEvent() {
- ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+ @ParameterizedTest
+ @MethodSource("applicationEvents")
+ public void testApplicationEventIsProcessed(ApplicationEvent e) {
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
- verify(metadata).requestUpdateForNewTopics();
- }
- @Test
- public void testAsyncCommitEvent() {
- ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
- applicationEventsQueue.add(e);
- consumerNetworkThread.runOnce();
- verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
- }
+ if (e instanceof CompletableEvent)
+ verify(applicationEventReaper).add((CompletableEvent<?>) e);
- @Test
- public void testSyncCommitEvent() {
- ApplicationEvent e = new SyncCommitEvent(new HashMap<>(),
calculateDeadlineMs(time, 100));
- applicationEventsQueue.add(e);
- consumerNetworkThread.runOnce();
- verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
+ verify(applicationEventProcessor).process(any(e.getClass()));
+ assertTrue(applicationEventsQueue.isEmpty());
}
@ParameterizedTest
@@ -190,15 +208,6 @@ public class ConsumerNetworkThreadTest {
assertTrue(applicationEventsQueue.isEmpty());
}
- @Test
- public void testResetPositionsEventIsProcessed() {
- ResetPositionsEvent e = new
ResetPositionsEvent(calculateDeadlineMs(time, 100));
- applicationEventsQueue.add(e);
- consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
- assertTrue(applicationEventsQueue.isEmpty());
- }
-
@Test
public void testResetPositionsProcessFailureIsIgnored() {
doThrow(new
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
@@ -211,178 +220,51 @@ public class ConsumerNetworkThreadTest {
}
@Test
- public void testValidatePositionsEventIsProcessed() {
- ValidatePositionsEvent e = new
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
- applicationEventsQueue.add(e);
- consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
- assertTrue(applicationEventsQueue.isEmpty());
- }
-
- @Test
- public void testAssignmentChangeEvent() {
- HashMap<TopicPartition, OffsetAndMetadata> offset =
mockTopicPartitionOffset();
-
- final long currentTimeMs = time.milliseconds();
- ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
- applicationEventsQueue.add(e);
-
- consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
- verify(networkClient, times(1)).poll(anyLong(), anyLong());
- verify(commitRequestManager,
times(1)).updateAutoCommitTimer(currentTimeMs);
- // Assignment change should generate an async commit (not retried).
- verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
- }
-
- @Test
- void testFetchTopicMetadata() {
- applicationEventsQueue.add(new TopicMetadataEvent("topic",
Long.MAX_VALUE));
- consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
- }
-
- @Test
- void testMaximumTimeToWait() {
+ public void testMaximumTimeToWait() {
+ final int defaultHeartbeatIntervalMs = 1000;
// Initial value before runOnce has been called
assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS,
consumerNetworkThread.maximumTimeToWait());
- consumerNetworkThread.runOnce();
- // After runOnce has been called, it takes the default heartbeat
interval from the heartbeat request manager
- assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS,
consumerNetworkThread.maximumTimeToWait());
- }
- @Test
- void testRequestManagersArePolledOnce() {
- consumerNetworkThread.runOnce();
- testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm
-> verify(rm, times(1)).poll(anyLong())));
- testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm
-> verify(rm, times(1)).maximumTimeToWait(anyLong())));
- verify(networkClient, times(1)).poll(anyLong(), anyLong());
- }
+
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
defaultHeartbeatIntervalMs);
- @Test
- void testEnsureMetadataUpdateOnPoll() {
- MetadataResponse metadataResponse =
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
- client.prepareMetadataUpdate(metadataResponse);
- metadata.requestUpdate(false);
consumerNetworkThread.runOnce();
- verify(metadata,
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false),
anyLong());
- }
-
- @Test
- void testEnsureEventsAreCompleted() {
- // Mimic the logic of CompletableEventReaper.reap(Collection):
- doAnswer(__ -> {
- Iterator<ApplicationEvent> i = applicationEventsQueue.iterator();
-
- while (i.hasNext()) {
- ApplicationEvent event = i.next();
-
- if (event instanceof CompletableEvent)
- ((CompletableEvent<?>)
event).future().completeExceptionally(new TimeoutException());
-
- i.remove();
- }
-
- return null;
- }).when(applicationEventReaper).reap(any(Collection.class));
-
- Node node = metadata.fetch().nodes().get(0);
- coordinatorRequestManager.markCoordinatorUnknown("test",
time.milliseconds());
-
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"group-id", node));
- prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
- CompletableApplicationEvent<Void> event1 = spy(new
AsyncCommitEvent(Collections.emptyMap()));
- ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
- CompletableFuture<Void> future = new CompletableFuture<>();
- when(event1.future()).thenReturn(future);
- applicationEventsQueue.add(event1);
- applicationEventsQueue.add(event2);
- assertFalse(future.isDone());
- assertFalse(applicationEventsQueue.isEmpty());
- consumerNetworkThread.cleanup();
- assertTrue(future.isCompletedExceptionally());
- assertTrue(applicationEventsQueue.isEmpty());
+ // After runOnce has been called, it takes the default heartbeat
interval from the heartbeat request manager
+ assertEquals(defaultHeartbeatIntervalMs,
consumerNetworkThread.maximumTimeToWait());
}
@Test
- void testCleanupInvokesReaper() {
+ public void testCleanupInvokesReaper() {
+ LinkedList<NetworkClientDelegate.UnsentRequest> queue = new
LinkedList<>();
+ when(networkClientDelegate.unsentRequests()).thenReturn(queue);
consumerNetworkThread.cleanup();
verify(applicationEventReaper).reap(applicationEventsQueue);
}
@Test
- void testRunOnceInvokesReaper() {
+ public void testRunOnceInvokesReaper() {
consumerNetworkThread.runOnce();
verify(applicationEventReaper).reap(any(Long.class));
}
@Test
- void testSendUnsentRequest() {
- String groupId = "group-id";
- NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
- new FindCoordinatorRequest.Builder(
- new FindCoordinatorRequestData()
-
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
- .setKey(groupId)),
- Optional.empty());
-
- networkClient.add(request);
- assertTrue(networkClient.hasAnyPendingRequests());
- assertFalse(networkClient.unsentRequests().isEmpty());
- assertFalse(client.hasInFlightRequests());
+ public void testSendUnsentRequests() {
+
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
consumerNetworkThread.cleanup();
-
- assertTrue(networkClient.unsentRequests().isEmpty());
- assertFalse(client.hasInFlightRequests());
- assertFalse(networkClient.hasAnyPendingRequests());
- }
-
- private void prepareOffsetCommitRequest(final Map<TopicPartition, Long>
expectedOffsets,
- final Errors error,
- final boolean disconnected) {
- Map<TopicPartition, Errors> errors =
partitionErrors(expectedOffsets.keySet(), error);
- client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets),
offsetCommitResponse(errors), disconnected);
- }
-
- private Map<TopicPartition, Errors> partitionErrors(final
Collection<TopicPartition> partitions,
- final Errors error) {
- final Map<TopicPartition, Errors> errors = new HashMap<>();
- for (TopicPartition partition : partitions) {
- errors.put(partition, error);
- }
- return errors;
- }
-
- private OffsetCommitResponse offsetCommitResponse(final
Map<TopicPartition, Errors> responseData) {
- return new OffsetCommitResponse(responseData);
- }
-
- private MockClient.RequestMatcher offsetCommitRequestMatcher(final
Map<TopicPartition, Long> expectedOffsets) {
- return body -> {
- OffsetCommitRequest req = (OffsetCommitRequest) body;
- Map<TopicPartition, Long> offsets = req.offsets();
- if (offsets.size() != expectedOffsets.size())
- return false;
-
- for (Map.Entry<TopicPartition, Long> expectedOffset :
expectedOffsets.entrySet()) {
- if (!offsets.containsKey(expectedOffset.getKey())) {
- return false;
- } else {
- Long actualOffset = offsets.get(expectedOffset.getKey());
- if (!actualOffset.equals(expectedOffset.getValue())) {
- return false;
- }
- }
- }
- return true;
- };
+ verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
}
- private HashMap<TopicPartition, OffsetAndMetadata>
mockTopicPartitionOffset() {
- final TopicPartition t0 = new TopicPartition("t0", 2);
- final TopicPartition t1 = new TopicPartition("t0", 3);
- HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new
HashMap<>();
- topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
- topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
- return topicPartitionOffsets;
+ private static Stream<Arguments> applicationEvents() {
+ Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
+ final long currentTimeMs = 12345;
+ return Stream.of(
+ Arguments.of(new PollEvent(100)),
+ Arguments.of(new NewTopicsMetadataUpdateRequestEvent()),
+ Arguments.of(new AsyncCommitEvent(new HashMap<>())),
+ Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
+ Arguments.of(new ResetPositionsEvent(500)),
+ Arguments.of(new ValidatePositionsEvent(500)),
+ Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
+ Arguments.of(new AssignmentChangeEvent(offset,
currentTimeMs)));
}
}