This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5bdea94c058 KAFKA-14133: Move MeteredSessionStoreTest, 
MeteredWindowStoreTest and ReadOnlyKeyValueStoreFacadeTest to Mockito (#14404)
5bdea94c058 is described below

commit 5bdea94c0582104fd1d471918dc143aa9bee67d3
Author: Christo Lolov <[email protected]>
AuthorDate: Fri Sep 22 10:27:11 2023 +0100

    KAFKA-14133: Move MeteredSessionStoreTest, MeteredWindowStoreTest and 
ReadOnlyKeyValueStoreFacadeTest to Mockito (#14404)
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../state/internals/MeteredSessionStoreTest.java   | 166 ++++++++-------------
 .../state/internals/MeteredWindowStoreTest.java    | 166 ++++++++-------------
 .../internals/ReadOnlyKeyValueStoreFacadeTest.java |  54 +++----
 3 files changed, 146 insertions(+), 240 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 5de51279f19..f1c64c2d4fb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -44,12 +44,11 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.test.KeyValueIteratorStub;
-import org.easymock.EasyMockRule;
-import org.easymock.Mock;
-import org.easymock.MockType;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.Collections;
 import java.util.List;
@@ -58,15 +57,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.aryEq;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
@@ -75,12 +65,16 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class MeteredSessionStoreTest {
 
-    @Rule
-    public EasyMockRule rule = new EasyMockRule(this);
-
     private static final String APPLICATION_ID = "test-app";
     private static final String STORE_TYPE = "scope";
     private static final String STORE_NAME = "mocked-store";
@@ -101,9 +95,9 @@ public class MeteredSessionStoreTest {
     private final TaskId taskId = new TaskId(0, 0, "My-Topology");
     private final Metrics metrics = new Metrics();
     private MeteredSessionStore<String, String> store;
-    @Mock(type = MockType.NICE)
+    @Mock
     private SessionStore<Bytes, byte[]> innerStore;
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext context;
 
     private Map<String, String> tags;
@@ -119,12 +113,12 @@ public class MeteredSessionStoreTest {
             mockTime
         );
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
-        expect(context.applicationId()).andStubReturn(APPLICATION_ID);
-        expect(context.metrics())
-            .andStubReturn(new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, mockTime));
-        expect(context.taskId()).andStubReturn(taskId);
-        
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
-        expect(innerStore.name()).andStubReturn(STORE_NAME);
+        when(context.applicationId()).thenReturn(APPLICATION_ID);
+        when(context.metrics())
+            .thenReturn(new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, mockTime));
+        when(context.taskId()).thenReturn(taskId);
+        when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
+        when(innerStore.name()).thenReturn(STORE_NAME);
         tags = mkMap(
             mkEntry(THREAD_ID_TAG_KEY, threadId),
             mkEntry("task-id", taskId.toString()),
@@ -133,45 +127,34 @@ public class MeteredSessionStoreTest {
     }
 
     private void init() {
-        replay(innerStore, context);
         store.init((StateStoreContext) context, store);
     }
 
     @SuppressWarnings("deprecation")
     @Test
     public void shouldDelegateDeprecatedInit() {
-        final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class);
         final MeteredSessionStore<String, String> outer = new 
MeteredSessionStore<>(
-            inner,
+            innerStore,
             STORE_TYPE,
             Serdes.String(),
             Serdes.String(),
             new MockTime()
         );
-        expect(inner.name()).andStubReturn("store");
-        inner.init((ProcessorContext) context, outer);
-        expectLastCall();
-        replay(inner, context);
+        doNothing().when(innerStore).init((ProcessorContext) context, outer);
         outer.init((ProcessorContext) context, outer);
-        verify(inner);
     }
 
     @Test
     public void shouldDelegateInit() {
-        final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class);
         final MeteredSessionStore<String, String> outer = new 
MeteredSessionStore<>(
-            inner,
+            innerStore,
             STORE_TYPE,
             Serdes.String(),
             Serdes.String(),
             new MockTime()
         );
-        expect(inner.name()).andStubReturn("store");
-        inner.init((StateStoreContext) context, outer);
-        expectLastCall();
-        replay(inner, context);
+        doNothing().when(innerStore).init((StateStoreContext) context, outer);
         outer.init((StateStoreContext) context, outer);
-        verify(inner);
     }
 
     @Test
@@ -183,24 +166,24 @@ public class MeteredSessionStoreTest {
     public void 
shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
         final String defaultChangelogTopicName =
             ProcessorStateManager.storeChangelogTopic(APPLICATION_ID, 
STORE_NAME, taskId.topologyName());
-        expect(context.changelogFor(STORE_NAME)).andReturn(null);
+        when(context.changelogFor(STORE_NAME)).thenReturn(null);
         
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
     }
 
+    @SuppressWarnings("unchecked")
     private void doShouldPassChangelogTopicNameToStateStoreSerde(final String 
topic) {
-        final Serde<String> keySerde = niceMock(Serde.class);
+        final Serde<String> keySerde = mock(Serde.class);
         final Serializer<String> keySerializer = mock(Serializer.class);
-        final Serde<String> valueSerde = niceMock(Serde.class);
+        final Serde<String> valueSerde = mock(Serde.class);
         final Deserializer<String> valueDeserializer = 
mock(Deserializer.class);
         final Serializer<String> valueSerializer = mock(Serializer.class);
-        expect(keySerde.serializer()).andStubReturn(keySerializer);
-        expect(keySerializer.serialize(topic, 
KEY)).andStubReturn(KEY.getBytes());
-        expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
-        expect(valueDeserializer.deserialize(topic, 
VALUE_BYTES)).andStubReturn(VALUE);
-        expect(valueSerde.serializer()).andStubReturn(valueSerializer);
-        expect(valueSerializer.serialize(topic, 
VALUE)).andStubReturn(VALUE_BYTES);
-        expect(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, 
END_TIMESTAMP)).andStubReturn(VALUE_BYTES);
-        replay(innerStore, context, keySerializer, keySerde, 
valueDeserializer, valueSerializer, valueSerde);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+        when(valueDeserializer.deserialize(topic, 
VALUE_BYTES)).thenReturn(VALUE);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES);
+        when(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP, 
END_TIMESTAMP)).thenReturn(VALUE_BYTES);
         store = new MeteredSessionStore<>(
             innerStore,
             STORE_TYPE,
@@ -212,8 +195,6 @@ public class MeteredSessionStoreTest {
 
         store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP);
         store.put(WINDOWED_KEY, VALUE);
-
-        verify(keySerializer, valueDeserializer, valueSerializer);
     }
 
     @Test
