[ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463304#comment-16463304
 ] 

Boyang Chen commented on KAFKA-6840:
------------------------------------

I think alternatively we could extend class `Materialized` with those configs, 
by passing in retention, numSegments etc, how does that sound? I will keep 
trying this way now.

> support windowing in ktable API
> -------------------------------
>
>                 Key: KAFKA-6840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6840
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>              Labels: api, needs-kip
>
> The StreamsBuilder provides table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, current 
> underlying implementation does not support materializing one topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To make up the gap, we proposed a new API in StreamsBuilder that could get a 
> windowed Ktable.
> The table() API in StreamsBuilder looks like this:
> public synchronized <K, V> KTable<K, V> table(final String topic,
>                                                   final Consumed<K, V> 
> consumed,
>                                                   final Materialized<K, V, 
> KeyValueStore<Bytes, byte[]>> materialized) {
>         Objects.requireNonNull(topic, "topic can't be null");
>         Objects.requireNonNull(consumed, "consumed can't be null");
>         Objects.requireNonNull(materialized, "materialized can't be null");
>         
> materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>         return internalStreamsBuilder.table(topic,
>                                             new ConsumedInternal<>(consumed),
>                                             new 
> MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
>     }
>  
> Where we could see that the store type is given as KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options to define 
> a new API:
> 1.Overload existing KTable struct
> public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String 
> topic,
>                                                   final Consumed<K, V> 
> consumed,
>                                                   final Materialized<K, V, 
> WindowStore<Bytes, byte[]>> materialized);
>  
> This could give developer an alternative to use windowed table instead. 
> However, this implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc, so the challenge would be 
> making sure all current KTable logics work.
>  
> 2.Define a new type called WindowedKTable
> public synchronized <K, V> WindowedKTable<K, V> windowedTable(final String 
> topic,
>                                                   final Consumed<K, V> 
> consumed,
>                                                   final Materialized<K, V, 
> WindowStore<Bytes, byte[]>> materialized);
> The benefit of doing this is that we don’t need to worry about the existing 
> functionality of KTable. However, the cost is to introduce redundancy of 
> common operation logic. When upgrading common functionality, we need to take 
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some 
> feedbacks on the two approaches, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to