This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new b3b0ed6 KAFKA-7916: Unify store wrapping code for clarity (#6255) b3b0ed6 is described below commit b3b0ed637238375739c76fc5237f8c8414453875 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Thu Feb 14 10:38:01 2019 -0600 KAFKA-7916: Unify store wrapping code for clarity (#6255) Refactor internal store wrapping for improved maintainability. Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <guozh...@confluent.io> --- .../streams/kstream/internals/TupleForwarder.java | 4 +- .../processor/internals/AbstractStateManager.java | 9 ++ .../internals/GlobalStateManagerImpl.java | 10 +- .../processor/internals/ProcessorContextImpl.java | 44 ++++----- .../processor/internals/ProcessorStateManager.java | 7 +- .../state/internals/CachingKeyValueStore.java | 50 +++------- .../state/internals/CachingSessionStore.java | 30 +++--- .../state/internals/CachingWindowStore.java | 32 +++--- .../internals/ChangeLoggingKeyValueBytesStore.java | 28 +++--- .../internals/ChangeLoggingSessionBytesStore.java | 20 ++-- .../internals/ChangeLoggingWindowBytesStore.java | 22 ++--- .../state/internals/MeteredKeyValueStore.java | 38 ++++---- .../state/internals/MeteredSessionStore.java | 18 ++-- .../state/internals/MeteredWindowStore.java | 20 ++-- .../streams/state/internals/RecordConverter.java | 28 ------ .../streams/state/internals/RecordConverters.java | 60 ++++++++++++ .../state/internals/RocksDBSessionStore.java | 18 ++-- .../state/internals/RocksDBWindowStore.java | 20 ++-- .../streams/state/internals/WrappedStateStore.java | 105 +++++++++----------- .../org/apache/kafka/streams/KafkaStreamsTest.java | 2 +- .../internals/GlobalStateManagerImplTest.java | 108 ++++----------------- .../internals/KeyValueStoreMaterializerTest.java | 15 ++- .../processor/internals/StateRestorerTest.java | 5 +- .../internals/StoreChangelogReaderTest.java | 61 ++++++------ .../state/internals/CachingKeyValueStoreTest.java | 5 +- .../state/internals/KeyValueStoreBuilderTest.java | 16 +-- .../state/internals/SessionStoreBuilderTest.java | 16 +-- .../state/internals/WindowStoreBuilderTest.java | 16 +-- 28 files changed, 347 insertions(+), 460 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java index 127057f..0862e47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TupleForwarder.java @@ -49,10 +49,10 @@ class TupleForwarder<K, V> { if (store instanceof CachedStateStore) { return (CachedStateStore) store; } else if (store instanceof WrappedStateStore) { - StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + StateStore wrapped = ((WrappedStateStore) store).wrapped(); while (wrapped instanceof WrappedStateStore && !(wrapped instanceof CachedStateStore)) { - wrapped = ((WrappedStateStore) wrapped).wrappedStore(); + wrapped = ((WrappedStateStore) wrapped).wrapped(); } if (!(wrapped instanceof CachedStateStore)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java index 66ddec9..36482aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; +import org.apache.kafka.streams.state.internals.RecordConverter; import org.slf4j.Logger; import java.io.File; @@ -32,6 +33,10 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import static org.apache.kafka.streams.state.internals.RecordConverters.identity; +import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue; +import static org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped; + abstract class AbstractStateManager implements StateManager { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; @@ -50,6 +55,10 @@ abstract class AbstractStateManager implements StateManager { this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); } + static RecordConverter converterForStore(final StateStore store) { + return isTimestamped(store) ? rawValueToTimestampedValue() : identity(); + } + public void reinitializeStateStoresForPartitions(final Logger log, final Map<String, StateStore> stateStores, final Map<String, String> storeToChangelogTopic, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index efedbec..48319a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -32,9 +32,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.state.internals.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.RecordConverter; -import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.slf4j.Logger; import java.io.File; @@ -198,17 +196,13 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob } } try { - final StateStore stateStore = - store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store; - final RecordConverter recordConverter = - stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record; - restoreState( stateRestoreCallback, topicPartitions, highWatermarks, store.name(), - recordConverter); + converterForStore(store) + ); globalStores.put(store.name(), store); } finally { globalConsumer.unsubscribe(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 4409a95..4bf66e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -35,7 +35,7 @@ import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.streams.state.internals.ThreadCache; -import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore; +import org.apache.kafka.streams.state.internals.WrappedStateStore; import java.time.Duration; import java.util.List; @@ -214,18 +214,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re return streamTimeSupplier.get(); } - private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends AbstractStateStore { + private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends WrappedStateStore<T> { static final String ERROR_MESSAGE = "Global store is read only"; private StateStoreReadOnlyDecorator(final T inner) { super(inner); } - @SuppressWarnings("unchecked") - T getInner() { - return (T) wrappedStore(); - } - @Override public void flush() { throw new UnsupportedOperationException(ERROR_MESSAGE); @@ -253,23 +248,23 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @Override public V get(final K key) { - return getInner().get(key); + return wrapped().get(key); } @Override public KeyValueIterator<K, V> range(final K from, final K to) { - return getInner().range(from, to); + return wrapped().range(from, to); } @Override public KeyValueIterator<K, V> all() { - return getInner().all(); + return wrapped().all(); } @Override public long approximateNumEntries() { - return getInner().approximateNumEntries(); + return wrapped().approximateNumEntries(); } @Override @@ -319,7 +314,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @Override public V fetch(final K key, final long time) { - return getInner().fetch(key, time); + return wrapped().fetch(key, time); } @Deprecated @@ -327,7 +322,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { - return getInner().fetch(key, timeFrom, timeTo); + return wrapped().fetch(key, timeFrom, timeTo); } @Deprecated @@ -336,19 +331,19 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final K to, final long timeFrom, final long timeTo) { - return getInner().fetch(from, to, timeFrom, timeTo); + return wrapped().fetch(from, to, timeFrom, timeTo); } @Override public KeyValueIterator<Windowed<K>, V> all() { - return getInner().all(); + return wrapped().all(); } @Deprecated @Override public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { - return getInner().fetchAll(timeFrom, timeTo); + return wrapped().fetchAll(timeFrom, timeTo); } } @@ -364,7 +359,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - return getInner().findSessions(key, earliestSessionEndTime, latestSessionStartTime); + return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -372,7 +367,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - return getInner().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); } @Override @@ -388,33 +383,28 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @Override public AGG fetchSession(final K key, final long startTime, final long endTime) { - return getInner().fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, startTime, endTime); } @Override public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) { - return getInner().fetch(key); + return wrapped().fetch(key); } @Override public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) { - return getInner().fetch(from, to); + return wrapped().fetch(from, to); } } - private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends AbstractStateStore { + private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends WrappedStateStore<T> { static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams"; private StateStoreReadWriteDecorator(final T inner) { super(inner); } - @SuppressWarnings("unchecked") - T wrapped() { - return (T) super.wrappedStore(); - } - @Override public void init(final ProcessorContext context, final StateStore root) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 468d3d6..f75f185 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -23,10 +23,8 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.state.internals.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.RecordConverter; -import org.apache.kafka.streams.state.internals.WrappedStateStore; import org.slf4j.Logger; import java.io.File; @@ -134,10 +132,7 @@ public class ProcessorStateManager extends AbstractStateManager { final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic)); - final StateStore stateStore = - store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store; - final RecordConverter recordConverter = - stateStore instanceof TimestampedBytesStore ? RecordConverter.converter() : record -> record; + final RecordConverter recordConverter = converterForStore(store); if (isStandby) { log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", storeName, topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 992466c..4ccb2a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -34,9 +34,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]>, CachedStateStore<K, V> { +class CachingKeyValueStore<K, V> extends WrappedStateStore<KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]>, CachedStateStore<K, V> { - private final KeyValueStore<Bytes, byte[]> underlying; private final Serde<K> keySerde; private final Serde<V> valueSerde; private CacheFlushListener<K, V> flushListener; @@ -52,7 +51,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im final Serde<K> keySerde, final Serde<V> valueSerde) { super(underlying); - this.underlying = underlying; this.keySerde = keySerde; this.valueSerde = valueSerde; } @@ -61,7 +59,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im public void init(final ProcessorContext context, final StateStore root) { initInternal(context); - underlying.init(context, root); + super.init(context, root); // save the stream thread as we only ever want to trigger a flush // when the stream thread is the current thread. streamThread = Thread.currentThread(); @@ -70,12 +68,12 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im @SuppressWarnings("unchecked") private void initInternal(final ProcessorContext context) { this.context = (InternalProcessorContext) context; - this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()), + this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); this.cache = this.context.getCache(); - this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name()); + this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name()); cache.addDirtyEntryFlushListener(cacheName, entries -> { for (final ThreadCache.DirtyEntry entry : entries) { putAndMaybeForward(entry, (InternalProcessorContext) context); @@ -87,7 +85,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im final InternalProcessorContext context) { if (flushListener != null) { final byte[] newValueBytes = entry.newValue(); - final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? underlying.get(entry.key()) : null; + final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? wrapped().get(entry.key()) : null; // this is an optimization: if this key did not exist in underlying store and also not in the cache, // we can skip flushing to downstream as well as writing to underlying store @@ -96,7 +94,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null; final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null; // we need to get the old values if needed, and then put to store, and then flush - underlying.put(entry.key(), entry.newValue()); + wrapped().put(entry.key(), entry.newValue()); final ProcessorRecordContext current = context.recordContext(); context.setRecordContext(entry.entry().context()); @@ -111,7 +109,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im } } } else { - underlying.put(entry.key(), entry.newValue()); + wrapped().put(entry.key(), entry.newValue()); } } @@ -127,7 +125,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im lock.writeLock().lock(); try { cache.flush(cacheName); - underlying.flush(); + super.flush(); } finally { lock.writeLock().unlock(); } @@ -139,7 +137,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im flush(); } finally { try { - underlying.close(); + super.close(); } finally { cache.close(cacheName); } @@ -147,16 +145,6 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im } @Override - public boolean persistent() { - return underlying.persistent(); - } - - @Override - public boolean isOpen() { - return underlying.isOpen(); - } - - @Override public byte[] get(final Bytes key) { Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); @@ -180,7 +168,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im entry = cache.get(cacheName, key); } if (entry == null) { - final byte[] rawValue = underlying.get(key); + final byte[] rawValue = wrapped().get(key); if (rawValue == null) { return null; } @@ -199,7 +187,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) { validateStoreOpen(); - final KeyValueIterator<Bytes, byte[]> storeIterator = underlying.range(from, to); + final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().range(from, to); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, from, to); return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator); } @@ -207,7 +195,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im @Override public KeyValueIterator<Bytes, byte[]> all() { validateStoreOpen(); - final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(this.name(), underlying.all()); + final KeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(this.name(), wrapped().all()); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(cacheName); return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, storeIterator); } @@ -217,7 +205,7 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im validateStoreOpen(); lock.readLock().lock(); try { - return underlying.approximateNumEntries(); + return wrapped().approximateNumEntries(); } finally { lock.readLock().unlock(); } @@ -300,16 +288,4 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im putInternal(key, null); return v; } - - KeyValueStore<Bytes, byte[]> underlying() { - return underlying; - } - - @Override - public StateStore inner() { - if (underlying instanceof WrappedStateStore) { - return ((WrappedStateStore) underlying).inner(); - } - return underlying; - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index 1c5c2f2..67a1588 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -30,9 +30,8 @@ import org.apache.kafka.streams.state.StateSerdes; import java.util.Objects; -class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, AGG> { +class CachingSessionStore<K, AGG> extends WrappedStateStore<SessionStore<Bytes, byte[]>> implements SessionStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, AGG> { - private final SessionStore<Bytes, byte[]> bytesStore; private final SessionKeySchema keySchema; private final Serde<K> keySerde; private final Serde<AGG> aggSerde; @@ -50,7 +49,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i final Serde<AGG> aggSerde, final long segmentInterval) { super(bytesStore); - this.bytesStore = bytesStore; this.keySerde = keySerde; this.aggSerde = aggSerde; this.keySchema = new SessionKeySchema(); @@ -60,7 +58,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i public void init(final ProcessorContext context, final StateStore root) { topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()); initInternal((InternalProcessorContext) context); - bytesStore.init(context, root); + super.init(context, root); } @SuppressWarnings("unchecked") @@ -72,7 +70,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i keySerde == null ? (Serde<K>) context.keySerde() : keySerde, aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); - cacheName = context.taskId() + "-" + bytesStore.name(); + cacheName = context.taskId() + "-" + name(); cache = context.getCache(); cache.addDirtyEntryFlushListener(cacheName, entries -> { for (final ThreadCache.DirtyEntry entry : entries) { @@ -89,9 +87,9 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key, latestSessionStartTime)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo); - final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions(key, - earliestSessionEndTime, - latestSessionStartTime); + final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = wrapped().findSessions(key, + earliestSessionEndTime, + latestSessionStartTime); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(key, key, earliestSessionEndTime, @@ -111,7 +109,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(keyTo, latestSessionStartTime)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo); - final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = bytesStore.findSessions( + final KeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = wrapped().findSessions( keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime ); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, @@ -149,13 +147,13 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); if (cache == null) { - return bytesStore.fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, startTime, endTime); } else { final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime); final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); final LRUCacheEntry entry = cache.get(cacheName, cacheKey); if (entry == null) { - return bytesStore.fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, startTime, endTime); } else { return entry.value(); } @@ -181,7 +179,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i if (flushListener != null) { final byte[] newValueBytes = entry.newValue(); final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? - bytesStore.fetchSession(bytesKey.key(), bytesKey.window().start(), bytesKey.window().end()) : null; + wrapped().fetchSession(bytesKey.key(), bytesKey.window().start(), bytesKey.window().end()) : null; // this is an optimization: if this key did not exist in underlying store and also not in the cache, // we can skip flushing to downstream as well as writing to underlying store @@ -190,7 +188,7 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i final AGG newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null; final AGG oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null; // we need to get the old values if needed, and then put to store, and then flush - bytesStore.put(bytesKey, entry.newValue()); + wrapped().put(bytesKey, entry.newValue()); final ProcessorRecordContext current = context.recordContext(); context.setRecordContext(entry.entry().context()); @@ -205,19 +203,19 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore i } } } else { - bytesStore.put(bytesKey, entry.newValue()); + wrapped().put(bytesKey, entry.newValue()); } } public void flush() { cache.flush(cacheName); - bytesStore.flush(); + super.flush(); } public void close() { flush(); cache.close(cacheName); - bytesStore.close(); + super.close(); } public void setFlushListener(final CacheFlushListener<Windowed<K>, AGG> flushListener, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 53d02dd..bc82cc4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -30,9 +30,8 @@ import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> { +class CachingWindowStore<K, V> extends WrappedStateStore<WindowStore<Bytes, byte[]>> implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> { - private final WindowStore<Bytes, byte[]> underlying; private final Serde<K> keySerde; private final Serde<V> valueSerde; private final long windowSize; @@ -55,7 +54,6 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl final long windowSize, final long segmentInterval) { super(underlying); - this.underlying = underlying; this.keySerde = keySerde; this.valueSerde = valueSerde; this.windowSize = windowSize; @@ -65,13 +63,13 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @Override public void init(final ProcessorContext context, final StateStore root) { initInternal((InternalProcessorContext) context); - underlying.init(context, root); + super.init(context, root); } @SuppressWarnings("unchecked") private void initInternal(final InternalProcessorContext context) { this.context = context; - final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlying.name()); + final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); serdes = new StateSerdes<>(topic, keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); @@ -79,7 +77,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl bytesSerdes = new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()); - name = context.taskId() + "-" + underlying.name(); + name = context.taskId() + "-" + name(); cache = this.context.getCache(); cache.addDirtyEntryFlushListener(name, entries -> { @@ -97,7 +95,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl final Bytes key = windowedKeyBytes.key(); if (flushListener != null) { final byte[] newValueBytes = entry.newValue(); - final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? underlying.fetch(key, windowStartTimestamp) : null; + final byte[] oldValueBytes = newValueBytes == null || sendOldValues ? wrapped().fetch(key, windowStartTimestamp) : null; // this is an optimization: if this key did not exist in underlying store and also not in the cache, // we can skip flushing to downstream as well as writing to underlying store @@ -106,7 +104,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl final V newValue = newValueBytes != null ? serdes.valueFrom(newValueBytes) : null; final V oldValue = sendOldValues && oldValueBytes != null ? serdes.valueFrom(oldValueBytes) : null; // we need to get the old values if needed, and then put to store, and then flush - underlying.put(key, entry.newValue(), windowStartTimestamp); + wrapped().put(key, entry.newValue(), windowStartTimestamp); final ProcessorRecordContext current = context.recordContext(); context.setRecordContext(entry.entry().context()); @@ -121,7 +119,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl } } } else { - underlying.put(key, entry.newValue(), windowStartTimestamp); + wrapped().put(key, entry.newValue(), windowStartTimestamp); } } @@ -135,14 +133,14 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl @Override public synchronized void flush() { cache.flush(name); - underlying.flush(); + wrapped().flush(); } @Override public void close() { flush(); cache.close(name); - underlying.close(); + wrapped().close(); } @Override @@ -175,11 +173,11 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0); final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); if (cache == null) { - return underlying.fetch(key, timestamp); + return wrapped().fetch(key, timestamp); } final LRUCacheEntry entry = cache.get(name, cacheKey); if (entry == null) { - return underlying.fetch(key, timestamp); + return wrapped().fetch(key, timestamp); } else { return entry.value(); } @@ -192,7 +190,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl // if store is open outside as well. validateStoreOpen(); - final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(key, timeFrom, timeTo); + final WindowStoreIterator<byte[]> underlyingIterator = wrapped().fetch(key, timeFrom, timeTo); if (cache == null) { return underlyingIterator; } @@ -218,7 +216,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl // if store is open outside as well. validateStoreOpen(); - final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetch(from, to, timeFrom, timeTo); + final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetch(from, to, timeFrom, timeTo); if (cache == null) { return underlyingIterator; } @@ -245,7 +243,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl public KeyValueIterator<Windowed<Bytes>, byte[]> all() { validateStoreOpen(); - final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.all(); + final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().all(); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name); return new MergedSortedCacheWindowStoreKeyValueIterator( @@ -262,7 +260,7 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { validateStoreOpen(); - final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = underlying.fetchAll(timeFrom, timeTo); + final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetchAll(timeFrom, timeTo); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(name); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(null, null, timeFrom, timeTo); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index 94c250c..d5f5ad2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -28,25 +28,23 @@ import org.apache.kafka.streams.state.StateSerdes; import java.util.List; -public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractStateStore implements KeyValueStore<Bytes, byte[]> { - private final KeyValueStore<Bytes, byte[]> inner; +public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore<KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]> { private StoreChangeLogger<Bytes, byte[]> changeLogger; ChangeLoggingKeyValueBytesStore(final KeyValueStore<Bytes, byte[]> inner) { super(inner); - this.inner = inner; } @Override public void init(final ProcessorContext context, final StateStore root) { - inner.init(context, root); - final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()); - this.changeLogger = new StoreChangeLogger<>(inner.name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); + super.init(context, root); + final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); + this.changeLogger = new StoreChangeLogger<>(name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray())); // if the inner store is an LRU cache, add the eviction listener to log removed record - if (inner instanceof MemoryLRUCache) { - ((MemoryLRUCache<Bytes, byte[]>) inner).setWhenEldestRemoved((key, value) -> { + if (wrapped() instanceof MemoryLRUCache) { + ((MemoryLRUCache<Bytes, byte[]>) wrapped()).setWhenEldestRemoved((key, value) -> { // pass null to indicate removal changeLogger.logChange(key, null); }); @@ -55,13 +53,13 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS @Override public long approximateNumEntries() { - return inner.approximateNumEntries(); + return wrapped().approximateNumEntries(); } @Override public void put(final Bytes key, final byte[] value) { - inner.put(key, value); + wrapped().put(key, value); changeLogger.logChange(key, value); } @@ -77,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS @Override public void putAll(final List<KeyValue<Bytes, byte[]>> entries) { - inner.putAll(entries); + wrapped().putAll(entries); for (final KeyValue<Bytes, byte[]> entry : entries) { changeLogger.logChange(entry.key, entry.value); } @@ -85,24 +83,24 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS @Override public byte[] delete(final Bytes key) { - final byte[] oldValue = inner.delete(key); + final byte[] oldValue = wrapped().delete(key); changeLogger.logChange(key, null); return oldValue; } @Override public byte[] get(final Bytes key) { - return inner.get(key); + return wrapped().get(key); } @Override public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) { - return inner.range(from, to); + return wrapped().range(from, to); } @Override public KeyValueIterator<Bytes, byte[]> all() { - return inner.all(); + return wrapped().all(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 3ddbede..1ed163b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -27,25 +27,23 @@ import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; /** - * Simple wrapper around a {@link SegmentedBytesStore} to support writing + * Simple wrapper around a {@link SessionStore} to support writing * updates to a changelog */ -class ChangeLoggingSessionBytesStore extends WrappedStateStore.AbstractStateStore implements SessionStore<Bytes, byte[]> { +class ChangeLoggingSessionBytesStore extends WrappedStateStore<SessionStore<Bytes, byte[]>> implements SessionStore<Bytes, byte[]> { - private final SessionStore<Bytes, byte[]> bytesStore; private StoreChangeLogger<Bytes, byte[]> changeLogger; ChangeLoggingSessionBytesStore(final SessionStore<Bytes, byte[]> bytesStore) { super(bytesStore); - this.bytesStore = bytesStore; } @Override public void init(final ProcessorContext context, final StateStore root) { - bytesStore.init(context, root); + super.init(context, root); final String topic = ProcessorStateManager.storeChangelogTopic( context.applicationId(), - bytesStore.name()); + name()); changeLogger = new StoreChangeLogger<>( name(), context, @@ -55,30 +53,30 @@ class ChangeLoggingSessionBytesStore extends WrappedStateStore.AbstractStateStor @Override public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key, final long earliestSessionEndTime, final long latestSessionStartTime) { - return bytesStore.findSessions(key, earliestSessionEndTime, latestSessionStartTime); + return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime); } @Override public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes keyFrom, final Bytes keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - return bytesStore.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); + return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime); } @Override public void remove(final Windowed<Bytes> sessionKey) { - bytesStore.remove(sessionKey); + wrapped().remove(sessionKey); changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), null); } @Override public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) { - bytesStore.put(sessionKey, aggregate); + wrapped().put(sessionKey, aggregate); changeLogger.logChange(SessionKeySchema.toBinary(sessionKey), aggregate); } @Override public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) { - return bytesStore.fetchSession(key, startTime, endTime); + return wrapped().fetchSession(key, startTime, endTime); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index a592471..a614f92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -28,12 +28,11 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; /** - * Simple wrapper around a {@link SegmentedBytesStore} to support writing + * Simple wrapper around a {@link WindowStore} to support writing * updates to a changelog */ -class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]> { +class ChangeLoggingWindowBytesStore extends WrappedStateStore<WindowStore<Bytes, byte[]>> implements WindowStore<Bytes, byte[]> { - private final WindowStore<Bytes, byte[]> bytesStore; private final boolean retainDuplicates; private StoreChangeLogger<Bytes, byte[]> changeLogger; private ProcessorContext context; @@ -42,36 +41,35 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore ChangeLoggingWindowBytesStore(final WindowStore<Bytes, byte[]> bytesStore, final boolean retainDuplicates) { super(bytesStore); - this.bytesStore = bytesStore; this.retainDuplicates = retainDuplicates; } @Override public byte[] fetch(final Bytes key, final long timestamp) { - return bytesStore.fetch(key, timestamp); + return wrapped().fetch(key, timestamp); } @SuppressWarnings("deprecation") @Override public WindowStoreIterator<byte[]> fetch(final Bytes key, final long from, final long to) { - return bytesStore.fetch(key, from, to); + return wrapped().fetch(key, from, to); } @SuppressWarnings("deprecation") @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) { - return bytesStore.fetch(keyFrom, keyTo, from, to); + return wrapped().fetch(keyFrom, keyTo, from, to); } @Override public KeyValueIterator<Windowed<Bytes>, byte[]> all() { - return bytesStore.all(); + return wrapped().all(); } @SuppressWarnings("deprecation") @Override public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { - return bytesStore.fetchAll(timeFrom, timeTo); + return wrapped().fetchAll(timeFrom, timeTo); } @Override @@ -81,15 +79,15 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore.AbstractStateStore @Override public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) { - bytesStore.put(key, value, windowStartTimestamp); + wrapped().put(key, value, windowStartTimestamp); changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, maybeUpdateSeqnumForDups()), value); } @Override public void init(final ProcessorContext context, final StateStore root) { this.context = context; - bytesStore.init(context, root); - final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()); + super.init(context, root); + final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); changeLogger = new StoreChangeLogger<>( name(), context, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index f3d1cae..0c08606 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -45,9 +45,8 @@ import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTas * @param <K> * @param <V> */ -public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> { +public class MeteredKeyValueStore<K, V> extends WrappedStateStore<KeyValueStore<Bytes, byte[]>> implements KeyValueStore<K, V> { - private final KeyValueStore<Bytes, byte[]> inner; private final Serde<K> keySerde; private final Serde<V> valueSerde; private StateSerdes<K, V> serdes; @@ -71,7 +70,6 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS final Serde<K> keySerde, final Serde<V> valueSerde) { super(inner); - this.inner = inner; this.metricScope = metricScope; this.time = time != null ? time : Time.SYSTEM; this.keySerde = keySerde; @@ -108,12 +106,12 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS if (restoreTime.shouldRecord()) { measureLatency( () -> { - inner.init(context, root); + super.init(context, root); return null; }, restoreTime); } else { - inner.init(context, root); + super.init(context, root); } } @@ -125,16 +123,16 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS @Override public long approximateNumEntries() { - return inner.approximateNumEntries(); + return wrapped().approximateNumEntries(); } @Override public V get(final K key) { try { if (getTime.shouldRecord()) { - return measureLatency(() -> outerValue(inner.get(keyBytes(key))), getTime); + return measureLatency(() -> outerValue(wrapped().get(keyBytes(key))), getTime); } else { - return outerValue(inner.get(keyBytes(key))); + return outerValue(wrapped().get(keyBytes(key))); } } catch (final ProcessorStateException e) { final String message = String.format(e.getMessage(), key); @@ -148,11 +146,11 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS try { if (putTime.shouldRecord()) { measureLatency(() -> { - inner.put(keyBytes(key), serdes.rawValue(value)); + wrapped().put(keyBytes(key), serdes.rawValue(value)); return null; }, putTime); } else { - inner.put(keyBytes(key), serdes.rawValue(value)); + wrapped().put(keyBytes(key), serdes.rawValue(value)); } } catch (final ProcessorStateException e) { final String message = String.format(e.getMessage(), key, value); @@ -165,10 +163,10 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS final V value) { if (putIfAbsentTime.shouldRecord()) { return measureLatency( - () -> outerValue(inner.putIfAbsent(keyBytes(key), serdes.rawValue(value))), + () -> outerValue(wrapped().putIfAbsent(keyBytes(key), serdes.rawValue(value))), putIfAbsentTime); } else { - return outerValue(inner.putIfAbsent(keyBytes(key), serdes.rawValue(value))); + return outerValue(wrapped().putIfAbsent(keyBytes(key), serdes.rawValue(value))); } } @@ -177,12 +175,12 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS if (putAllTime.shouldRecord()) { measureLatency( () -> { - inner.putAll(innerEntries(entries)); + wrapped().putAll(innerEntries(entries)); return null; }, putAllTime); } else { - inner.putAll(innerEntries(entries)); + wrapped().putAll(innerEntries(entries)); } } @@ -190,9 +188,9 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS public V delete(final K key) { try { if (deleteTime.shouldRecord()) { - return measureLatency(() -> outerValue(inner.delete(keyBytes(key))), deleteTime); + return measureLatency(() -> outerValue(wrapped().delete(keyBytes(key))), deleteTime); } else { - return outerValue(inner.delete(keyBytes(key))); + return outerValue(wrapped().delete(keyBytes(key))); } } catch (final ProcessorStateException e) { final String message = String.format(e.getMessage(), key); @@ -204,13 +202,13 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS public KeyValueIterator<K, V> range(final K from, final K to) { return new MeteredKeyValueIterator( - inner.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))), + wrapped().range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))), rangeTime); } @Override public KeyValueIterator<K, V> all() { - return new MeteredKeyValueIterator(inner.all(), allTime); + return new MeteredKeyValueIterator(wrapped().all(), allTime); } @Override @@ -218,12 +216,12 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS if (flushTime.shouldRecord()) { measureLatency( () -> { - inner.flush(); + super.flush(); return null; }, flushTime); } else { - inner.flush(); + super.flush(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 3bb7fca..0db67c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -36,8 +36,7 @@ import java.util.Objects; import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors; -public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, V> { - private final SessionStore<Bytes, byte[]> inner; +public class MeteredSessionStore<K, V> extends WrappedStateStore<SessionStore<Bytes, byte[]>> implements SessionStore<K, V> { private final String metricScope; private final Serde<K> keySerde; private final Serde<V> valueSerde; @@ -56,7 +55,6 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt final Serde<V> valueSerde, final Time time) { super(inner); - this.inner = inner; this.metricScope = metricScope; this.keySerde = keySerde; this.valueSerde = valueSerde; @@ -88,7 +86,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt // register and possibly restore the state from the logs final long startNs = time.nanoseconds(); try { - inner.init(context, root); + super.init(context, root); } finally { metrics.recordLatency( restoreTime, @@ -112,7 +110,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt Objects.requireNonNull(key, "key cannot be null"); final Bytes bytesKey = keyBytes(key); return new MeteredWindowedKeyValueIterator<>( - inner.findSessions( + wrapped().findSessions( bytesKey, earliestSessionEndTime, latestSessionStartTime), @@ -132,7 +130,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt final Bytes bytesKeyFrom = keyBytes(keyFrom); final Bytes bytesKeyTo = keyBytes(keyTo); return new MeteredWindowedKeyValueIterator<>( - inner.findSessions( + wrapped().findSessions( bytesKeyFrom, bytesKeyTo, earliestSessionEndTime, @@ -149,7 +147,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt final long startNs = time.nanoseconds(); try { final Bytes key = keyBytes(sessionKey.key()); - inner.remove(new Windowed<>(key, sessionKey.window())); + wrapped().remove(new Windowed<>(key, sessionKey.window())); } catch (final ProcessorStateException e) { final String message = String.format(e.getMessage(), sessionKey.key()); throw new ProcessorStateException(message, e); @@ -165,7 +163,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt final long startNs = time.nanoseconds(); try { final Bytes key = keyBytes(sessionKey.key()); - inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate)); + wrapped().put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate)); } catch (final ProcessorStateException e) { final String message = String.format(e.getMessage(), sessionKey.key(), aggregate); throw new ProcessorStateException(message, e); @@ -185,7 +183,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt final Bytes bytesKey = keyBytes(key); final long startNs = time.nanoseconds(); try { - value = serdes.valueFrom(inner.fetchSession(bytesKey, startTime, endTime)); + value = serdes.valueFrom(wrapped().fetchSession(bytesKey, startTime, endTime)); } finally { metrics.recordLatency(flushTime, startNs, time.nanoseconds()); } @@ -211,7 +209,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt public void flush() { final long startNs = time.nanoseconds(); try { - inner.flush(); + super.flush(); } finally { metrics.recordLatency(flushTime, startNs, time.nanoseconds()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 166c300..2bbfc45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -36,9 +36,8 @@ import java.util.Map; import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG; import static org.apache.kafka.streams.state.internals.metrics.Sensors.createTaskAndStoreLatencyAndThroughputSensors; -public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> { +public class MeteredWindowStore<K, V> extends WrappedStateStore<WindowStore<Bytes, byte[]>> implements WindowStore<K, V> { - private final WindowStore<Bytes, byte[]> inner; private final String metricScope; private final Time time; private final Serde<K> keySerde; @@ -57,7 +56,6 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto final Serde<K> keySerde, final Serde<V> valueSerde) { super(inner); - this.inner = inner; this.metricScope = metricScope; this.time = time; this.keySerde = keySerde; @@ -88,7 +86,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto // register and possibly restore the state from the logs final long startNs = time.nanoseconds(); try { - inner.init(context, root); + super.init(context, root); } finally { metrics.recordLatency( restoreTime, @@ -116,7 +114,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto final long windowStartTimestamp) { final long startNs = time.nanoseconds(); try { - inner.put(keyBytes(key), serdes.rawValue(value), windowStartTimestamp); + wrapped().put(keyBytes(key), serdes.rawValue(value), windowStartTimestamp); } catch (final ProcessorStateException e) { final String message = String.format(e.getMessage(), key, value); throw new ProcessorStateException(message, e); @@ -134,7 +132,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto final long timestamp) { final long startNs = time.nanoseconds(); try { - final byte[] result = inner.fetch(keyBytes(key), timestamp); + final byte[] result = wrapped().fetch(keyBytes(key), timestamp); if (result == null) { return null; } @@ -149,7 +147,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { - return new MeteredWindowStoreIterator<>(inner.fetch(keyBytes(key), timeFrom, timeTo), + return new MeteredWindowStoreIterator<>(wrapped().fetch(keyBytes(key), timeFrom, timeTo), fetchTime, metrics, serdes, @@ -158,7 +156,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto @Override public KeyValueIterator<Windowed<K>, V> all() { - return new MeteredWindowedKeyValueIterator<>(inner.all(), fetchTime, metrics, serdes, time); + return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchTime, metrics, serdes, time); } @SuppressWarnings("deprecation") @@ -166,7 +164,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { return new MeteredWindowedKeyValueIterator<>( - inner.fetchAll(timeFrom, timeTo), + wrapped().fetchAll(timeFrom, timeTo), fetchTime, metrics, serdes, @@ -180,7 +178,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto final long timeFrom, final long timeTo) { return new MeteredWindowedKeyValueIterator<>( - inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo), + wrapped().fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo), fetchTime, metrics, serdes, @@ -191,7 +189,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto public void flush() { final long startNs = time.nanoseconds(); try { - inner.flush(); + super.flush(); } finally { metrics.recordLatency(flushTime, startNs, time.nanoseconds()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java index 64733b2..9046e37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverter.java @@ -18,34 +18,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import java.nio.ByteBuffer; - public interface RecordConverter { ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record); - - @SuppressWarnings("deprecation") - static RecordConverter converter() { - return record -> { - final byte[] rawValue = record.value(); - final long timestamp = record.timestamp(); - return new ConsumerRecord<>( - record.topic(), - record.partition(), - record.offset(), - timestamp, - record.timestampType(), - record.checksum(), - record.serializedKeySize(), - record.serializedValueSize(), - record.key(), - ByteBuffer - .allocate(8 + rawValue.length) - .putLong(timestamp) - .put(rawValue) - .array(), - record.headers(), - record.leaderEpoch() - ); - }; - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java new file mode 100644 index 0000000..f65cc32 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.nio.ByteBuffer; + +public final class RecordConverters { + private static final RecordConverter IDENTITY_INSTANCE = record -> record; + + @SuppressWarnings("deprecation") + private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record -> { + final byte[] rawValue = record.value(); + final long timestamp = record.timestamp(); + return new ConsumerRecord<>( + record.topic(), + record.partition(), + record.offset(), + timestamp, + record.timestampType(), + record.checksum(), + record.serializedKeySize(), + record.serializedValueSize(), + record.key(), + ByteBuffer + .allocate(8 + rawValue.length) + .putLong(timestamp) + .put(rawValue) + .array(), + record.headers(), + record.leaderEpoch() + ); + }; + + // privatize the constructor so the class cannot be instantiated (only used for its static members) + private RecordConverters() {} + + public static RecordConverter rawValueToTimestampedValue() { + return RAW_TO_TIMESTAMED_INSTANCE; + } + + public static RecordConverter identity() { + return IDENTITY_INSTANCE; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java index be423bc..d855442 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java @@ -27,11 +27,10 @@ import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; -public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore implements SessionStore<K, AGG> { +public class RocksDBSessionStore<K, AGG> extends WrappedStateStore<SegmentedBytesStore> implements SessionStore<K, AGG> { private final Serde<K> keySerde; private final Serde<AGG> aggSerde; - private final SegmentedBytesStore bytesStore; private StateSerdes<K, AGG> serdes; private String topic; @@ -41,14 +40,13 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState final Serde<AGG> aggSerde) { super(bytesStore); this.keySerde = keySerde; - this.bytesStore = bytesStore; this.aggSerde = aggSerde; } @Override @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { - final String storeName = bytesStore.name(); + final String storeName = name(); topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); serdes = new StateSerdes<>( @@ -56,12 +54,12 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState keySerde == null ? (Serde<K>) context.keySerde() : keySerde, aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde); - bytesStore.init(context, root); + super.init(context, root); } @Override public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch( + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch( Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime @@ -71,7 +69,7 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState @Override public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch( + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch( Bytes.wrap(serdes.rawKey(keyFrom)), Bytes.wrap(serdes.rawKey(keyTo)), earliestSessionEndTime, @@ -82,7 +80,7 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState @Override public AGG fetchSession(final K key, final long startTime, final long endTime) { - return serdes.valueFrom(bytesStore.get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)), startTime, endTime))); + return serdes.valueFrom(wrapped().get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)), startTime, endTime))); } @Override @@ -97,11 +95,11 @@ public class RocksDBSessionStore<K, AGG> extends WrappedStateStore.AbstractState @Override public void remove(final Windowed<K> key) { - bytesStore.remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic))); + wrapped().remove(Bytes.wrap(SessionKeySchema.toBinary(key, serdes.keySerializer(), topic))); } @Override public void put(final Windowed<K> sessionKey, final AGG aggregate) { - bytesStore.put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, serdes.keySerializer(), topic)), serdes.rawValue(aggregate)); + wrapped().put(Bytes.wrap(SessionKeySchema.toBinary(sessionKey, serdes.keySerializer(), topic)), serdes.rawValue(aggregate)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index bb13c74..c22ca52 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -27,13 +27,12 @@ import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<K, V> { +public class RocksDBWindowStore<K, V> extends WrappedStateStore<SegmentedBytesStore> implements WindowStore<K, V> { private final Serde<K> keySerde; private final Serde<V> valueSerde; private final boolean retainDuplicates; private final long windowSize; - private final SegmentedBytesStore bytesStore; private ProcessorContext context; private StateSerdes<K, V> serdes; @@ -47,7 +46,6 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto super(bytesStore); this.keySerde = keySerde; this.valueSerde = valueSerde; - this.bytesStore = bytesStore; this.retainDuplicates = retainDuplicates; this.windowSize = windowSize; } @@ -57,11 +55,11 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto public void init(final ProcessorContext context, final StateStore root) { this.context = context; // construct the serde - serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()), + serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); - bytesStore.init(context, root); + super.init(context, root); } @Override @@ -73,12 +71,12 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto public void put(final K key, final V value, final long windowStartTimestamp) { maybeUpdateSeqnumForDups(); - bytesStore.put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum, serdes), serdes.rawValue(value)); + wrapped().put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum, serdes), serdes.rawValue(value)); } @Override public V fetch(final K key, final long timestamp) { - final byte[] bytesValue = bytesStore.get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes)); + final byte[] bytesValue = wrapped().get(WindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum, serdes)); if (bytesValue == null) { return null; } @@ -88,27 +86,27 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto @SuppressWarnings("deprecation") @Override public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo); + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(Bytes.wrap(serdes.rawKey(key)), timeFrom, timeTo); return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).valuesIterator(); } @SuppressWarnings("deprecation") @Override public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo); + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to)), timeFrom, timeTo); return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } @Override public KeyValueIterator<Windowed<K>, V> all() { - final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all(); + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().all(); return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } @SuppressWarnings("deprecation") @Override public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) { - final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetchAll(timeFrom, timeTo); + final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(timeFrom, timeTo); return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index 2bb60bc..65cd484 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -19,81 +19,66 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.TimestampedBytesStore; /** * A storage engine wrapper for utilities like logging, caching, and metering. */ -public interface WrappedStateStore extends StateStore { - - /** - * Return the inner most storage engine - * - * @return wrapped inner storage engine - */ - StateStore inner(); - - /** - * Return the state store this store directly wraps - * @return the state store this store directly wraps - */ - StateStore wrappedStore(); - - abstract class AbstractStateStore implements WrappedStateStore { - final StateStore innerState; - - protected AbstractStateStore(final StateStore inner) { - this.innerState = inner; +public abstract class WrappedStateStore<S extends StateStore> implements StateStore { + public static boolean isTimestamped(final StateStore stateStore) { + if (stateStore instanceof TimestampedBytesStore) { + return true; + } else if (stateStore instanceof WrappedStateStore) { + return isTimestamped(((WrappedStateStore) stateStore).wrapped()); + } else { + return false; } + } - @Override - public void init(final ProcessorContext context, - final StateStore root) { - innerState.init(context, root); - } + private final S wrapped; - @Override - public String name() { - return innerState.name(); - } + public WrappedStateStore(final S wrapped) { + this.wrapped = wrapped; + } - @Override - public boolean persistent() { - return innerState.persistent(); - } + @Override + public void init(final ProcessorContext context, + final StateStore root) { + wrapped.init(context, root); + } - @Override - public boolean isOpen() { - return innerState.isOpen(); - } + @Override + public String name() { + return wrapped.name(); + } - void validateStoreOpen() { - if (!innerState.isOpen()) { - throw new InvalidStateStoreException("Store " + innerState.name() + " is currently closed."); - } - } + @Override + public boolean persistent() { + return wrapped.persistent(); + } - @Override - public StateStore inner() { - if (innerState instanceof WrappedStateStore) { - return ((WrappedStateStore) innerState).inner(); - } - return innerState; - } + @Override + public boolean isOpen() { + return wrapped.isOpen(); + } - @Override - public void flush() { - innerState.flush(); + void validateStoreOpen() { + if (!wrapped.isOpen()) { + throw new InvalidStateStoreException("Store " + wrapped.name() + " is currently closed."); } + } - @Override - public void close() { - innerState.close(); - } + @Override + public void flush() { + wrapped.flush(); + } - @Override - public StateStore wrappedStore() { - return innerState; - } + @Override + public void close() { + wrapped.close(); + } + public S wrapped() { + return wrapped; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 698a0ab..6b8b5b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -692,7 +692,7 @@ public class KafkaStreamsTest { .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process"); final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores.keyValueStoreBuilder( - isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName), + isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName), Serdes.String(), Serdes.String()).withLoggingDisabled(); topology.addGlobalStore(globalStoreBuilder, "global", diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 0e41794..c8879a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -31,9 +31,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.internals.TimestampedBytesStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.WrappedStateStore; @@ -240,47 +238,11 @@ public class GlobalStateManagerImplTest { initializeConsumer(1, 0, t1); stateManager.initialize(); - stateManager.register(new WrappedStateStore() { - @Override - public StateStore inner() { - return store1; - } - - @Override - public StateStore wrappedStore() { - return store1; - } - - @Override - public String name() { - return store1.name(); - } - - @Override - public void init(final ProcessorContext context, final StateStore root) { - store1.init(context, root); - } - - @Override - public void flush() { - store1.flush(); - } - - @Override - public void close() { - store1.close(); - } - - @Override - public boolean persistent() { - return store1.persistent(); - } - - @Override - public boolean isOpen() { - return store1.isOpen(); - } - }, stateRestoreCallback); + stateManager.register( + new WrappedStateStore<NoOpReadOnlyStore<Object, Object>>(store1) { + }, + stateRestoreCallback + ); final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0); assertEquals(3, restoredRecord.key.length); @@ -304,47 +266,11 @@ public class GlobalStateManagerImplTest { initializeConsumer(1, 0, t2); stateManager.initialize(); - stateManager.register(new WrappedStateStore() { - @Override - public StateStore inner() { - return store2; - } - - @Override - public StateStore wrappedStore() { - return store2; - } - - @Override - public String name() { - return store2.name(); - } - - @Override - public void init(final ProcessorContext context, final StateStore root) { - store2.init(context, root); - } - - @Override - public void flush() { - store2.flush(); - } - - @Override - public void close() { - store2.close(); - } - - @Override - public boolean persistent() { - return store2.persistent(); - } - - @Override - public boolean isOpen() { - return store2.isOpen(); - } - }, stateRestoreCallback); + stateManager.register( + new WrappedStateStore<NoOpReadOnlyStore<Object, Object>>(store2) { + }, + stateRestoreCallback + ); final KeyValue<byte[], byte[]> restoredRecord = stateRestoreCallback.restored.get(0); assertEquals(3, restoredRecord.key.length); @@ -402,7 +328,7 @@ public class GlobalStateManagerImplTest { offsetCheckpoint.write(Collections.singletonMap(t1, 5L)); stateManager.initialize(); - stateManager.register(store1, stateRestoreCallback); + stateManager.register(store1, stateRestoreCallback); assertEquals(5, stateRestoreCallback.restored.size()); } @@ -729,15 +655,15 @@ public class GlobalStateManagerImplTest { @Test public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOException { final File storeDirectory1 = new File(stateDirectory.globalStateDir().getAbsolutePath() - + File.separator + "rocksdb" - + File.separator + storeName1); + + File.separator + "rocksdb" + + File.separator + storeName1); final File storeDirectory2 = new File(stateDirectory.globalStateDir().getAbsolutePath() - + File.separator + "rocksdb" - + File.separator + storeName2); + + File.separator + "rocksdb" + + File.separator + storeName2); final File storeDirectory3 = new File(stateDirectory.globalStateDir().getAbsolutePath() - + File.separator + storeName3); + + File.separator + storeName3); final File storeDirectory4 = new File(stateDirectory.globalStateDir().getAbsolutePath() - + File.separator + storeName4); + + File.separator + storeName4); final File testFile1 = new File(storeDirectory1.getAbsolutePath() + File.separator + "testFile"); final File testFile2 = new File(storeDirectory2.getAbsolutePath() + File.separator + "testFile"); final File testFile3 = new File(storeDirectory3.getAbsolutePath() + File.separator + "testFile"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java index 494ae02..bb1dec2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java @@ -59,8 +59,8 @@ public class KeyValueStoreMaterializerTest { final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); - final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); - final StateStore logging = caching.wrappedStore(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); + final StateStore logging = caching.wrapped(); assertThat(store, instanceOf(MeteredKeyValueStore.class)); assertThat(caching, instanceOf(CachedStateStore.class)); assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class)); @@ -74,7 +74,7 @@ public class KeyValueStoreMaterializerTest { final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); - final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); + final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class)); } @@ -86,9 +86,9 @@ public class KeyValueStoreMaterializerTest { final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); - final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); assertThat(caching, instanceOf(CachedStateStore.class)); - assertThat(caching.wrappedStore(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class))); + assertThat(caching.wrapped(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class))); } @Test @@ -99,7 +99,7 @@ public class KeyValueStoreMaterializerTest { final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize(); final KeyValueStore<String, String> store = builder.build(); - final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(wrapped, not(instanceOf(CachedStateStore.class))); assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class))); } @@ -117,9 +117,8 @@ public class KeyValueStoreMaterializerTest { final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized); final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize(); final KeyValueStore<String, Integer> built = builder.build(); - final StateStore inner = ((WrappedStateStore) built).inner(); - assertThat(inner, CoreMatchers.equalTo(store)); + assertThat(store.name(), CoreMatchers.equalTo(built.name())); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java index 8ed23a6..4622c01 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import java.util.Collections; +import static org.apache.kafka.streams.state.internals.RecordConverters.identity; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertTrue; @@ -42,7 +43,7 @@ public class StateRestorerTest { OFFSET_LIMIT, true, "storeName", - record -> record); + identity()); @Before public void setUp() { @@ -79,7 +80,7 @@ public class StateRestorerTest { 0, true, "storeName", - record -> record); + identity()); assertTrue(restorer.hasCompleted(0, 10)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index ed82421..b13ed53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -48,6 +48,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Arrays.asList; +import static org.apache.kafka.streams.state.internals.RecordConverters.identity; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END; import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START; @@ -107,7 +108,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); changelogReader.restore(active); assertTrue(functionCalled.get()); } @@ -145,7 +146,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -170,7 +171,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); EasyMock.replay(active, task); @@ -185,7 +186,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); // retry restore should succeed assertEquals(1, changelogReader.restore(active).size()); assertThat(callback.restored.size(), equalTo(messages)); @@ -210,7 +211,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record); + identity()); changelogReader.register(stateRestorer); EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); @@ -240,7 +241,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); changelogReader.restore(active); assertThat(callback.restored.size(), equalTo(5)); @@ -257,7 +258,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -274,7 +275,7 @@ public class StoreChangelogReaderTest { 3, true, "storeName", - record -> record); + identity()); changelogReader.register(restorer); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); @@ -302,7 +303,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName1", - record -> record)); + identity())); changelogReader.register(new StateRestorer( one, restoreListener1, @@ -310,7 +311,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName2", - record -> record)); + identity())); changelogReader.register(new StateRestorer( two, restoreListener2, @@ -318,7 +319,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName3", - record -> record)); + identity())); expect(active.restoringTaskFor(one)).andStubReturn(task); expect(active.restoringTaskFor(two)).andStubReturn(task); @@ -350,7 +351,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName1", - record -> record)); + identity())); changelogReader.register(new StateRestorer( one, restoreListener1, @@ -358,7 +359,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName2", - record -> record)); + identity())); changelogReader.register(new StateRestorer( two, restoreListener2, @@ -366,7 +367,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName3", - record -> record)); + identity())); expect(active.restoringTaskFor(one)).andReturn(task); expect(active.restoringTaskFor(two)).andReturn(task); @@ -401,7 +402,7 @@ public class StoreChangelogReaderTest { 5, true, "storeName1", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -437,7 +438,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record); + identity()); setupConsumer(0, topicPartition); changelogReader.register(restorer); @@ -457,7 +458,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record); + identity()); changelogReader.register(restorer); @@ -476,7 +477,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -495,7 +496,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, false, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -518,7 +519,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, false, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); changelogReader.restore(active); @@ -537,7 +538,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "store", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); replay(active, task); @@ -559,7 +560,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, false, "storeName", - record -> record)); + identity())); final TopicPartition postInitialization = new TopicPartition("other", 0); expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); @@ -581,7 +582,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, false, "otherStore", - record -> record)); + identity())); final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization); consumer.assign(expected); @@ -605,7 +606,7 @@ public class StoreChangelogReaderTest { 9L, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -627,7 +628,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -648,7 +649,7 @@ public class StoreChangelogReaderTest { Long.MAX_VALUE, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -668,7 +669,7 @@ public class StoreChangelogReaderTest { 5, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -689,7 +690,7 @@ public class StoreChangelogReaderTest { 10, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -717,7 +718,7 @@ public class StoreChangelogReaderTest { 6, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); @@ -740,7 +741,7 @@ public class StoreChangelogReaderTest { 11, true, "storeName", - record -> record)); + identity())); expect(active.restoringTaskFor(topicPartition)).andReturn(task); replay(active); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 12dffba..c46498c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -51,7 +51,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @@ -96,7 +95,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { final KeyValueStore<K, V> store = (KeyValueStore<K, V>) storeBuilder.build(); final CacheFlushListenerStub<K, V> cacheFlushListener = new CacheFlushListenerStub<>(); - final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrappedStore(); + final CachedStateStore inner = (CachedStateStore) ((WrappedStateStore) store).wrapped(); inner.setFlushListener(cacheFlushListener, false); store.init(context, store); return store; @@ -358,7 +357,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { @Test public void shouldReturnUnderlying() { - assertTrue(store.underlying().equals(underlyingStore)); + assertEquals(store.wrapped(), underlyingStore); } @Test(expected = InvalidStateStoreException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java index 848bba6..8642cfa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilderTest.java @@ -69,21 +69,21 @@ public class KeyValueStoreBuilderTest { public void shouldHaveChangeLoggingStoreByDefault() { final KeyValueStore<String, String> store = builder.build(); assertThat(store, instanceOf(MeteredKeyValueStore.class)); - final StateStore next = ((WrappedStateStore) store).wrappedStore(); + final StateStore next = ((WrappedStateStore) store).wrapped(); assertThat(next, instanceOf(ChangeLoggingKeyValueBytesStore.class)); } @Test public void shouldNotHaveChangeLoggingStoreWhenDisabled() { final KeyValueStore<String, String> store = builder.withLoggingDisabled().build(); - final StateStore next = ((WrappedStateStore) store).wrappedStore(); + final StateStore next = ((WrappedStateStore) store).wrapped(); assertThat(next, CoreMatchers.equalTo(inner)); } @Test public void shouldHaveCachingStoreWhenEnabled() { final KeyValueStore<String, String> store = builder.withCachingEnabled().build(); - final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(MeteredKeyValueStore.class)); assertThat(wrapped, instanceOf(CachingKeyValueStore.class)); } @@ -93,10 +93,10 @@ public class KeyValueStoreBuilderTest { final KeyValueStore<String, String> store = builder .withLoggingEnabled(Collections.emptyMap()) .build(); - final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(MeteredKeyValueStore.class)); assertThat(wrapped, instanceOf(ChangeLoggingKeyValueBytesStore.class)); - assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.equalTo(inner)); + assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner)); } @Test @@ -105,12 +105,12 @@ public class KeyValueStoreBuilderTest { .withLoggingEnabled(Collections.emptyMap()) .withCachingEnabled() .build(); - final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); - final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); + final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped(); assertThat(store, instanceOf(MeteredKeyValueStore.class)); assertThat(caching, instanceOf(CachingKeyValueStore.class)); assertThat(changeLogging, instanceOf(ChangeLoggingKeyValueBytesStore.class)); - assertThat(changeLogging.wrappedStore(), CoreMatchers.equalTo(inner)); + assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner)); } @SuppressWarnings("all") diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java index 621a1c2..eb0cc55 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderTest.java @@ -69,21 +69,21 @@ public class SessionStoreBuilderTest { @Test public void shouldHaveChangeLoggingStoreByDefault() { final SessionStore<String, String> store = builder.build(); - final StateStore next = ((WrappedStateStore) store).wrappedStore(); + final StateStore next = ((WrappedStateStore) store).wrapped(); assertThat(next, instanceOf(ChangeLoggingSessionBytesStore.class)); } @Test public void shouldNotHaveChangeLoggingStoreWhenDisabled() { final SessionStore<String, String> store = builder.withLoggingDisabled().build(); - final StateStore next = ((WrappedStateStore) store).wrappedStore(); + final StateStore next = ((WrappedStateStore) store).wrapped(); assertThat(next, CoreMatchers.<StateStore>equalTo(inner)); } @Test public void shouldHaveCachingStoreWhenEnabled() { final SessionStore<String, String> store = builder.withCachingEnabled().build(); - final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(MeteredSessionStore.class)); assertThat(wrapped, instanceOf(CachingSessionStore.class)); } @@ -93,10 +93,10 @@ public class SessionStoreBuilderTest { final SessionStore<String, String> store = builder .withLoggingEnabled(Collections.<String, String>emptyMap()) .build(); - final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(MeteredSessionStore.class)); assertThat(wrapped, instanceOf(ChangeLoggingSessionBytesStore.class)); - assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.<StateStore>equalTo(inner)); } @Test @@ -105,12 +105,12 @@ public class SessionStoreBuilderTest { .withLoggingEnabled(Collections.<String, String>emptyMap()) .withCachingEnabled() .build(); - final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); - final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); + final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped(); assertThat(store, instanceOf(MeteredSessionStore.class)); assertThat(caching, instanceOf(CachingSessionStore.class)); assertThat(changeLogging, instanceOf(ChangeLoggingSessionBytesStore.class)); - assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner)); } @Test(expected = NullPointerException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index 25b8178..022f6dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -68,21 +68,21 @@ public class WindowStoreBuilderTest { @Test public void shouldHaveChangeLoggingStoreByDefault() { final WindowStore<String, String> store = builder.build(); - final StateStore next = ((WrappedStateStore) store).wrappedStore(); + final StateStore next = ((WrappedStateStore) store).wrapped(); assertThat(next, instanceOf(ChangeLoggingWindowBytesStore.class)); } @Test public void shouldNotHaveChangeLoggingStoreWhenDisabled() { final WindowStore<String, String> store = builder.withLoggingDisabled().build(); - final StateStore next = ((WrappedStateStore) store).wrappedStore(); + final StateStore next = ((WrappedStateStore) store).wrapped(); assertThat(next, CoreMatchers.<StateStore>equalTo(inner)); } @Test public void shouldHaveCachingStoreWhenEnabled() { final WindowStore<String, String> store = builder.withCachingEnabled().build(); - final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(MeteredWindowStore.class)); assertThat(wrapped, instanceOf(CachingWindowStore.class)); } @@ -92,10 +92,10 @@ public class WindowStoreBuilderTest { final WindowStore<String, String> store = builder .withLoggingEnabled(Collections.<String, String>emptyMap()) .build(); - final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(MeteredWindowStore.class)); assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class)); - assertThat(((WrappedStateStore) wrapped).wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.<StateStore>equalTo(inner)); } @Test @@ -104,12 +104,12 @@ public class WindowStoreBuilderTest { .withLoggingEnabled(Collections.<String, String>emptyMap()) .withCachingEnabled() .build(); - final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); - final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrappedStore(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); + final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped(); assertThat(store, instanceOf(MeteredWindowStore.class)); assertThat(caching, instanceOf(CachingWindowStore.class)); assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class)); - assertThat(changeLogging.wrappedStore(), CoreMatchers.<StateStore>equalTo(inner)); + assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner)); } @Test(expected = NullPointerException.class)