This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a0b716ec9fd KAFKA-16001: Migrated ConsumerNetworkThreadTest away from 
ConsumerTestBuilder (#16140)
a0b716ec9fd is described below

commit a0b716ec9fd518daef4f7164d82ab515b2ad7e64
Author: brenden20 <[email protected]>
AuthorDate: Thu Jun 13 15:35:36 2024 -0500

    KAFKA-16001: Migrated ConsumerNetworkThreadTest away from 
ConsumerTestBuilder (#16140)
    
    Completely migrates ConsumerNetworkThreadTest away from ConsumerTestBuilder 
and removes all usages of spy objects and replaced with mocks. Removes 
testEnsureMetadataUpdateOnPoll() since it was doing integration testing. Also I 
adds new tests to get more complete test coverage of ConsumerNetworkThread.
    
    Reviewers: Kirk True <[email protected]>, Lianet Magrans 
<[email protected]>, Philip Nee <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../internals/ConsumerNetworkThreadTest.java       | 364 +++++++--------------
 1 file changed, 123 insertions(+), 241 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 160825a3088..e02c983ed37 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -16,13 +16,11 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
 import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
@@ -32,17 +30,9 @@ import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
 import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.message.FindCoordinatorRequestData;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorRequest;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -50,20 +40,22 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
+import java.util.List;
+import java.util.LinkedList;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Stream;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.createDefaultGroupInformation;
 import static 
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -72,68 +64,97 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
-
-    private ConsumerTestBuilder testBuilder;
-    private Time time;
-    private ConsumerMetadata metadata;
-    private NetworkClientDelegate networkClient;
-    private BlockingQueue<ApplicationEvent> applicationEventsQueue;
-    private ApplicationEventProcessor applicationEventProcessor;
-    private OffsetsRequestManager offsetsRequestManager;
-    private CommitRequestManager commitRequestManager;
-    private CoordinatorRequestManager coordinatorRequestManager;
-    private ConsumerNetworkThread consumerNetworkThread;
-    private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-    private MockClient client;
-
-    @BeforeEach
-    public void setup() {
-        testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-        time = testBuilder.time;
-        metadata = testBuilder.metadata;
-        networkClient = testBuilder.networkClientDelegate;
-        client = testBuilder.client;
-        applicationEventsQueue = testBuilder.applicationEventQueue;
-        applicationEventProcessor = testBuilder.applicationEventProcessor;
-        commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-        offsetsRequestManager = testBuilder.offsetsRequestManager;
-        coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-        consumerNetworkThread = new ConsumerNetworkThread(
-                testBuilder.logContext,
+    private final Time time;
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+    private final ApplicationEventProcessor applicationEventProcessor;
+    private final OffsetsRequestManager offsetsRequestManager;
+    private final HeartbeatRequestManager heartbeatRequestManager;
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final ConsumerNetworkThread consumerNetworkThread;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final RequestManagers requestManagers;
+    private final CompletableEventReaper applicationEventReaper;
+
+    ConsumerNetworkThreadTest() {
+        this.networkClientDelegate = mock(NetworkClientDelegate.class);
+        this.requestManagers = mock(RequestManagers.class);
+        this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+        this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+        this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+        this.applicationEventReaper = mock(CompletableEventReaper.class);
+        this.time = new MockTime();
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        LogContext logContext = new LogContext();
+
+        this.consumerNetworkThread = new ConsumerNetworkThread(
+                logContext,
                 time,
-                testBuilder.applicationEventQueue,
+                applicationEventsQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
-                () -> testBuilder.networkClientDelegate,
-                () -> testBuilder.requestManagers
+                () -> networkClientDelegate,
+                () -> requestManagers
         );
+    }
+
+    @BeforeEach
+    public void setup() {
         consumerNetworkThread.initializeResources();
     }
 
     @AfterEach
     public void tearDown() {
-        if (testBuilder != null) {
-            testBuilder.close();
-            consumerNetworkThread.close(Duration.ZERO);
-        }
+        if (consumerNetworkThread != null)
+            consumerNetworkThread.close();
+    }
+
+    @Test
+    public void testEnsureCloseStopsRunningThread() {
+        assertTrue(consumerNetworkThread.isRunning(),
+            "ConsumerNetworkThread should start running when created");
+
+        consumerNetworkThread.close();
+        assertFalse(consumerNetworkThread.isRunning(),
+            "close() should make consumerNetworkThread.running false by 
calling closeInternal(Duration timeout)");
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS - 1, 
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS + 1})
+    public void testConsumerNetworkThreadPollTimeComputations(long 
exampleTime) {
+        List<Optional<? extends RequestManager>> list = new ArrayList<>();
+        list.add(Optional.of(coordinatorRequestManager));
+        list.add(Optional.of(heartbeatRequestManager));
+
+        when(requestManagers.entries()).thenReturn(list);
+
+        NetworkClientDelegate.PollResult pollResult = new 
NetworkClientDelegate.PollResult(exampleTime);
+        NetworkClientDelegate.PollResult pollResult1 = new 
NetworkClientDelegate.PollResult(exampleTime + 100);
+
+        long t = time.milliseconds();
+        when(coordinatorRequestManager.poll(t)).thenReturn(pollResult);
+        
when(coordinatorRequestManager.maximumTimeToWait(t)).thenReturn(exampleTime);
+        when(heartbeatRequestManager.poll(t)).thenReturn(pollResult1);
+        
when(heartbeatRequestManager.maximumTimeToWait(t)).thenReturn(exampleTime + 
100);
+        
when(networkClientDelegate.addAll(pollResult)).thenReturn(pollResult.timeUntilNextPollMs);
+        
when(networkClientDelegate.addAll(pollResult1)).thenReturn(pollResult1.timeUntilNextPollMs);
+        consumerNetworkThread.runOnce();
+
+        verify(networkClientDelegate).poll(Math.min(exampleTime, 
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS), time.milliseconds());
+        assertEquals(consumerNetworkThread.maximumTimeToWait(), exampleTime);
     }
 
     @Test
     public void testStartupAndTearDown() throws InterruptedException {
-        // The consumer is closed in 
ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder.close()
-        // which is called from tearDown().
         consumerNetworkThread.start();
-        TestCondition isStarted = () -> consumerNetworkThread.isRunning();
+        TestCondition isStarted = consumerNetworkThread::isRunning;
         TestCondition isClosed = () -> !(consumerNetworkThread.isRunning() || 
consumerNetworkThread.isAlive());
 
         // There's a nonzero amount of time between starting the thread and 
having it
@@ -148,35 +169,32 @@ public class ConsumerNetworkThreadTest {
     }
 
     @Test
-    public void testApplicationEvent() {
-        ApplicationEvent e = new PollEvent(100);
-        applicationEventsQueue.add(e);
+    public void testRequestsTransferFromManagersToClientOnThreadRun() {
+        List<Optional<? extends RequestManager>> list = new ArrayList<>();
+        list.add(Optional.of(coordinatorRequestManager));
+        list.add(Optional.of(heartbeatRequestManager));
+        list.add(Optional.of(offsetsRequestManager));
+
+        when(requestManagers.entries()).thenReturn(list);
+        
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
         consumerNetworkThread.runOnce();
-        verify(applicationEventProcessor, times(1)).process(e);
+        requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).poll(anyLong())));
+        requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm).maximumTimeToWait(anyLong())));
+        
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+        verify(networkClientDelegate).poll(anyLong(), anyLong());
     }
 
