[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414323#comment-16414323 ]
Guozhang Wang commented on KAFKA-6713: -------------------------------------- Hi [~cemo] Thanks for reporting this, is very valuable to us. What's puzzles me is why you'd need to implement the {{TypeConverter}} interface, as it is an internal interface and is not supposed to be enforced to users. Could you share more of your code snippet to help me understand better the cumbersomeness. Note that if you are using DSL and trying to implement the customized store, you should only need to implement the {{KeyValueBytesStoreSupplier}}, and the {{KeyValueStore<Bytes, byte[]>}} it generates. The serdes will be auto-handled by the streams library. > Provide an easy way replace store with a custom one on High-Level Streams DSL > ----------------------------------------------------------------------------- > > Key: KAFKA-6713 > URL: https://issues.apache.org/jira/browse/KAFKA-6713 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.0.1 > Reporter: Cemalettin Koç > Priority: Major > Labels: streaming-api > > I am trying to use GlobalKTable with a custom store implementation. In my > stores, I would like to store my `Category` entites and I would like to query > them by their name as well. My custom store has some capabilities beyond > `get` such as get by `name`. I also want to get all entries in a hierarchical > way in a lazy fashion. I have other use cases as well. > > In order to accomplish my task I had to implement a custom > `KeyValueBytesStoreSupplier`, `BytesTypeConverter` and > > {code:java} > public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, > byte[]> { > private BytesTypeConverter<K, V> converter; > private KeyValueStore<K, V> delegated; > public DelegatingByteStore(KeyValueStore<K, V> delegated, > BytesTypeConverter<K, V> converter) { > this.converter = converter; > this.delegated = delegated; > } > @Override > public void put(Bytes key, byte[] value) { > delegated.put(converter.outerKey(key), > converter.outerValue(value)); > } > @Override > public byte[] putIfAbsent(Bytes key, byte[] value) { > V v = delegated.putIfAbsent(converter.outerKey(key), > converter.outerValue(value)); > return v == null ? null : value; > } > ...... > {code} > > Type Converter: > {code:java} > public interface TypeConverter<K, IK, V, IV> { > IK innerKey(final K key); > IV innerValue(final V value); > List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from); > List<KeyValue<K, V>> outerEntries(final List<KeyValue<Bytes, byte[]>> from); > V outerValue(final IV value); > KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from); > KeyValue<Bytes, byte[]>innerKeyValue(final KeyValue<K, V> entry); > K outerKey(final IK ik); > } > {code} > > This is unfortunately too cumbersome and hard to maintain. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)