This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5bdea94c058 KAFKA-14133: Move MeteredSessionStoreTest,
MeteredWindowStoreTest and ReadOnlyKeyValueStoreFacadeTest to Mockito (#14404)
5bdea94c058 is described below
commit 5bdea94c0582104fd1d471918dc143aa9bee67d3
Author: Christo Lolov <[email protected]>
AuthorDate: Fri Sep 22 10:27:11 2023 +0100
KAFKA-14133: Move MeteredSessionStoreTest, MeteredWindowStoreTest and
ReadOnlyKeyValueStoreFacadeTest to Mockito (#14404)
Reviewers: Divij Vaidya <[email protected]>
---
.../state/internals/MeteredSessionStoreTest.java | 166 ++++++++-------------
.../state/internals/MeteredWindowStoreTest.java | 166 ++++++++-------------
.../internals/ReadOnlyKeyValueStoreFacadeTest.java | 54 +++----
3 files changed, 146 insertions(+), 240 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 5de51279f19..f1c64c2d4fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -44,12 +44,11 @@ import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.test.KeyValueIteratorStub;
-import org.easymock.EasyMockRule;
-import org.easymock.Mock;
-import org.easymock.MockType;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import java.util.List;
@@ -58,15 +57,6 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.aryEq;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -75,12 +65,16 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class MeteredSessionStoreTest {
- @Rule
- public EasyMockRule rule = new EasyMockRule(this);
-
private static final String APPLICATION_ID = "test-app";
private static final String STORE_TYPE = "scope";
private static final String STORE_NAME = "mocked-store";
@@ -101,9 +95,9 @@ public class MeteredSessionStoreTest {
private final TaskId taskId = new TaskId(0, 0, "My-Topology");
private final Metrics metrics = new Metrics();
private MeteredSessionStore<String, String> store;
- @Mock(type = MockType.NICE)
+ @Mock
private SessionStore<Bytes, byte[]> innerStore;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext context;
private Map<String, String> tags;
@@ -119,12 +113,12 @@ public class MeteredSessionStoreTest {
mockTime
);
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
- expect(context.applicationId()).andStubReturn(APPLICATION_ID);
- expect(context.metrics())
- .andStubReturn(new StreamsMetricsImpl(metrics, "test",
StreamsConfig.METRICS_LATEST, mockTime));
- expect(context.taskId()).andStubReturn(taskId);
-
expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
- expect(innerStore.name()).andStubReturn(STORE_NAME);
+ when(context.applicationId()).thenReturn(APPLICATION_ID);
+ when(context.metrics())
+ .thenReturn(new StreamsMetricsImpl(metrics, "test",
StreamsConfig.METRICS_LATEST, mockTime));
+ when(context.taskId()).thenReturn(taskId);
+ when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
+ when(innerStore.name()).thenReturn(STORE_NAME);
tags = mkMap(
mkEntry(THREAD_ID_TAG_KEY, threadId),
mkEntry("task-id", taskId.toString()),
@@ -133,45 +127,34 @@ public class MeteredSessionStoreTest {
}
private void init() {
- replay(innerStore, context);
store.init((StateStoreContext) context, store);
}
@SuppressWarnings("deprecation")
@Test
public void shouldDelegateDeprecatedInit() {
- final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class);
final MeteredSessionStore<String, String> outer = new
MeteredSessionStore<>(
- inner,
+ innerStore,
STORE_TYPE,
Serdes.String(),
Serdes.String(),
new MockTime()
);
- expect(inner.name()).andStubReturn("store");
- inner.init((ProcessorContext) context, outer);
- expectLastCall();
- replay(inner, context);
+ doNothing().when(innerStore).init((ProcessorContext) context, outer);
outer.init((ProcessorContext) context, outer);
- verify(inner);
}
@Test
public void shouldDelegateInit() {
- final SessionStore<Bytes, byte[]> inner = mock(SessionStore.class);
final MeteredSessionStore<String, String> outer = new
MeteredSessionStore<>(
- inner,
+ innerStore,
STORE_TYPE,
Serdes.String(),
Serdes.String(),
new MockTime()
);
- expect(inner.name()).andStubReturn("store");
- inner.init((StateStoreContext) context, outer);
- expectLastCall();
- replay(inner, context);
+ doNothing().when(innerStore).init((StateStoreContext) context, outer);
outer.init((StateStoreContext) context, outer);
- verify(inner);
}
@Test
@@ -183,24 +166,24 @@ public class MeteredSessionStoreTest {
public void
shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
final String defaultChangelogTopicName =
ProcessorStateManager.storeChangelogTopic(APPLICATION_ID,
STORE_NAME, taskId.topologyName());
- expect(context.changelogFor(STORE_NAME)).andReturn(null);
+ when(context.changelogFor(STORE_NAME)).thenReturn(null);
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
+ @SuppressWarnings("unchecked")
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String
topic) {
- final Serde<String> keySerde = niceMock(Serde.class);
+ final Serde<String> keySerde = mock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
- final Serde<String> valueSerde = niceMock(Serde.class);
+ final Serde<String> valueSerde = mock(Serde.class);
final Deserializer<String> valueDeserializer =
mock(Deserializer.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
- expect(keySerde.serializer()).andStubReturn(keySerializer);
- expect(keySerializer.serialize(topic,
KEY)).andStubReturn(KEY.getBytes());
- expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
- expect(valueDeserializer.deserialize(topic,
VALUE_BYTES)).andStubReturn(VALUE);
- expect(valueSerde.serializer()).andStubReturn(valueSerializer);
- expect(valueSerializer.serialize(topic,
VALUE)).andStubReturn(VALUE_BYTES);
- expect(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP,
END_TIMESTAMP)).andStubReturn(VALUE_BYTES);
- replay(innerStore, context, keySerializer, keySerde,
valueDeserializer, valueSerializer, valueSerde);
+ when(keySerde.serializer()).thenReturn(keySerializer);
+ when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
+ when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+ when(valueDeserializer.deserialize(topic,
VALUE_BYTES)).thenReturn(VALUE);
+ when(valueSerde.serializer()).thenReturn(valueSerializer);
+ when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES);
+ when(innerStore.fetchSession(KEY_BYTES, START_TIMESTAMP,
END_TIMESTAMP)).thenReturn(VALUE_BYTES);
store = new MeteredSessionStore<>(
innerStore,
STORE_TYPE,
@@ -212,8 +195,6 @@ public class MeteredSessionStoreTest {
store.fetchSession(KEY, START_TIMESTAMP, END_TIMESTAMP);
store.put(WINDOWED_KEY, VALUE);
-
- verify(keySerializer, valueDeserializer, valueSerializer);
}
@Test
@@ -237,8 +218,7 @@ public class MeteredSessionStoreTest {
@Test
public void shouldWriteBytesToInnerStoreAndRecordPutMetric() {
- innerStore.put(eq(WINDOWED_KEY_BYTES), aryEq(VALUE_BYTES));
- expectLastCall();
+ doNothing().when(innerStore).put(WINDOWED_KEY_BYTES, VALUE_BYTES);
init();
store.put(WINDOWED_KEY, VALUE);
@@ -247,13 +227,12 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("put-rate");
assertTrue(((Double) metric.metricValue()) > 0);
- verify(innerStore);
}
@Test
public void shouldFindSessionsFromStoreAndRecordFetchMetric() {
- expect(innerStore.findSessions(KEY_BYTES, 0, 0))
- .andReturn(new KeyValueIteratorStub<>(
+ when(innerStore.findSessions(KEY_BYTES, 0, 0))
+ .thenReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
init();
@@ -266,13 +245,12 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldBackwardFindSessionsFromStoreAndRecordFetchMetric() {
- expect(innerStore.backwardFindSessions(KEY_BYTES, 0, 0))
- .andReturn(
+ when(innerStore.backwardFindSessions(KEY_BYTES, 0, 0))
+ .thenReturn(
new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()
)
@@ -288,13 +266,12 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldFindSessionRangeFromStoreAndRecordFetchMetric() {
- expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0))
- .andReturn(new KeyValueIteratorStub<>(
+ when(innerStore.findSessions(KEY_BYTES, KEY_BYTES, 0, 0))
+ .thenReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
init();
@@ -307,13 +284,12 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldBackwardFindSessionRangeFromStoreAndRecordFetchMetric() {
- expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0))
- .andReturn(
+ when(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, 0, 0))
+ .thenReturn(
new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()
)
@@ -329,13 +305,11 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldRemoveFromStoreAndRecordRemoveMetric() {
- innerStore.remove(WINDOWED_KEY_BYTES);
- expectLastCall();
+ doNothing().when(innerStore).remove(WINDOWED_KEY_BYTES);
init();
@@ -345,13 +319,12 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("remove-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldFetchForKeyAndRecordFetchMetric() {
- expect(innerStore.fetch(KEY_BYTES))
- .andReturn(new KeyValueIteratorStub<>(
+ when(innerStore.fetch(KEY_BYTES))
+ .thenReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
init();
@@ -364,13 +337,12 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldBackwardFetchForKeyAndRecordFetchMetric() {
- expect(innerStore.backwardFetch(KEY_BYTES))
- .andReturn(
+ when(innerStore.backwardFetch(KEY_BYTES))
+ .thenReturn(
new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()
)
@@ -386,13 +358,12 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldFetchRangeFromStoreAndRecordFetchMetric() {
- expect(innerStore.fetch(KEY_BYTES, KEY_BYTES))
- .andReturn(new KeyValueIteratorStub<>(
+ when(innerStore.fetch(KEY_BYTES, KEY_BYTES))
+ .thenReturn(new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
init();
@@ -405,13 +376,12 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldBackwardFetchRangeFromStoreAndRecordFetchMetric() {
- expect(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES))
- .andReturn(
+ when(innerStore.backwardFetch(KEY_BYTES, KEY_BYTES))
+ .thenReturn(
new KeyValueIteratorStub<>(
Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()
)
@@ -427,14 +397,13 @@ public class MeteredSessionStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStore);
}
@Test
public void shouldReturnNoSessionsWhenFetchedKeyHasExpired() {
final long systemTime = Time.SYSTEM.milliseconds();
- expect(innerStore.findSessions(KEY_BYTES, systemTime -
RETENTION_PERIOD, systemTime))
- .andReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ when(innerStore.findSessions(KEY_BYTES, systemTime - RETENTION_PERIOD,
systemTime))
+ .thenReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator =
store.findSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
@@ -445,8 +414,8 @@ public class MeteredSessionStoreTest {
@Test
public void
shouldReturnNoSessionsInBackwardOrderWhenFetchedKeyHasExpired() {
final long systemTime = Time.SYSTEM.milliseconds();
- expect(innerStore.backwardFindSessions(KEY_BYTES, systemTime -
RETENTION_PERIOD, systemTime))
- .andReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ when(innerStore.backwardFindSessions(KEY_BYTES, systemTime -
RETENTION_PERIOD, systemTime))
+ .thenReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator =
store.backwardFindSessions(KEY, systemTime - RETENTION_PERIOD, systemTime);
@@ -457,8 +426,8 @@ public class MeteredSessionStoreTest {
@Test
public void shouldNotFindExpiredSessionRangeFromStore() {
final long systemTime = Time.SYSTEM.milliseconds();
- expect(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime -
RETENTION_PERIOD, systemTime))
- .andReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ when(innerStore.findSessions(KEY_BYTES, KEY_BYTES, systemTime -
RETENTION_PERIOD, systemTime))
+ .thenReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator =
store.findSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
@@ -469,8 +438,8 @@ public class MeteredSessionStoreTest {
@Test
public void shouldNotFindExpiredSessionRangeInBackwardOrderFromStore() {
final long systemTime = Time.SYSTEM.milliseconds();
- expect(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES,
systemTime - RETENTION_PERIOD, systemTime))
- .andReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
+ when(innerStore.backwardFindSessions(KEY_BYTES, KEY_BYTES, systemTime
- RETENTION_PERIOD, systemTime))
+ .thenReturn(new
KeyValueIteratorStub<>(KeyValueIterators.emptyIterator()));
init();
final KeyValueIterator<Windowed<String>, String> iterator =
store.backwardFindSessions(KEY, KEY, systemTime - RETENTION_PERIOD, systemTime);
@@ -490,7 +459,7 @@ public class MeteredSessionStoreTest {
@Test
public void shouldNotThrowNullPointerExceptionIfFetchSessionReturnsNull() {
- expect(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0,
Long.MAX_VALUE)).andReturn(null);
+ when(innerStore.fetchSession(Bytes.wrap("a".getBytes()), 0,
Long.MAX_VALUE)).thenReturn(null);
init();
assertNull(store.fetchSession("a", 0, Long.MAX_VALUE));
@@ -598,8 +567,7 @@ public class MeteredSessionStoreTest {
public void shouldSetFlushListenerOnWrappedCachingStore() {
final CachedSessionStore cachedSessionStore =
mock(CachedSessionStore.class);
-
expect(cachedSessionStore.setFlushListener(anyObject(CacheFlushListener.class),
eq(false))).andReturn(true);
- replay(cachedSessionStore);
+
when(cachedSessionStore.setFlushListener(any(CacheFlushListener.class),
eq(false))).thenReturn(true);
store = new MeteredSessionStore<>(
cachedSessionStore,
@@ -608,8 +576,6 @@ public class MeteredSessionStoreTest {
Serdes.String(),
new MockTime());
assertTrue(store.setFlushListener(null, false));
-
- verify(cachedSessionStore);
}
@Test
@@ -619,27 +585,23 @@ public class MeteredSessionStoreTest {
@Test
public void shouldRemoveMetricsOnClose() {
- innerStore.close();
- expectLastCall();
+ doNothing().when(innerStore).close();
init(); // replays "inner"
// There's always a "count" metric registered
assertThat(storeMetrics(), not(empty()));
store.close();
assertThat(storeMetrics(), empty());
- verify(innerStore);
}
@Test
public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
- innerStore.close();
- expectLastCall().andThrow(new RuntimeException("Oops!"));
+ doThrow(new RuntimeException("Oops!")).when(innerStore).close();
init(); // replays "inner"
assertThat(storeMetrics(), not(empty()));
assertThrows(RuntimeException.class, store::close);
assertThat(storeMetrics(), empty());
- verify(innerStore);
}
private KafkaMetric metric(final String name) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 20eb5ec88a1..952a52dbff2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -44,6 +44,8 @@ import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
import java.time.temporal.ChronoUnit;
import java.util.List;
@@ -53,15 +55,6 @@ import java.util.stream.Collectors;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
@@ -70,7 +63,14 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class MeteredWindowStoreTest {
private static final String STORE_TYPE = "scope";
@@ -88,7 +88,8 @@ public class MeteredWindowStoreTest {
private final String threadId = Thread.currentThread().getName();
private InternalMockProcessorContext context;
- private final WindowStore<Bytes, byte[]> innerStoreMock =
createNiceMock(WindowStore.class);
+ @SuppressWarnings("unchecked")
+ private final WindowStore<Bytes, byte[]> innerStoreMock =
mock(WindowStore.class);
private MeteredWindowStore<String, String> store = new
MeteredWindowStore<>(
innerStoreMock,
WINDOW_SIZE_MS, // any size
@@ -101,7 +102,7 @@ public class MeteredWindowStoreTest {
private Map<String, String> tags;
{
- expect(innerStoreMock.name()).andReturn(STORE_NAME).anyTimes();
+ when(innerStoreMock.name()).thenReturn(STORE_NAME);
}
@Before
@@ -128,40 +129,32 @@ public class MeteredWindowStoreTest {
@SuppressWarnings("deprecation")
@Test
public void shouldDelegateDeprecatedInit() {
- final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
final MeteredWindowStore<String, String> outer = new
MeteredWindowStore<>(
- inner,
+ innerStoreMock,
WINDOW_SIZE_MS, // any size
STORE_TYPE,
new MockTime(),
Serdes.String(),
new SerdeThatDoesntHandleNull()
);
- expect(inner.name()).andStubReturn("store");
- inner.init((ProcessorContext) context, outer);
- expectLastCall();
- replay(inner);
+ when(innerStoreMock.name()).thenReturn("store");
+ doNothing().when(innerStoreMock).init((ProcessorContext) context,
outer);
outer.init((ProcessorContext) context, outer);
- verify(inner);
}
@Test
public void shouldDelegateInit() {
- final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
final MeteredWindowStore<String, String> outer = new
MeteredWindowStore<>(
- inner,
+ innerStoreMock,
WINDOW_SIZE_MS, // any size
STORE_TYPE,
new MockTime(),
Serdes.String(),
new SerdeThatDoesntHandleNull()
);
- expect(inner.name()).andStubReturn("store");
- inner.init((StateStoreContext) context, outer);
- expectLastCall();
- replay(inner);
+ when(innerStoreMock.name()).thenReturn("store");
+ doNothing().when(innerStoreMock).init((StateStoreContext) context,
outer);
outer.init((StateStoreContext) context, outer);
- verify(inner);
}
@Test
@@ -177,20 +170,20 @@ public class MeteredWindowStoreTest {
doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
}
+ @SuppressWarnings("unchecked")
private void doShouldPassChangelogTopicNameToStateStoreSerde(final String
topic) {
- final Serde<String> keySerde = niceMock(Serde.class);
+ final Serde<String> keySerde = mock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
- final Serde<String> valueSerde = niceMock(Serde.class);
+ final Serde<String> valueSerde = mock(Serde.class);
final Deserializer<String> valueDeserializer =
mock(Deserializer.class);
final Serializer<String> valueSerializer = mock(Serializer.class);
- expect(keySerde.serializer()).andStubReturn(keySerializer);
- expect(keySerializer.serialize(topic,
KEY)).andStubReturn(KEY.getBytes());
- expect(valueSerde.deserializer()).andStubReturn(valueDeserializer);
- expect(valueDeserializer.deserialize(topic,
VALUE_BYTES)).andStubReturn(VALUE);
- expect(valueSerde.serializer()).andStubReturn(valueSerializer);
- expect(valueSerializer.serialize(topic,
VALUE)).andStubReturn(VALUE_BYTES);
- expect(innerStoreMock.fetch(KEY_BYTES,
TIMESTAMP)).andStubReturn(VALUE_BYTES);
- replay(innerStoreMock, keySerializer, keySerde, valueDeserializer,
valueSerializer, valueSerde);
+ when(keySerde.serializer()).thenReturn(keySerializer);
+ when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes());
+ when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+ when(valueDeserializer.deserialize(topic,
VALUE_BYTES)).thenReturn(VALUE);
+ when(valueSerde.serializer()).thenReturn(valueSerializer);
+ when(valueSerializer.serialize(topic, VALUE)).thenReturn(VALUE_BYTES);
+ when(innerStoreMock.fetch(KEY_BYTES,
TIMESTAMP)).thenReturn(VALUE_BYTES);
store = new MeteredWindowStore<>(
innerStoreMock,
WINDOW_SIZE_MS,
@@ -203,13 +196,10 @@ public class MeteredWindowStoreTest {
store.fetch(KEY, TIMESTAMP);
store.put(KEY, VALUE, TIMESTAMP);
-
- verify(keySerializer, valueDeserializer, valueSerializer);
}
@Test
public void testMetrics() {
- replay(innerStoreMock);
store.init((StateStoreContext) context, store);
final JmxReporter reporter = new JmxReporter();
final MetricsContext metricsContext = new
KafkaMetricsContext("kafka.streams");
@@ -229,22 +219,19 @@ public class MeteredWindowStoreTest {
@Test
public void shouldRecordRestoreLatencyOnInit() {
- innerStoreMock.init((StateStoreContext) context, store);
- replay(innerStoreMock);
+ doNothing().when(innerStoreMock).init((StateStoreContext) context,
store);
store.init((StateStoreContext) context, store);
// it suffices to verify one restore metric since all restore metrics
are recorded by the same sensor
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("restore-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
- verify(innerStoreMock);
}
@Test
public void shouldPutToInnerStoreAndRecordPutMetrics() {
final byte[] bytes = "a".getBytes();
- innerStoreMock.put(eq(Bytes.wrap(bytes)), anyObject(),
eq(context.timestamp()));
- replay(innerStoreMock);
+ doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(),
eq(context.timestamp()));
store.init((StateStoreContext) context, store);
store.put("a", "a", context.timestamp());
@@ -253,14 +240,12 @@ public class MeteredWindowStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("put-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
- verify(innerStoreMock);
}
@Test
public void shouldFetchFromInnerStoreAndRecordFetchMetrics() {
- expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1))
- .andReturn(KeyValueIterators.emptyWindowStoreIterator());
- replay(innerStoreMock);
+ when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1))
+ .thenReturn(KeyValueIterators.emptyWindowStoreIterator());
store.init((StateStoreContext) context, store);
store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); //
recorded on close;
@@ -269,32 +254,27 @@ public class MeteredWindowStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
- verify(innerStoreMock);
}
@Test
public void shouldReturnNoRecordWhenFetchedKeyHasExpired() {
- expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 +
RETENTION_PERIOD))
- .andReturn(KeyValueIterators.emptyWindowStoreIterator());
- replay(innerStoreMock);
+ when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1 +
RETENTION_PERIOD))
+ .thenReturn(KeyValueIterators.emptyWindowStoreIterator());
store.init((StateStoreContext) context, store);
store.fetch("a", ofEpochMilli(1),
ofEpochMilli(1).plus(RETENTION_PERIOD, ChronoUnit.MILLIS)).close(); // recorded
on close;
-
- verify(innerStoreMock);
}
@Test
public void shouldFetchRangeFromInnerStoreAndRecordFetchMetrics() {
- expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()),
Bytes.wrap("b".getBytes()), 1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- expect(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- expect(innerStoreMock.fetch(null, null, 1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- replay(innerStoreMock);
+ when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()),
Bytes.wrap("b".getBytes()), 1, 1))
+ .thenReturn(KeyValueIterators.emptyIterator());
+ when(innerStoreMock.fetch(null, Bytes.wrap("b".getBytes()), 1, 1))
+ .thenReturn(KeyValueIterators.emptyIterator());
+ when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), null, 1, 1))
+ .thenReturn(KeyValueIterators.emptyIterator());
+ when(innerStoreMock.fetch(null, null, 1, 1))
+ .thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); //
recorded on close;
@@ -306,14 +286,12 @@ public class MeteredWindowStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
- verify(innerStoreMock);
}
@Test
public void shouldBackwardFetchFromInnerStoreAndRecordFetchMetrics() {
- expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()),
Bytes.wrap("b".getBytes()), 1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- replay(innerStoreMock);
+ when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()),
Bytes.wrap("b".getBytes()), 1, 1))
+ .thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.backwardFetch("a", "b", ofEpochMilli(1),
ofEpochMilli(1)).close(); // recorded on close;
@@ -322,20 +300,18 @@ public class MeteredWindowStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
- verify(innerStoreMock);
}
@Test
public void shouldBackwardFetchRangeFromInnerStoreAndRecordFetchMetrics() {
- expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()),
Bytes.wrap("b".getBytes()), 1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- expect(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()),
1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- expect(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null,
1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- expect(innerStoreMock.backwardFetch(null, null, 1, 1))
- .andReturn(KeyValueIterators.emptyIterator());
- replay(innerStoreMock);
+ when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()),
Bytes.wrap("b".getBytes()), 1, 1))
+ .thenReturn(KeyValueIterators.emptyIterator());
+ when(innerStoreMock.backwardFetch(null, Bytes.wrap("b".getBytes()), 1,
1))
+ .thenReturn(KeyValueIterators.emptyIterator());
+ when(innerStoreMock.backwardFetch(Bytes.wrap("a".getBytes()), null, 1,
1))
+ .thenReturn(KeyValueIterators.emptyIterator());
+ when(innerStoreMock.backwardFetch(null, null, 1, 1))
+ .thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.backwardFetch("a", "b", ofEpochMilli(1),
ofEpochMilli(1)).close(); // recorded on close;
@@ -347,13 +323,11 @@ public class MeteredWindowStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
- verify(innerStoreMock);
}
@Test
public void shouldFetchAllFromInnerStoreAndRecordFetchMetrics() {
- expect(innerStoreMock.fetchAll(1,
1)).andReturn(KeyValueIterators.emptyIterator());
- replay(innerStoreMock);
+ when(innerStoreMock.fetchAll(1,
1)).thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.fetchAll(ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded
on close;
@@ -362,13 +336,11 @@ public class MeteredWindowStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
- verify(innerStoreMock);
}
@Test
public void shouldBackwardFetchAllFromInnerStoreAndRecordFetchMetrics() {
- expect(innerStoreMock.backwardFetchAll(1,
1)).andReturn(KeyValueIterators.emptyIterator());
- replay(innerStoreMock);
+ when(innerStoreMock.backwardFetchAll(1,
1)).thenReturn(KeyValueIterators.emptyIterator());
store.init((StateStoreContext) context, store);
store.backwardFetchAll(ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
@@ -377,13 +349,11 @@ public class MeteredWindowStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("fetch-rate");
assertThat((Double) metric.metricValue(), greaterThan(0.0));
- verify(innerStoreMock);
}
@Test
public void shouldRecordFlushLatency() {
- innerStoreMock.flush();
- replay(innerStoreMock);
+ doNothing().when(innerStoreMock).flush();
store.init((StateStoreContext) context, store);
store.flush();
@@ -392,13 +362,11 @@ public class MeteredWindowStoreTest {
// and the sensor is tested elsewhere
final KafkaMetric metric = metric("flush-rate");
assertTrue((Double) metric.metricValue() > 0);
- verify(innerStoreMock);
}
@Test
public void shouldNotThrowNullPointerExceptionIfFetchReturnsNull() {
- expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()),
0)).andReturn(null);
- replay(innerStoreMock);
+ when(innerStoreMock.fetch(Bytes.wrap("a".getBytes()),
0)).thenReturn(null);
store.init((StateStoreContext) context, store);
assertNull(store.fetch("a", 0));
@@ -412,8 +380,7 @@ public class MeteredWindowStoreTest {
public void shouldSetFlushListenerOnWrappedCachingStore() {
final CachedWindowStore cachedWindowStore =
mock(CachedWindowStore.class);
- expect(cachedWindowStore.setFlushListener(anyObject(CacheFlushListener.class), eq(false))).andReturn(true);
- replay(cachedWindowStore);
+ when(cachedWindowStore.setFlushListener(any(CacheFlushListener.class),
eq(false))).thenReturn(true);
final MeteredWindowStore<String, String> metered = new
MeteredWindowStore<>(
cachedWindowStore,
@@ -424,8 +391,6 @@ public class MeteredWindowStoreTest {
new SerdeThatDoesntHandleNull()
);
assertTrue(metered.setFlushListener(null, false));
-
- verify(cachedWindowStore);
}
@Test
@@ -435,40 +400,31 @@ public class MeteredWindowStoreTest {
@Test
public void shouldCloseUnderlyingStore() {
- innerStoreMock.close();
- expectLastCall();
- replay(innerStoreMock);
+ doNothing().when(innerStoreMock).close();
store.init((StateStoreContext) context, store);
store.close();
- verify(innerStoreMock);
}
@Test
public void shouldRemoveMetricsOnClose() {
- innerStoreMock.close();
- expectLastCall();
- replay(innerStoreMock);
+ doNothing().when(innerStoreMock).close();
store.init((StateStoreContext) context, store);
assertThat(storeMetrics(), not(empty()));
store.close();
assertThat(storeMetrics(), empty());
- verify(innerStoreMock);
}
@Test
public void shouldRemoveMetricsEvenIfWrappedStoreThrowsOnClose() {
- innerStoreMock.close();
- expectLastCall().andThrow(new RuntimeException("Oops!"));
- replay(innerStoreMock);
+ doThrow(new RuntimeException("Oops!")).when(innerStoreMock).close();
store.init((StateStoreContext) context, store);
// There's always a "count" metric registered
assertThat(storeMetrics(), not(empty()));
assertThrows(RuntimeException.class, store::close);
assertThat(storeMetrics(), empty());
- verify(innerStoreMock);
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
index ffb5ab3c53d..d2b69e97415 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
@@ -21,20 +21,18 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.easymock.EasyMockRunner;
-import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
-@RunWith(EasyMockRunner.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ReadOnlyKeyValueStoreFacadeTest {
@Mock
private TimestampedKeyValueStore<String, String>
mockedKeyValueTimestampStore;
@@ -50,66 +48,56 @@ public class ReadOnlyKeyValueStoreFacadeTest {
@Test
public void shouldReturnPlainValueOnGet() {
- expect(mockedKeyValueTimestampStore.get("key"))
- .andReturn(ValueAndTimestamp.make("value", 42L));
- expect(mockedKeyValueTimestampStore.get("unknownKey"))
- .andReturn(null);
- replay(mockedKeyValueTimestampStore);
+ when(mockedKeyValueTimestampStore.get("key"))
+ .thenReturn(ValueAndTimestamp.make("value", 42L));
+ when(mockedKeyValueTimestampStore.get("unknownKey"))
+ .thenReturn(null);
assertThat(readOnlyKeyValueStoreFacade.get("key"), is("value"));
assertNull(readOnlyKeyValueStoreFacade.get("unknownKey"));
- verify(mockedKeyValueTimestampStore);
}
@Test
public void shouldReturnPlainKeyValuePairsForRangeIterator() {
- expect(mockedKeyValueTimestampIterator.next())
- .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
- .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
- expect(mockedKeyValueTimestampStore.range("key1",
"key2")).andReturn(mockedKeyValueTimestampIterator);
- replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+ when(mockedKeyValueTimestampIterator.next())
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
+ when(mockedKeyValueTimestampStore.range("key1",
"key2")).thenReturn(mockedKeyValueTimestampIterator);
final KeyValueIterator<String, String> iterator =
readOnlyKeyValueStoreFacade.range("key1", "key2");
assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
- verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
}
@Test
public void shouldReturnPlainKeyValuePairsForPrefixScan() {
final StringSerializer stringSerializer = new StringSerializer();
- expect(mockedKeyValueTimestampIterator.next())
- .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
- .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
- expect(mockedKeyValueTimestampStore.prefixScan("key",
stringSerializer)).andReturn(mockedKeyValueTimestampIterator);
- replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+ when(mockedKeyValueTimestampIterator.next())
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
+ when(mockedKeyValueTimestampStore.prefixScan("key",
stringSerializer)).thenReturn(mockedKeyValueTimestampIterator);
final KeyValueIterator<String, String> iterator =
readOnlyKeyValueStoreFacade.prefixScan("key", stringSerializer);
assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
- verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
}
@Test
public void shouldReturnPlainKeyValuePairsForAllIterator() {
- expect(mockedKeyValueTimestampIterator.next())
- .andReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
- .andReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
- expect(mockedKeyValueTimestampStore.all()).andReturn(mockedKeyValueTimestampIterator);
- replay(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
+ when(mockedKeyValueTimestampIterator.next())
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
+ when(mockedKeyValueTimestampStore.all()).thenReturn(mockedKeyValueTimestampIterator);
final KeyValueIterator<String, String> iterator =
readOnlyKeyValueStoreFacade.all();
assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
- verify(mockedKeyValueTimestampIterator, mockedKeyValueTimestampStore);
}
@Test
public void shouldForwardApproximateNumEntries() {
- expect(mockedKeyValueTimestampStore.approximateNumEntries()).andReturn(42L);
- replay(mockedKeyValueTimestampStore);
+ when(mockedKeyValueTimestampStore.approximateNumEntries()).thenReturn(42L);
assertThat(readOnlyKeyValueStoreFacade.approximateNumEntries(),
is(42L));
- verify(mockedKeyValueTimestampStore);
}
}