[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414606#comment-16414606
 ] 

Guozhang Wang commented on KAFKA-6713:
--------------------------------------

Hi [~cemo], I've read your code, and here are some follow-up questions / 
clarifications:

1. StreamsBuilder.globalTable expects a {{Materialized<K, V, 
KeyValueStore<Bytes, byte[]>>}} parameter, so with `Materialized.as` one 
should only pass in a {{KeyValueBytesStoreSupplier}} that generates a 
{{KeyValueStore<Bytes, byte[]>}}. You therefore do not need to make your 
{{DelegatingByteStore}} generic; just let it use a {{KeyValueStore<Bytes, 
byte[]> delegated}} internally. Using a converter after the serde 
unnecessarily invokes the serdes three times instead of once: when a <K, V> 
pair is passed in, you would first use the key/value serde to serialize it 
into bytes, then deserialize it back into <K, V> with the converter in your 
{{DelegatingByteStore}} implementation, and then the in-memory <K, V> store 
would serialize it into bytes once again before putting it into the cache.

2. In your code you are re-using the same {{DelegatingByteStore}} in your 
{{KeyValueBytesStoreSupplier}}, i.e. whenever `get()` is called it always 
returns the same store object. Is this intentional? Although it is fine for 
now, since we only call `get()` once across all threads for a global store, 
this is an internal implementation detail that may change. To be safer, I'd 
suggest generating a new object in your supplier on each `get()` call.

3. About your invalidation use cases, I'm not sure I can follow completely... 
could you elaborate a bit more?
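
To illustrate point 1, here is a rough sketch that simply counts serde 
invocations on the two write paths. {{CountingSerde}} and {{SerdePassSketch}} 
are hypothetical stand-ins written for this comment, not Kafka Streams 
classes, and the real call sequence inside Streams may differ in detail:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a value serde; it counts how often it is invoked.
class CountingSerde {
    int calls = 0;

    byte[] serialize(String value) {
        calls++;
        return value.getBytes(StandardCharsets.UTF_8);
    }

    String deserialize(byte[] bytes) {
        calls++;
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

class SerdePassSketch {
    // Converter-based path: bytes -> typed object -> bytes again.
    static int putViaConverter(CountingSerde serde, List<byte[]> cache, String value) {
        int before = serde.calls;
        byte[] fromDsl = serde.serialize(value);    // 1: the record is serialized by the serde
        String typed = serde.deserialize(fromDsl);  // 2: the converter restores the typed value
        cache.add(serde.serialize(typed));          // 3: the typed store serializes it for the cache
        return serde.calls - before;
    }

    // Bytes-only path: the delegate stores exactly the bytes it is handed.
    static int putViaBytesDelegate(CountingSerde serde, List<byte[]> cache, String value) {
        int before = serde.calls;
        cache.add(serde.serialize(value));          // 1: the record is serialized by the serde
        return serde.calls - before;
    }

    public static void main(String[] args) {
        CountingSerde serde = new CountingSerde();
        List<byte[]> cache = new ArrayList<>();
        System.out.println(putViaConverter(serde, cache, "category-1"));     // prints 3
        System.out.println(putViaBytesDelegate(serde, cache, "category-1")); // prints 1
    }
}
```

The bytes-only path corresponds to a {{DelegatingByteStore}} that wraps a 
{{KeyValueStore<Bytes, byte[]>}} directly, with no converter in between.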
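
For point 2, the difference between a shared and a per-call store instance 
can be sketched as below. This uses {{java.util.function.Supplier}} and a 
hypothetical {{InMemoryStore}} class as simplified stand-ins; it is not the 
actual {{KeyValueBytesStoreSupplier}} interface, only the same pattern:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a state store instance.
class InMemoryStore {
    final Map<String, byte[]> data = new HashMap<>();
}

// Hands out one shared instance on every get() call (the pattern questioned in point 2).
class SharedStoreSupplier implements Supplier<InMemoryStore> {
    private final InMemoryStore store = new InMemoryStore();

    @Override
    public InMemoryStore get() {
        return store;
    }
}

// Builds a fresh instance on every get() call (the safer pattern).
class FreshStoreSupplier implements Supplier<InMemoryStore> {
    @Override
    public InMemoryStore get() {
        return new InMemoryStore();
    }
}

class SupplierSketch {
    public static void main(String[] args) {
        Supplier<InMemoryStore> shared = new SharedStoreSupplier();
        Supplier<InMemoryStore> fresh = new FreshStoreSupplier();
        System.out.println(shared.get() == shared.get()); // prints true
        System.out.println(fresh.get() == fresh.get());   // prints false
    }
}
```

With the fresh-instance pattern, callers never end up sharing state by 
accident if `get()` is later invoked more than once.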

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6713
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6713
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.1
>            Reporter: Cemalettin Koç
>            Priority: Major
>              Labels: streaming-api
>         Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entities and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get` such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement a custom 
> `KeyValueBytesStoreSupplier`, a `BytesTypeConverter`, and the following 
> `DelegatingByteStore`:
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, 
> byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>   public DelegatingByteStore(KeyValueStore<K, V> delegated, 
> BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key),
>                   converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key),
>                                 converter.outerValue(value));
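>     // Return the previous value, as putIfAbsent requires (assumes the
>     // converter maps V back to byte[] via innerValue).
>     return v == null ? null : converter.innerValue(v);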
>   }
>   ......
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<K, IK, V, IV> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<Bytes, byte[]>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<Bytes, byte[]> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