-    @Test
-    public void testMetadataUpdateEvent() {
-        ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+    @ParameterizedTest
+    @MethodSource("applicationEvents")
+    public void testApplicationEventIsProcessed(ApplicationEvent e) {
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
-        verify(metadata).requestUpdateForNewTopics();
-    }
 
-    @Test
-    public void testAsyncCommitEvent() {
-        ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
-        applicationEventsQueue.add(e);
-        consumerNetworkThread.runOnce();
-        verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
-    }
+        if (e instanceof CompletableEvent)
+            verify(applicationEventReaper).add((CompletableEvent<?>) e);
 
-    @Test
-    public void testSyncCommitEvent() {
-        ApplicationEvent e = new SyncCommitEvent(new HashMap<>(), 
calculateDeadlineMs(time, 100));
-        applicationEventsQueue.add(e);
-        consumerNetworkThread.runOnce();
-        verify(applicationEventProcessor).process(any(SyncCommitEvent.class));
+        verify(applicationEventProcessor).process(any(e.getClass()));
+        assertTrue(applicationEventsQueue.isEmpty());
     }
 
     @ParameterizedTest
@@ -190,15 +208,6 @@ public class ConsumerNetworkThreadTest {
         assertTrue(applicationEventsQueue.isEmpty());
     }
 
-    @Test
-    public void testResetPositionsEventIsProcessed() {
-        ResetPositionsEvent e = new 
ResetPositionsEvent(calculateDeadlineMs(time, 100));
-        applicationEventsQueue.add(e);
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(ResetPositionsEvent.class));
-        assertTrue(applicationEventsQueue.isEmpty());
-    }
-
     @Test
     public void testResetPositionsProcessFailureIsIgnored() {
         doThrow(new 
NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
@@ -211,178 +220,51 @@ public class ConsumerNetworkThreadTest {
     }
 
     @Test
-    public void testValidatePositionsEventIsProcessed() {
-        ValidatePositionsEvent e = new 
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
-        applicationEventsQueue.add(e);
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
-        assertTrue(applicationEventsQueue.isEmpty());
-    }
-
-    @Test
-    public void testAssignmentChangeEvent() {
-        HashMap<TopicPartition, OffsetAndMetadata> offset = 
mockTopicPartitionOffset();
-
-        final long currentTimeMs = time.milliseconds();
-        ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
-        applicationEventsQueue.add(e);
-
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-        verify(networkClient, times(1)).poll(anyLong(), anyLong());
-        verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-        // Assignment change should generate an async commit (not retried).
-        verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
-    }
-
-    @Test
-    void testFetchTopicMetadata() {
-        applicationEventsQueue.add(new TopicMetadataEvent("topic", 
Long.MAX_VALUE));
-        consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
-    }
-
-    @Test
-    void testMaximumTimeToWait() {
+    public void testMaximumTimeToWait() {
+        final int defaultHeartbeatIntervalMs = 1000;
         // Initial value before runOnce has been called
         assertEquals(ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS, 
consumerNetworkThread.maximumTimeToWait());
-        consumerNetworkThread.runOnce();
-        // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
-        assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, 
consumerNetworkThread.maximumTimeToWait());
-    }
 
-    @Test
-    void testRequestManagersArePolledOnce() {
-        consumerNetworkThread.runOnce();
-        testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).poll(anyLong())));
-        testBuilder.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm 
-> verify(rm, times(1)).maximumTimeToWait(anyLong())));
-        verify(networkClient, times(1)).poll(anyLong(), anyLong());
-    }
+        
when(requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
+        
when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn((long)
 defaultHeartbeatIntervalMs);
 
-    @Test
-    void testEnsureMetadataUpdateOnPoll() {
-        MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
-        client.prepareMetadataUpdate(metadataResponse);
-        metadata.requestUpdate(false);
         consumerNetworkThread.runOnce();
-        verify(metadata, 
times(1)).updateWithCurrentRequestVersion(eq(metadataResponse), eq(false), 
anyLong());
-    }
-
-    @Test
-    void testEnsureEventsAreCompleted() {
-        // Mimic the logic of CompletableEventReaper.reap(Collection):
-        doAnswer(__ -> {
-            Iterator<ApplicationEvent> i = applicationEventsQueue.iterator();
-
-            while (i.hasNext()) {
-                ApplicationEvent event = i.next();
-
-                if (event instanceof CompletableEvent)
-                    ((CompletableEvent<?>) 
event).future().completeExceptionally(new TimeoutException());
-
-                i.remove();
-            }
-
-            return null;
-        }).when(applicationEventReaper).reap(any(Collection.class));
-
-        Node node = metadata.fetch().nodes().get(0);
-        coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
-        
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
-        prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-        CompletableApplicationEvent<Void> event1 = spy(new 
AsyncCommitEvent(Collections.emptyMap()));
-        ApplicationEvent event2 = new AsyncCommitEvent(Collections.emptyMap());
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        when(event1.future()).thenReturn(future);
-        applicationEventsQueue.add(event1);
-        applicationEventsQueue.add(event2);
-        assertFalse(future.isDone());
-        assertFalse(applicationEventsQueue.isEmpty());
-        consumerNetworkThread.cleanup();
-        assertTrue(future.isCompletedExceptionally());
-        assertTrue(applicationEventsQueue.isEmpty());
+        // After runOnce has been called, it takes the default heartbeat 
interval from the heartbeat request manager
+        assertEquals(defaultHeartbeatIntervalMs, 
consumerNetworkThread.maximumTimeToWait());
     }
 
     @Test
-    void testCleanupInvokesReaper() {
+    public void testCleanupInvokesReaper() {
+        LinkedList<NetworkClientDelegate.UnsentRequest> queue = new 
LinkedList<>();
+        when(networkClientDelegate.unsentRequests()).thenReturn(queue);
         consumerNetworkThread.cleanup();
         verify(applicationEventReaper).reap(applicationEventsQueue);
     }
 
     @Test
-    void testRunOnceInvokesReaper() {
+    public void testRunOnceInvokesReaper() {
         consumerNetworkThread.runOnce();
         verify(applicationEventReaper).reap(any(Long.class));
     }
 
     @Test
-    void testSendUnsentRequest() {
-        String groupId = "group-id";
-        NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-            new FindCoordinatorRequest.Builder(
-                new FindCoordinatorRequestData()
-                    
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
-                    .setKey(groupId)),
-            Optional.empty());
-
-        networkClient.add(request);
-        assertTrue(networkClient.hasAnyPendingRequests());
-        assertFalse(networkClient.unsentRequests().isEmpty());
-        assertFalse(client.hasInFlightRequests());
+    public void testSendUnsentRequests() {
+        
when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true).thenReturn(true).thenReturn(false);
         consumerNetworkThread.cleanup();
-
-        assertTrue(networkClient.unsentRequests().isEmpty());
-        assertFalse(client.hasInFlightRequests());
-        assertFalse(networkClient.hasAnyPendingRequests());
-    }
-
-    private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> 
expectedOffsets,
-                                            final Errors error,
-                                            final boolean disconnected) {
-        Map<TopicPartition, Errors> errors = 
partitionErrors(expectedOffsets.keySet(), error);
-        client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), 
offsetCommitResponse(errors), disconnected);
-    }
-
-    private Map<TopicPartition, Errors> partitionErrors(final 
Collection<TopicPartition> partitions,
-                                                        final Errors error) {
-        final Map<TopicPartition, Errors> errors = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            errors.put(partition, error);
-        }
-        return errors;
-    }
-
-    private OffsetCommitResponse offsetCommitResponse(final 
Map<TopicPartition, Errors> responseData) {
-        return new OffsetCommitResponse(responseData);
-    }
-
-    private MockClient.RequestMatcher offsetCommitRequestMatcher(final 
Map<TopicPartition, Long> expectedOffsets) {
-        return body -> {
-            OffsetCommitRequest req = (OffsetCommitRequest) body;
-            Map<TopicPartition, Long> offsets = req.offsets();
-            if (offsets.size() != expectedOffsets.size())
-                return false;
-
-            for (Map.Entry<TopicPartition, Long> expectedOffset : 
expectedOffsets.entrySet()) {
-                if (!offsets.containsKey(expectedOffset.getKey())) {
-                    return false;
-                } else {
-                    Long actualOffset = offsets.get(expectedOffset.getKey());
-                    if (!actualOffset.equals(expectedOffset.getValue())) {
-                        return false;
-                    }
-                }
-            }
-            return true;
-        };
+        verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
     }
 
-    private HashMap<TopicPartition, OffsetAndMetadata> 
mockTopicPartitionOffset() {
-        final TopicPartition t0 = new TopicPartition("t0", 2);
-        final TopicPartition t1 = new TopicPartition("t0", 3);
-        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new 
HashMap<>();
-        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
-        topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
-        return topicPartitionOffsets;
+    private static Stream<Arguments> applicationEvents() {
+        Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
+        final long currentTimeMs = 12345;
+        return Stream.of(
+                Arguments.of(new PollEvent(100)),
+                Arguments.of(new NewTopicsMetadataUpdateRequestEvent()),
+                Arguments.of(new AsyncCommitEvent(new HashMap<>())),
+                Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
+                Arguments.of(new ResetPositionsEvent(500)),
+                Arguments.of(new ValidatePositionsEvent(500)),
+                Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
+                Arguments.of(new AssignmentChangeEvent(offset, 
currentTimeMs)));
     }
 }

Reply via email to