@@ -237,8 +218,7 @@ public class MeteredSessionStoreTest {
 
     @Test
     public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
-        innerStore.put(eq(WINDOWED_KEY_BYTES), aryEq(VALUE_BYTES));
-        expectLastCall();
+        doNothing().when(innerStore).put(WINDOWED_KEY_BYTES, VALUE_BYTES);
         init();
 
         store.put(WINDOWED_KEY, VALUE);
@@ -247,13 +227,12 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("put-rate");
         assertTrue(((Double) metric.metricValue()) > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldFindSessionsFromStoreAndRecordFetchMetric() {
-        expect(innerStore.findSessions(KEY_BYTES, 0, 0))
-                .andReturn(new KeyValueIteratorStub<>(
+        when(innerStore.findSessions(KEY_BYTES, 0, 0))
+                .thenReturn(new KeyValueIteratorStub<>(
                         
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
         init();
 
@@ -266,13 +245,12 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldBackwardFindSessionsFromStoreAndRecordFetchMetric() {
-        expect(innerStore.backwardFindSessions(KEY_BYTES, 0, 0))
-            .andReturn(
+        when(innerStore.backwardFindSessions(KEY_BYTES, 0, 0))
+            .thenReturn(
                 new KeyValueIteratorStub<>(
                     Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()
                 )
@@ -288,13 +266,12 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() {
-        expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0))
-                .andReturn(new KeyValueIteratorStub<>(
+        when(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0))
+                .thenReturn(new KeyValueIteratorStub<>(
                         
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
         init();
 
@@ -307,13 +284,12 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldBackwardFindSessionRangeFromStoreAndRecordFetchMetric() {
-        expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0))
-            .andReturn(
+        when(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0))
+            .thenReturn(
                 new KeyValueIteratorStub<>(
                     Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()
                 )
@@ -329,13 +305,11 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldRemoveFromStoreAndRecordRemoveMetric() {
-        innerStore.remove(WINDOWED_KEY_BYTES);
-        expectLastCall();
+        doNothing().when(innerStore).remove(WINDOWED_KEY_BYTES);
 
         init();
 
@@ -345,13 +319,12 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("remove-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldFetchForKeyAndRecordFetchMetric() {
-        expect(innerStore.fetch(KEY_BYTES))
-                .andReturn(new KeyValueIteratorStub<>(
+        when(innerStore.fetch(KEY_BYTES))
+                .thenReturn(new KeyValueIteratorStub<>(
                         
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
         init();
 
@@ -364,13 +337,12 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldBackwardFetchForKeyAndRecordFetchMetric() {
-        expect(innerStore.backwardFetch(KEY_BYTES))
-            .andReturn(
+        when(innerStore.backwardFetch(KEY_BYTES))
+            .thenReturn(
                 new KeyValueIteratorStub<>(
                     Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()
                 )
@@ -386,13 +358,12 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldFetchRangeFromStoreAndRecordFetchMetric() {
-        expect(innerStore.fetch(KEY_BYTES, KEY_BYTES))
-                .andReturn(new KeyValueIteratorStub<>(
+        when(innerStore.fetch(KEY_BYTES, KEY_BYTES))
+                .thenReturn(new KeyValueIteratorStub<>(
                         
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()));
         init();
 
@@ -405,13 +376,12 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldBackwardFetchRangeFromStoreAndRecordFetchMetric() {
-        expect(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES))
-            .andReturn(
+        when(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES))
+            .thenReturn(
                 new KeyValueIteratorStub<>(
                     Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES, 
VALUE_BYTES)).iterator()
                 )
@@ -427,14 +397,13 @@ public class MeteredSessionStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStore);
     }
 
     @Test
     public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() {
         final long systemTime = Time.SYSTEM.milliseconds();
-        expect(innerStore.findSessions(KEY_BYTES, systemTime - 
RETENTION_PERIOD, systemTime))
-                .andReturn(new 
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+        when(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD, 
systemTime))
+                .thenReturn(new 
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
         init();
 
         final KeyValueIterator<Windowed<String>, String> iterator = 
store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
@@ -445,8 +414,8 @@ public class MeteredSessionStoreTest {
     @Test
     public void 
shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() {
         final long systemTime = Time.SYSTEM.milliseconds();
-        expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime - 
RETENTION_PERIOD, systemTime))
-                .andReturn(new 
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+        when(innerStore.backwardFindSessions(KEY_BYTES, systemTime - 
RETENTION_PERIOD, systemTime))
+                .thenReturn(new 
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
         init();
 
         final KeyValueIterator<Windowed<String>, String> iterator = 
store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
@@ -457,8 +426,8 @@ public class MeteredSessionStoreTest {
     @Test
     public void shouldNotFindExpiredSessionRangeFromStore() {
         final long systemTime = Time.SYSTEM.milliseconds();
-        expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - 
RETENTION_PERIOD, systemTime))
-                .andReturn(new 
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+        when(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime - 
RETENTION_PERIOD, systemTime))
+                .thenReturn(new 
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
         init();
 
         final KeyValueIterator<Windowed<String>, String> iterator = 
store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
@@ -469,8 +438,8 @@ public class MeteredSessionStoreTest {
     @Test
     public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() {
         final long systemTime = Time.SYSTEM.milliseconds();
-        expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 
systemTime - RETENTION_PERIOD, systemTime))
-                .andReturn(new 
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+        when(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime 
- RETENTION_PERIOD, systemTime))
+                .thenReturn(new 
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
         init();
 
         final KeyValueIterator<Windowed<String>, String> iterator = 
store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
@@ -490,7 +459,7 @@ public class MeteredSessionStoreTest {
 
     @Test
     public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() {
-        expect(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0, 
Long.MAX_VALUE)).andReturn(null);
+        when(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0, 
Long.MAX_VALUE)).thenReturn(null);
 
         init();
         assertNull(store.fetchSession("a", 0, Long.MAX_VALUE));
@@ -598,8 +567,7 @@ public class MeteredSessionStoreTest {
     public void shouldSetFlushListenerOnWrappedCachingStore() {
         final CachedSessionStore cachedSessionStore = 
mock(CachedSessionStore.class);
 
-        
expect(cachedSessionStore.setFlushListener(anyObject(CacheFlushListener.class), 
eq(false))).andReturn(true);
-        replay(cachedSessionStore);
+        
when(cachedSessionStore.setFlushListener(any(CacheFlushListener.class), 
eq(false))).thenReturn(true);
 
         store = new MeteredSessionStore<>(
             cachedSessionStore,
@@ -608,8 +576,6 @@ public class MeteredSessionStoreTest {
             Serdes.String(),
             new MockTime());
         assertTrue(store.setFlushListener(null, false));
-
-        verify(cachedSessionStore);
     }
 
     @Test
@@ -619,27 +585,23 @@ public class MeteredSessionStoreTest {
 
     @Test
     public void shouldRemoveMetricsOnClose() {
-        innerStore.close();
-        expectLastCall();
+        doNothing().when(innerStore).close();
         init(); // replays "inner"
 
         // There's always a "count" metric registered
         assertThat(storeMetrics(), not(empty()));
         store.close();
         assertThat(storeMetrics(), empty());
-        verify(innerStore);
     }
 
     @Test
     public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
-        innerStore.close();
-        expectLastCall().andThrow(new RuntimeException("Oops!"));
+        doThrow(new RuntimeException("Oops!")).when(innerStore).close();
         init(); // replays "inner"
 
         assertThat(storeMetrics(), not(empty()));
         assertThrows(RuntimeException.class, store::close);
         assertThat(storeMetrics(), empty());
-        verify(innerStore);
     }
 
     private KafkaMetric metric(final String name) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 20eb5ec88a1..952a52dbff2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -44,6 +44,8 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.time.temporal.ChronoUnit;
 import java.util.List;
@@ -53,15 +55,6 @@ import java.util.stream.Collectors;
 import static java.time.Instant.ofEpochMilli;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.greaterThan;
@@ -70,7 +63,14 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class MeteredWindowStoreTest {
 
     private static final String STORE_TYPE = "scope";
@@ -88,7 +88,8 @@ public class MeteredWindowStoreTest {
 
     private final String threadId = Thread.currentThread().getName();
     private InternalMockProcessorContext context;
-    private final WindowStore<Bytes, byte[]> innerStoreMock = 
createNiceMock(WindowStore.class);
+    @SuppressWarnings("unchecked")
+    private final WindowStore<Bytes, byte[]> innerStoreMock = 
mock(WindowStore.class);
     private MeteredWindowStore<String, String> store = new 
MeteredWindowStore<>(
         innerStoreMock,
         WINDOW_SIZE_MS, // any size
@@ -101,7 +102,7 @@ public class MeteredWindowStoreTest {
     private Map<String, String> tags;
 
     {
-        expect(innerStoreMock.name()).andReturn(STORE_NAME).anyTimes();
+        when(innerStoreMock.name()).thenReturn(STORE_NAME);
     }
 
     @Before
@@ -128,40 +129,32 @@ public class MeteredWindowStoreTest {
     @SuppressWarnings("deprecation")
     @Test
     public void shouldDelegateDeprecatedInit() {
-        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
         final MeteredWindowStore<String, String> outer = new 
MeteredWindowStore<>(
-            inner,
+            innerStoreMock,
             WINDOW_SIZE_MS, // any size
             STORE_TYPE,
             new MockTime(),
             Serdes.String(),
             new SerdeThatDoesntHandleNull()
         );
-        expect(inner.name()).andStubReturn("store");
-        inner.init((ProcessorContext) context, outer);
-        expectLastCall();
-        replay(inner);
+        when(innerStoreMock.name()).thenReturn("store");
+        doNothing().when(innerStoreMock).init((ProcessorContext) context, 
outer);
         outer.init((ProcessorContext) context, outer);
-        verify(inner);
     }
 
     @Test
     public void shouldDelegateInit() {
-        final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
         final MeteredWindowStore<String, String> outer = new 
MeteredWindowStore<>(
-            inner,
+            innerStoreMock,
             WINDOW_SIZE_MS, // any size
             STORE_TYPE,
             new MockTime(),
             Serdes.String(),
             new SerdeThatDoesntHandleNull()
         );
-        expect(inner.name()).andStubReturn("store");
-        inner.init((StateStoreContext) context, outer);
-        expectLastCall();
-        replay(inner);
+        when(innerStoreMock.name()).thenReturn("store");
+        doNothing().when(innerStoreMock).init((StateStoreContext) context, 
outer);
         outer.init((StateStoreContext) context, outer);
-        verify(inner);
     }
 
     @Test
@@ -177,20 +170,20 @@ public class MeteredWindowStoreTest {
         
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
     }
 
+    @SuppressWarnings("unchecked")
     private void doShouldPassChangelogTopicNameToStateStoreSerde(final String 
topic) {
-        final Serde<String> keySerde = niceMock(Serde.class);
+        final Serde<String> keySerde = mock(Serde.class);
         final Serializer<String> keySerializer = mock(Serializer.class);
-        final Serde<String> valueSerde = niceMock(Serde.class);
+        final Serde<String> valueSerde = mock(Serde.class);
         final Deserializer<String> valueDeserializer = 
mock(Deserializer.class);
         final Serializer<String> valueSerializer = mock(Serializer.class);
-        expect(keySerde.serializer()).andStubReturn(keySerializer);
-        expect(keySerializer.serialize(topic, 
KEY)).andStubReturn(KEY.getBytes());
-        expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
-        expect(valueDeserializer.deserialize(topic, 
VALUE_BYTES)).andStubReturn(VALUE);
-        expect(valueSerde.serializer()).andStubReturn(valueSerializer);
-        expect(valueSerializer.serialize(topic, 
VALUE)).andStubReturn(VALUE_BYTES);
-        expect(innerStoreMock.fetch(KEY_BYTES, 
TIMESTAMP)).andStubReturn(VALUE_BYTES);
-        replay(innerStoreMock, keySerializer, keySerde, valueDeserializer, 
valueSerializer, valueSerde);
+        when(keySerde.serializer()).thenReturn(keySerializer);
+        when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
+        when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+        when(valueDeserializer.deserialize(topic, 
VALUE_BYTES)).thenReturn(VALUE);
+        when(valueSerde.serializer()).thenReturn(valueSerializer);
+        when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES);
+        when(innerStoreMock.fetch(KEY_BYTES, 
TIMESTAMP)).thenReturn(VALUE_BYTES);
         store = new MeteredWindowStore<>(
             innerStoreMock,
             WINDOW_SIZE_MS,
@@ -203,13 +196,10 @@ public class MeteredWindowStoreTest {
 
         store.fetch(KEY, TIMESTAMP);
         store.put(KEY, VALUE, TIMESTAMP);
-
-        verify(keySerializer, valueDeserializer, valueSerializer);
     }
 
     @Test
     public void testMetrics() {
-        replay(innerStoreMock);
         store.init((StateStoreContext) context, store);
         final JmxReporter reporter = new JmxReporter();
         final MetricsContext metricsContext = new 
KafkaMetricsContext("kafka.streams");
@@ -229,22 +219,19 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordRestoreLatencyOnInit() {
-        innerStoreMock.init((StateStoreContext) context, store);
-        replay(innerStoreMock);
+        doNothing().when(innerStoreMock).init((StateStoreContext) context, 
store);
         store.init((StateStoreContext) context, store);
 
         // it suffices to verify one restore metric since all restore metrics 
are recorded by the same sensor
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("restore-rate");
         assertThat((Double) metric.metricValue(), greaterThan(0.0));
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldPutToInnerStoreAndRecordPutMetrics() {
         final byte[] bytes = "a".getBytes();
-        innerStoreMock.put(eq(Bytes.wrap(bytes)), anyObject(), 
eq(context.timestamp()));
-        replay(innerStoreMock);
+        doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(), 
eq(context.timestamp()));
 
         store.init((StateStoreContext) context, store);
         store.put("a", "a", context.timestamp());
@@ -253,14 +240,12 @@ public class MeteredWindowStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("put-rate");
         assertThat((Double) metric.metricValue(), greaterThan(0.0));
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldFetchFromInnerStoreAndRecordFetchMetrics() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1))
-            .andReturn(KeyValueIterators.emptyWindowStoreIterator());
-        replay(innerStoreMock);
+        when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1))
+            .thenReturn(KeyValueIterators.emptyWindowStoreIterator());
 
         store.init((StateStoreContext) context, store);
         store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // 
recorded on close;
@@ -269,32 +254,27 @@ public class MeteredWindowStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertThat((Double) metric.metricValue(), greaterThan(0.0));
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + 
RETENTION_PERIOD))
-                .andReturn(KeyValueIterators.emptyWindowStoreIterator());
-        replay(innerStoreMock);
+        when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 + 
RETENTION_PERIOD))
+                .thenReturn(KeyValueIterators.emptyWindowStoreIterator());
 
         store.init((StateStoreContext) context, store);
         store.fetch("a", ofEpochMilli(1), 
ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded 
on close;
-
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 
Bytes.wrap("b".getBytes()), 1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        expect(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        expect(innerStoreMock.fetch(null, null, 1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        replay(innerStoreMock);
+        when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 
Bytes.wrap("b".getBytes()), 1, 1))
+            .thenReturn(KeyValueIterators.emptyIterator());
+        when(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1))
+            .thenReturn(KeyValueIterators.emptyIterator());
+        when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1))
+            .thenReturn(KeyValueIterators.emptyIterator());
+        when(innerStoreMock.fetch(null, null, 1, 1))
+            .thenReturn(KeyValueIterators.emptyIterator());
 
         store.init((StateStoreContext) context, store);
         store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // 
recorded on close;
@@ -306,14 +286,12 @@ public class MeteredWindowStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertThat((Double) metric.metricValue(), greaterThan(0.0));
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() {
-        expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), 
Bytes.wrap("b".getBytes()), 1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        replay(innerStoreMock);
+        when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), 
Bytes.wrap("b".getBytes()), 1, 1))
+            .thenReturn(KeyValueIterators.emptyIterator());
 
         store.init((StateStoreContext) context, store);
         store.backwardFetch("a", "b", ofEpochMilli(1), 
ofEpochMilli(1)).close(); // recorded on close;
@@ -322,20 +300,18 @@ public class MeteredWindowStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertThat((Double) metric.metricValue(), greaterThan(0.0));
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() {
-        expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), 
Bytes.wrap("b".getBytes()), 1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        expect(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 
1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 
1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        expect(innerStoreMock.backwardFetch(null, null, 1, 1))
-            .andReturn(KeyValueIterators.emptyIterator());
-        replay(innerStoreMock);
+        when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), 
Bytes.wrap("b".getBytes()), 1, 1))
+            .thenReturn(KeyValueIterators.emptyIterator());
+        when(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1, 
1))
+            .thenReturn(KeyValueIterators.emptyIterator());
+        when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1, 
1))
+            .thenReturn(KeyValueIterators.emptyIterator());
+        when(innerStoreMock.backwardFetch(null, null, 1, 1))
+            .thenReturn(KeyValueIterators.emptyIterator());
 
         store.init((StateStoreContext) context, store);
         store.backwardFetch("a", "b", ofEpochMilli(1), 
ofEpochMilli(1)).close(); // recorded on close;
@@ -347,13 +323,11 @@ public class MeteredWindowStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertThat((Double) metric.metricValue(), greaterThan(0.0));
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldFetchAllFromInnerStoreAndRecordFetchMetrics() {
-        expect(innerStoreMock.fetchAll(1, 
1)).andReturn(KeyValueIterators.emptyIterator());
-        replay(innerStoreMock);
+        when(innerStoreMock.fetchAll(1, 
1)).thenReturn(KeyValueIterators.emptyIterator());
 
         store.init((StateStoreContext) context, store);
         store.fetchAll(ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded 
on close;
@@ -362,13 +336,11 @@ public class MeteredWindowStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertThat((Double) metric.metricValue(), greaterThan(0.0));
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() {
-        expect(innerStoreMock.backwardFetchAll(1, 
1)).andReturn(KeyValueIterators.emptyIterator());
-        replay(innerStoreMock);
+        when(innerStoreMock.backwardFetchAll(1, 
1)).thenReturn(KeyValueIterators.emptyIterator());
 
         store.init((StateStoreContext) context, store);
         store.backwardFetchAll(ofEpochMilli(1), ofEpochMilli(1)).close(); // 
recorded on close;
@@ -377,13 +349,11 @@ public class MeteredWindowStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("fetch-rate");
         assertThat((Double) metric.metricValue(), greaterThan(0.0));
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldRecordFlushLatency() {
-        innerStoreMock.flush();
-        replay(innerStoreMock);
+        doNothing().when(innerStoreMock).flush();
 
         store.init((StateStoreContext) context, store);
         store.flush();
@@ -392,13 +362,11 @@ public class MeteredWindowStoreTest {
         // and the sensor is tested elsewhere
         final KafkaMetric metric = metric("flush-rate");
         assertTrue((Double) metric.metricValue() > 0);
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 
0)).andReturn(null);
-        replay(innerStoreMock);
+        when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 
0)).thenReturn(null);
 
         store.init((StateStoreContext) context, store);
         assertNull(store.fetch("a", 0));
@@ -412,8 +380,7 @@ public class MeteredWindowStoreTest {
     public void shouldSetFlushListenerOnWrappedCachingStore() {
         final CachedWindowStore cachedWindowStore = 
mock(CachedWindowStore.class);
 
-        
expect(cachedWindowStore.setFlushListener(anyObject(CacheFlushListener.class), 
eq(false))).andReturn(true);
-        replay(cachedWindowStore);
+        when(cachedWindowStore.setFlushListener(any(CacheFlushListener.class), 
eq(false))).thenReturn(true);
 
         final MeteredWindowStore<String, String> metered = new 
MeteredWindowStore<>(
             cachedWindowStore,
@@ -424,8 +391,6 @@ public class MeteredWindowStoreTest {
             new SerdeThatDoesntHandleNull()
         );
         assertTrue(metered.setFlushListener(null, false));
-
-        verify(cachedWindowStore);
     }
 
     @Test
@@ -435,40 +400,31 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldCloseUnderlyingStore() {
-        innerStoreMock.close();
-        expectLastCall();
-        replay(innerStoreMock);
+        doNothing().when(innerStoreMock).close();
         store.init((StateStoreContext) context, store);
 
         store.close();
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldRemoveMetricsOnClose() {
-        innerStoreMock.close();
-        expectLastCall();
-        replay(innerStoreMock);
+        doNothing().when(innerStoreMock).close();
         store.init((StateStoreContext) context, store);
 
         assertThat(storeMetrics(), not(empty()));
         store.close();
         assertThat(storeMetrics(), empty());
-        verify(innerStoreMock);
     }
 
     @Test
     public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
-        innerStoreMock.close();
-        expectLastCall().andThrow(new RuntimeException("Oops!"));
-        replay(innerStoreMock);
+        doThrow(new RuntimeException("Oops!")).when(innerStoreMock).close();
         store.init((StateStoreContext) context, store);
 
         // There's always a "count" metric registered
         assertThat(storeMetrics(), not(empty()));
         assertThrows(RuntimeException.class, store::close);
         assertThat(storeMetrics(), empty());
-        verify(innerStoreMock);
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
index ffb5ab3c53d..d2b69e97415 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
@@ -21,20 +21,18 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
 
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ReadOnlyKeyValueStoreFacadeTest {
     @Mock
     private TimestampedKeyValueStore<String, String> 
mockedKeyValueTimestampStore;
@@ -50,66 +48,56 @@ public class ReadOnlyKeyValueStoreFacadeTest {
 
     @Test
     public void shouldReturnPlainValueOnGet() {
-        expect(mockedKeyValueTimestampStore.get("key"))
-            .andReturn(ValueAndTimestamp.make("value", 42L));
-        expect(mockedKeyValueTimestampStore.get("unknownKey"))
-            .andReturn(null);
-        replay(mockedKeyValueTimestampStore);
+        when(mockedKeyValueTimestampStore.get("key"))
+            .thenReturn(ValueAndTimestamp.make("value", 42L));
+        when(mockedKeyValueTimestampStore.get("unknownKey"))
+            .thenReturn(null);
 
         assertThat(readOnlyKeyValueStoreFacade.get("key"), is("value"));
         assertNull(readOnlyKeyValueStoreFacade.get("unknownKey"));
-        verify(mockedKeyValueTimestampStore);
     }
 
     @Test
     public void shouldReturnPlainKeyValuePairsForRangeIterator() {
-        expect(mockedKeyValueTimestampIterator.next())
-            .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
-            .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
-        expect(mockedKeyValueTimestampStore.range("key1", 
"key2")).andReturn(mockedKeyValueTimestampIterator);
-        replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+        when(mockedKeyValueTimestampIterator.next())
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
+        when(mockedKeyValueTimestampStore.range("key1", 
"key2")).thenReturn(mockedKeyValueTimestampIterator);
 
         final KeyValueIterator<String, String> iterator = 
readOnlyKeyValueStoreFacade.range("key1", "key2");
         assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
         assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
-        verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
     }
 
     @Test
     public void shouldReturnPlainKeyValuePairsForPrefixScan() {
         final StringSerializer stringSerializer = new StringSerializer();
-        expect(mockedKeyValueTimestampIterator.next())
-            .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
-            .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
-        expect(mockedKeyValueTimestampStore.prefixScan("key", 
stringSerializer)).andReturn(mockedKeyValueTimestampIterator);
-        replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+        when(mockedKeyValueTimestampIterator.next())
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
+        when(mockedKeyValueTimestampStore.prefixScan("key", 
stringSerializer)).thenReturn(mockedKeyValueTimestampIterator);
 
         final KeyValueIterator<String, String> iterator = 
readOnlyKeyValueStoreFacade.prefixScan("key", stringSerializer);
         assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
         assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
-        verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
     }
 
     @Test
     public void shouldReturnPlainKeyValuePairsForAllIterator() {
-        expect(mockedKeyValueTimestampIterator.next())
-            .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
-            .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
-        
expect(mockedKeyValueTimestampStore.all()).andReturn(mockedKeyValueTimestampIterator);
-        replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+        when(mockedKeyValueTimestampIterator.next())
+            .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1", 
21L)))
+            .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2", 
42L)));
+        
when(mockedKeyValueTimestampStore.all()).thenReturn(mockedKeyValueTimestampIterator);
 
         final KeyValueIterator<String, String> iterator = 
readOnlyKeyValueStoreFacade.all();
         assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
         assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
-        verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
     }
 
     @Test
     public void shouldForwardApproximateNumEntries() {
-        
expect(mockedKeyValueTimestampStore.approximateNumEntries()).andReturn(42L);
-        replay(mockedKeyValueTimestampStore);
+        
when(mockedKeyValueTimestampStore.approximateNumEntries()).thenReturn(42L);
 
         assertThat(readOnlyKeyValueStoreFacade.approximateNumEntries(), 
is(42L));
-        verify(mockedKeyValueTimestampStore);
     }
 }


Reply via email to