This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0fe902bed23 KAFKA-20497: Add readOnly(IsolationLevel) to
MeteredKeyValueStore (#22316)
0fe902bed23 is described below
commit 0fe902bed238eb9b11d6ef36724cd57ccb69dbd0
Author: Nick Telford <[email protected]>
AuthorDate: Tue Jun 2 23:38:13 2026 +0100
KAFKA-20497: Add readOnly(IsolationLevel) to MeteredKeyValueStore (#22316)
The metered layer must wrap both isolation levels rather than delegating
the inner store's readOnly view directly: it holds the serde context
needed to deserialise keys and values, and it owns the sensor
registration for iterator metrics. The ReadOnlyView inner class applies
the same key/value deserialisation and sensor instrumentation as the
public fetch methods so that IQ operations through a readOnly view are
measured identically to direct queries.
KAFKA-20497
Reviewers: Bill Bejeck <[email protected]>
---
.../state/internals/MeteredKeyValueStore.java | 113 +++++++++++++---
...MeteredTimestampedKeyValueStoreWithHeaders.java | 125 +++++++++++++++---
.../state/internals/MeteredKeyValueStoreTest.java | 147 +++++++++++++++++++++
3 files changed, 350 insertions(+), 35 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 673f887779c..327ba3ff69a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
@@ -44,6 +45,7 @@ import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
@@ -337,8 +339,12 @@ public class MeteredKeyValueStore<K, V>
@Override
public V get(final K key) {
Objects.requireNonNull(key, "key cannot be null");
+ return getInternal(wrapped(), key);
+ }
+
+ private V getInternal(final ReadOnlyKeyValueStore<Bytes, byte[]> store,
final K key) {
try {
- return maybeMeasureLatency(() ->
deserializeValue(wrapped().get(serializeKey(key))), time, getSensor);
+ return maybeMeasureLatency(() ->
deserializeValue(store.get(serializeKey(key))), time, getSensor);
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
throw new ProcessorStateException(message, e);
@@ -392,35 +398,57 @@ public class MeteredKeyValueStore<K, V>
public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix, final PS prefixKeySerializer) {
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
- return new MeteredKeyValueStoreIterator(wrapped().prefixScan(prefix,
prefixKeySerializer), prefixScanSensor);
+ return prefixScanInternal(wrapped(), prefix, prefixKeySerializer);
+ }
+
+ private <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScanInternal(
+ final ReadOnlyKeyValueStore<Bytes, byte[]> store, final P prefix,
final PS prefixKeySerializer
+ ) {
+ return meteredKeyValueIterator(store.prefixScan(prefix,
prefixKeySerializer), prefixScanSensor);
}
@Override
- public KeyValueIterator<K, V> range(final K from,
- final K to) {
- return new MeteredKeyValueStoreIterator(
- wrapped().range(serializeKey(from), serializeKey(to)),
- rangeSensor
- );
+ public KeyValueIterator<K, V> range(final K from, final K to) {
+ return rangeInternal(wrapped(), from, to);
+ }
+
+ private KeyValueIterator<K, V> rangeInternal(
+ final ReadOnlyKeyValueStore<Bytes, byte[]> store, final K from, final
K to
+ ) {
+ return meteredKeyValueIterator(store.range(serializeKey(from),
serializeKey(to)), rangeSensor);
}
@Override
- public KeyValueIterator<K, V> reverseRange(final K from,
- final K to) {
- return new MeteredKeyValueStoreIterator(
- wrapped().reverseRange(serializeKey(from), serializeKey(to)),
- rangeSensor
- );
+ public KeyValueIterator<K, V> reverseRange(final K from, final K to) {
+ return reverseRangeInternal(wrapped(), from, to);
+ }
+
+ private KeyValueIterator<K, V> reverseRangeInternal(
+ final ReadOnlyKeyValueStore<Bytes, byte[]> store, final K from, final
K to
+ ) {
+ return meteredKeyValueIterator(store.reverseRange(serializeKey(from),
serializeKey(to)), rangeSensor);
}
@Override
public KeyValueIterator<K, V> all() {
- return new MeteredKeyValueStoreIterator(wrapped().all(), allSensor);
+ return allInternal(wrapped());
+ }
+
+ private KeyValueIterator<K, V> allInternal(final
ReadOnlyKeyValueStore<Bytes, byte[]> store) {
+ return meteredKeyValueIterator(store.all(), allSensor);
}
@Override
public KeyValueIterator<K, V> reverseAll() {
- return new MeteredKeyValueStoreIterator(wrapped().reverseAll(),
allSensor);
+ return reverseAllInternal(wrapped());
+ }
+
+ private KeyValueIterator<K, V> reverseAllInternal(final
ReadOnlyKeyValueStore<Bytes, byte[]> store) {
+ return meteredKeyValueIterator(store.reverseAll(), allSensor);
+ }
+
+ private KeyValueIterator<K, V> meteredKeyValueIterator(final
KeyValueIterator<Bytes, byte[]> iter, final Sensor sensor) {
+ return new MeteredKeyValueStoreIterator(iter, sensor);
}
@Override
@@ -433,6 +461,59 @@ public class MeteredKeyValueStore<K, V>
return wrapped().approximateNumEntries();
}
+ @Override
+ public ReadOnlyKeyValueStore<K, V> readOnly(final IsolationLevel
isolationLevel) {
+ Objects.requireNonNull(isolationLevel, "isolationLevel cannot be
null");
+ return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+ }
+
+ private final class ReadOnlyView implements ReadOnlyKeyValueStore<K, V> {
+
+ private final ReadOnlyKeyValueStore<Bytes, byte[]> underlying;
+
+ ReadOnlyView(final ReadOnlyKeyValueStore<Bytes, byte[]> underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public V get(final K key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return getInternal(underlying, key);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(final K from, final K to) {
+ return rangeInternal(underlying, from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> reverseRange(final K from, final K to) {
+ return reverseRangeInternal(underlying, from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return allInternal(underlying);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> reverseAll() {
+ return reverseAllInternal(underlying);
+ }
+
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix, final PS prefixKeySerializer) {
+ Objects.requireNonNull(prefix, "prefix cannot be null");
+ Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
+ return prefixScanInternal(underlying, prefix, prefixKeySerializer);
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return underlying.approximateNumEntries();
+ }
+ }
+
@Override
public void close() {
try {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index ebdcc8f73c5..8d10c6ccd25 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
@@ -39,6 +40,7 @@ import org.apache.kafka.streams.query.TimestampedRangeQuery;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -113,8 +115,16 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
@Override
public ValueTimestampHeaders<V> get(final K key) {
Objects.requireNonNull(key, "key cannot be null");
+ return getInternal(wrapped(), key);
+ }
+
+ private ValueTimestampHeaders<V> getInternal(final
ReadOnlyKeyValueStore<Bytes, byte[]> store, final K key) {
try {
- return maybeMeasureLatency(() ->
deserializeValue(wrapped().get(serializeKey(key, internalContext.headers()))),
time, getSensor);
+ return maybeMeasureLatency(
+ () -> deserializeValue(store.get(serializeKey(key,
internalContext.headers()))),
+ time,
+ getSensor
+ );
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
throw new ProcessorStateException(message, e);
@@ -463,21 +473,32 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
}
@Override
- public <PS extends Serializer<P>, P> KeyValueIterator<K,
ValueTimestampHeaders<V>> prefixScan(final P prefix,
-
final PS prefixKeySerializer) {
+ public <PS extends Serializer<P>, P> KeyValueIterator<K,
ValueTimestampHeaders<V>> prefixScan(
+ final P prefix, final PS prefixKeySerializer
+ ) {
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
+ return prefixScanInternal(wrapped(), prefix, prefixKeySerializer);
+ }
+
+ private <PS extends Serializer<P>, P> KeyValueIterator<K,
ValueTimestampHeaders<V>> prefixScanInternal(
+ final ReadOnlyKeyValueStore<Bytes, byte[]> store, final P prefix,
final PS prefixKeySerializer
+ ) {
return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
- wrapped().prefixScan(prefix, prefixKeySerializer),
- prefixScanSensor
+ store.prefixScan(prefix, prefixKeySerializer), prefixScanSensor
);
}
@Override
- public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from,
- final K to) {
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from,
final K to) {
+ return rangeInternal(wrapped(), from, to);
+ }
+
+ private KeyValueIterator<K, ValueTimestampHeaders<V>> rangeInternal(
+ final ReadOnlyKeyValueStore<Bytes, byte[]> store, final K from, final
K to
+ ) {
return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
- wrapped().range(
+ store.range(
serializeKey(from, internalContext.headers()),
serializeKey(to, internalContext.headers())
),
@@ -486,10 +507,15 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
}
@Override
- public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K
from,
- final K
to) {
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K
from, final K to) {
+ return reverseRangeInternal(wrapped(), from, to);
+ }
+
+ private KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRangeInternal(
+ final ReadOnlyKeyValueStore<Bytes, byte[]> store, final K from, final
K to
+ ) {
return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
- wrapped().reverseRange(
+ store.reverseRange(
serializeKey(from, internalContext.headers()),
serializeKey(to, internalContext.headers())
),
@@ -499,18 +525,24 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
@Override
public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
- return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
- wrapped().all(),
- allSensor
- );
+ return allInternal(wrapped());
+ }
+
+ private KeyValueIterator<K, ValueTimestampHeaders<V>> allInternal(
+ final ReadOnlyKeyValueStore<Bytes, byte[]> store
+ ) {
+ return new
MeteredTimestampedKeyValueStoreWithHeadersIterator(store.all(), allSensor);
}
@Override
public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
- return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
- wrapped().reverseAll(),
- allSensor
- );
+ return reverseAllInternal(wrapped());
+ }
+
+ private KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAllInternal(
+ final ReadOnlyKeyValueStore<Bytes, byte[]> store
+ ) {
+ return new
MeteredTimestampedKeyValueStoreWithHeadersIterator(store.reverseAll(),
allSensor);
}
@SuppressWarnings("unchecked")
@@ -659,6 +691,61 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
}
}
+ @Override
+ public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> readOnly(final
IsolationLevel isolationLevel) {
+ Objects.requireNonNull(isolationLevel, "isolationLevel cannot be
null");
+ return new ReadOnlyHeadersView(wrapped().readOnly(isolationLevel));
+ }
+
+ private final class ReadOnlyHeadersView implements
ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> {
+
+ private final ReadOnlyKeyValueStore<Bytes, byte[]> underlying;
+
+ ReadOnlyHeadersView(final ReadOnlyKeyValueStore<Bytes, byte[]>
underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public ValueTimestampHeaders<V> get(final K key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return getInternal(underlying, key);
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K
from, final K to) {
+ return rangeInternal(underlying, from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>>
reverseRange(final K from, final K to) {
+ return reverseRangeInternal(underlying, from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
+ return allInternal(underlying);
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
+ return reverseAllInternal(underlying);
+ }
+
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K,
ValueTimestampHeaders<V>> prefixScan(
+ final P prefix, final PS prefixKeySerializer
+ ) {
+ Objects.requireNonNull(prefix, "prefix cannot be null");
+ Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
+ return prefixScanInternal(underlying, prefix, prefixKeySerializer);
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return underlying.approximateNumEntries();
+ }
+ }
+
@Override
protected Bytes serializeKey(final K key) {
throw new
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders
required to pass in Headers when serializing a key.");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 2a2a445502f..d5d65e56be5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -44,6 +45,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.apache.kafka.test.MockRecordCollector;
@@ -636,6 +638,151 @@ public class MeteredKeyValueStoreTest {
assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewGetApplySerdesAndRecordGetMetric() {
+ setUp();
+ final ReadOnlyKeyValueStore<Bytes, byte[]> innerView =
mock(ReadOnlyKeyValueStore.class);
+
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.get(KEY_BYTES)).thenReturn(VALUE_BYTES);
+ init();
+
+ final ReadOnlyKeyValueStore<String, String> view =
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertThat(view.get(KEY), equalTo(VALUE));
+
+ assertTrue((Double) metric("get-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewRangeApplySerdesAndRecordRangeMetric() {
+ setUp();
+ final ReadOnlyKeyValueStore<Bytes, byte[]> innerView =
mock(ReadOnlyKeyValueStore.class);
+
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.range(KEY_BYTES, KEY_BYTES))
+ .thenReturn(new
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+ init();
+
+ final ReadOnlyKeyValueStore<String, String> view =
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<String, String> it = view.range(KEY, KEY))
{
+ assertThat(it.next().value, equalTo(VALUE));
+ assertFalse(it.hasNext());
+ }
+
+ assertTrue((Double) metric("range-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
shouldReadOnlyViewReverseRangeApplySerdesAndRecordRangeMetric() {
+ setUp();
+ final ReadOnlyKeyValueStore<Bytes, byte[]> innerView =
mock(ReadOnlyKeyValueStore.class);
+
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.reverseRange(KEY_BYTES, KEY_BYTES))
+ .thenReturn(new
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+ init();
+
+ final ReadOnlyKeyValueStore<String, String> view =
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<String, String> it =
view.reverseRange(KEY, KEY)) {
+ assertThat(it.next().value, equalTo(VALUE));
+ assertFalse(it.hasNext());
+ }
+
+ assertTrue((Double) metric("range-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewAllApplySerdesAndRecordAllMetric() {
+ setUp();
+ final ReadOnlyKeyValueStore<Bytes, byte[]> innerView =
mock(ReadOnlyKeyValueStore.class);
+
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.all())
+ .thenReturn(new
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+ init();
+
+ final ReadOnlyKeyValueStore<String, String> view =
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<String, String> it = view.all()) {
+ assertThat(it.next().value, equalTo(VALUE));
+ assertFalse(it.hasNext());
+ }
+
+ assertTrue((Double) metric(new MetricName("all-rate",
STORE_LEVEL_GROUP, "", tags)).metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewReverseAllApplySerdesAndRecordAllMetric() {
+ setUp();
+ final ReadOnlyKeyValueStore<Bytes, byte[]> innerView =
mock(ReadOnlyKeyValueStore.class);
+
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.reverseAll())
+ .thenReturn(new
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+ init();
+
+ final ReadOnlyKeyValueStore<String, String> view =
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<String, String> it = view.reverseAll()) {
+ assertThat(it.next().value, equalTo(VALUE));
+ assertFalse(it.hasNext());
+ }
+
+ assertTrue((Double) metric(new MetricName("all-rate",
STORE_LEVEL_GROUP, "", tags)).metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
shouldReadOnlyViewPrefixScanApplySerdesAndRecordPrefixScanMetric() {
+ setUp();
+ final ReadOnlyKeyValueStore<Bytes, byte[]> innerView =
mock(ReadOnlyKeyValueStore.class);
+ final StringSerializer stringSerializer = new StringSerializer();
+
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.prefixScan(KEY, stringSerializer))
+ .thenReturn(new
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+ init();
+
+ final ReadOnlyKeyValueStore<String, String> view =
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<String, String> it = view.prefixScan(KEY,
stringSerializer)) {
+ assertThat(it.next().value, equalTo(VALUE));
+ assertFalse(it.hasNext());
+ }
+
+ assertTrue((Double) metrics.metric(new MetricName("prefix-scan-rate",
STORE_LEVEL_GROUP, "", tags)).metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewApproximateNumEntriesDelegatesToUnderlying()
{
+ setUp();
+ final ReadOnlyKeyValueStore<Bytes, byte[]> innerView =
mock(ReadOnlyKeyValueStore.class);
+
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.approximateNumEntries()).thenReturn(42L);
+ init();
+
+ final ReadOnlyKeyValueStore<String, String> view =
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertThat(view.approximateNumEntries(), equalTo(42L));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldPassReadCommittedThroughToInner() {
+ setUp();
+ final ReadOnlyKeyValueStore<Bytes, byte[]> innerView =
mock(ReadOnlyKeyValueStore.class);
+
when(inner.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(innerView);
+ init();
+
+ metered.readOnly(IsolationLevel.READ_COMMITTED);
+
+ verify(inner).readOnly(IsolationLevel.READ_COMMITTED);
+ }
+
+ @Test
+ public void shouldThrowNpeOnNullIsolationLevel() {
+ setUp();
+ init();
+
+ assertThrows(NullPointerException.class, () -> metered.readOnly(null));
+ }
+
private KafkaMetric metric(final MetricName metricName) {
return this.metrics.metric(metricName);
}