[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414606#comment-16414606 ]
Guozhang Wang commented on KAFKA-6713:
--------------------------------------

Hi [~cemo],

I've read your code, and here are some follow-up questions / clarifications:

1. StreamsBuilder.globalTable expects a {{Materialized<K, V, KeyValueStore<Bytes, byte[]>>}} parameter, so with `Materialized.as` one should only pass in a {{KeyValueBytesStoreSupplier}} that generates a {{KeyValueStore<Bytes, byte[]>}}. You therefore do not need to template your {{DelegatingByteStore}}; just let it use a {{KeyValueStore<Bytes, byte[]> delegated}} internally. Using a converter after the serde unnecessarily invokes the serdes three times instead of once: when a <K, V> pair is passed in, you would first use the key/value serde to serialize it into bytes, then deserialize it back into <K, V> with the converter in your {{DelegatingByteStore}} implementation, and then the in-memory <K, V> store would serialize it into bytes once more before putting it into the cache.

2. In your code you are re-using the same {{DelegatingByteStore}} in your {{KeyValueBytesStoreSupplier}}, i.e. whenever `get()` is called it always returns the same store object. Is that intentional? Note that although this is fine for now, since we only call `get()` once across all threads for a global store, that is an internal implementation detail that may change. To be safer I'd suggest creating a new object in your supplier on each `get()` call.

3. About your invalidation use cases, I'm not sure I can follow completely... could you elaborate a bit more?
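
To make points 1 and 2 concrete, here is a rough, self-contained sketch rather than working Kafka Streams code. The {{BytesStore}} and {{BytesStoreSupplier}} interfaces are hypothetical stand-ins for {{KeyValueStore<Bytes, byte[]>}} and {{KeyValueBytesStoreSupplier}} (with String keys in place of {{Bytes}}), and {{InMemoryBytesStore}} and {{DelegatingStoreSupplier}} are made-up names. It only illustrates the shape: a non-generic delegating store that holds a bytes store directly, and a supplier that builds a new store on each `get()` call.

```java
import java.util.HashMap;
import java.util.Map;

final class SupplierSketch {

    // Hypothetical stand-in for KeyValueStore<Bytes, byte[]>; String keys replace Bytes.
    interface BytesStore {
        void put(String key, byte[] value);
        byte[] get(String key);
    }

    // Hypothetical stand-in for KeyValueBytesStoreSupplier.
    interface BytesStoreSupplier {
        String name();
        BytesStore get();
    }

    // Simple map-backed store used as the delegate in this sketch.
    static class InMemoryBytesStore implements BytesStore {
        private final Map<String, byte[]> map = new HashMap<>();
        @Override public void put(String key, byte[] value) { map.put(key, value); }
        @Override public byte[] get(String key) { return map.get(key); }
    }

    // Point 1: no type parameters and no converter; bytes pass straight through.
    static class DelegatingByteStore implements BytesStore {
        private final BytesStore delegated;
        DelegatingByteStore(BytesStore delegated) { this.delegated = delegated; }
        @Override public void put(String key, byte[] value) { delegated.put(key, value); }
        @Override public byte[] get(String key) { return delegated.get(key); }
    }

    // Point 2: every get() call builds a new store instead of returning a shared one.
    static class DelegatingStoreSupplier implements BytesStoreSupplier {
        private final String name;
        DelegatingStoreSupplier(String name) { this.name = name; }
        @Override public String name() { return name; }
        @Override public BytesStore get() {
            return new DelegatingByteStore(new InMemoryBytesStore());
        }
    }

    public static void main(String[] args) {
        BytesStoreSupplier supplier = new DelegatingStoreSupplier("categories");
        BytesStore first = supplier.get();
        BytesStore second = supplier.get();
        first.put("k", new byte[] {1});
        System.out.println(first != second);          // prints "true": distinct instances
        System.out.println(second.get("k") == null);  // prints "true": no shared state
    }
}
```

Extra lookups such as get-by-name could then live on the concrete store class, built from the deserialized values, while the interface seen by the DSL stays bytes-only.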

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6713
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6713
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.1
>            Reporter: Cemalettin Koç
>            Priority: Major
>              Labels: streaming-api
>         Attachments: BytesTypeConverter.java, DelegatingByteStore.java, TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my
> stores, I would like to store my `Category` entites and I would like to query
> them by their name as well. My custom store has some capabilities beyond
> `get` such as get by `name`. I also want to get all entries in a hierarchical
> way in a lazy fashion. I have other use cases as well.
>
> In order to accomplish my task I had to implement a custom
> `KeyValueBytesStoreSupplier`, `BytesTypeConverter` and
>
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>   public DelegatingByteStore(KeyValueStore<K, V> delegated, BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key), converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key), converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ......
> {code}
>
> Type Converter:
> {code:java}
> public interface TypeConverter<K, IK, V, IV> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<Bytes, byte[]>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<Bytes, byte[]> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>
> This is unfortunately too cumbersome and hard to maintain.
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)