[
https://issues.apache.org/jira/browse/KAFKA-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16149020#comment-16149020
]
ASF GitHub Bot commented on KAFKA-5650:
---------------------------------------
GitHub user dguy opened a pull request:
https://github.com/apache/kafka/pull/3767
KAFKA-5650: add StateStoreBuilder interface and implementations
Part of KIP-182
- Add `StateStoreBuilder` interface and `WindowStateStoreBuilder`,
`KeyValueStateStoreBuilder`, and `SessionStateStoreBuilder` implementations
- Add `StoreSupplier`, `WindowBytesStoreSupplier`,
`KeyValueBytesStoreSupplier`, `SessionBytesStoreSupplier` interfaces and
implementations
- Add new methods to `Stores` to create the newly added `StoreSupplier` and
`StateStoreBuilder` implementations
- Update `Topology` and `InternalTopology` to use the interfaces
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dguy/kafka kafka-5650
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/3767.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3767
----
commit ee543690317581dc09d3d3f06d52df0685501960
Author: Damian Guy <[email protected]>
Date: 2017-08-25T16:10:35Z
add StateStoreBuilder interface and implementations
----
> Provide a simple way for custom storage engines to use streams wrapped stores
> (KIP-182)
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-5650
> URL: https://issues.apache.org/jira/browse/KAFKA-5650
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Damian Guy
> Assignee: Damian Guy
>
> As per KIP-182:
> A new interface will be added:
> {code}
> /**
> * Implementations of this will provide the ability to wrap a given StateStore
> * with or without caching/loggging etc.
> */
> public interface StateStoreBuilder<T extends StateStore> {
>
> StateStoreBuilder<T> withCachingEnabled();
> StateStoreBuilder<T> withCachingDisabled();
> StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
> StateStoreBuilder<T> withLoggingDisabled();
> T build();
> }
> {code}
> This interface will be used to wrap stores with caching, logging etc.
> Additionally some convenience methods on the {{Stores}} class:
> {code}
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>>
> persistentKeyValueStore(final String name,
>
> final Serde<K> keySerde,
>
> final Serde<V> valueSerde)
>
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>>
> inMemoryKeyValueStore(final String name,
>
> final Serde<K> keySerde,
>
> final Serde<V> valueSerde)
>
> public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final
> String name,
> final int
> capacity,
> final
> Serde<K> keySerde,
> final
> Serde<V> valueSerde)
>
> public static <K, V> StateStoreSupplier<WindowStore<K, V>>
> persistentWindowStore(final String name,
>
> final Windows windows,
>
> final Serde<K> keySerde,
>
> final Serde<V> valueSerde)
>
> public static <K, V> StateStoreSupplier<SessionStore<K, V>>
> persistentSessionStore(final String name,
>
> final SessionWindows windows,
>
> final Serde<K> keySerde,
>
> final Serde<V> valueSerde)
>
> /**
> * The following methods are for use with the PAPI. They allow building of
> StateStores that can be wrapped with
> * caching, logging, and any other convenient wrappers provided by the
> KafkaStreams library
> */
> public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final
> StateStoreSupplier<WindowStore<K, V>> supplier)
>
> public <K, V> StateStoreBuilder<KeyValueStore<K, V>>
> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)
>
> public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final
> StateStoreSupplier<SessionStore<K, V>> supplier)
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)