[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16464281#comment-16464281
 ] 

Matthias J. Sax commented on KAFKA-6840:
----------------------------------------

Actually, {{Stores#persistenWindowStore}} does exist: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/Stores.java#L72-L77]

Thus, you should be able to pass in a windowed store into 
{{StreamsBuilder#table()}} using {{Materialized.as(WindowBytesStoreSupplier)}}?

> support windowing in ktable API
> -------------------------------
>
>                 Key: KAFKA-6840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6840
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>              Labels: api, needs-kip
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized <K, V> KTable<K, V> table(final String topic,
>                                                   final Consumed<K, V> 
> consumed,
>                                                   final Materialized<K, V, 
> KeyValueStore<Bytes, byte[]>> materialized) {
>         Objects.requireNonNull(topic, "topic can't be null");
>         Objects.requireNonNull(consumed, "consumed can't be null");
>         Objects.requireNonNull(materialized, "materialized can't be null");
>         
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>         return internalStreamsBuilder.table(topic,
>                                             new ConsumedInternal<>(consumed),
>                                             new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String 
> topic,
>                                                   final Consumed<K, V> 
> consumed,
>                                                   final Materialized<K, V, 
> WindowStore<Bytes, byte[]>> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized <K, V> WindowedKTable<K, V> windowedTable(final String 
> topic,
>                                                   final Consumed<K, V> 
> consumed,
>                                                   final Materialized<K, V, 
> WindowStore<Bytes, byte[]>> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to