[ 
https://issues.apache.org/jira/browse/KAFKA-3101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3101 stopped by Bill Bejeck.
------------------------------------------
> Optimize Aggregation Outputs
> ----------------------------
>
>                 Key: KAFKA-3101
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3101
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Bill Bejeck
>              Labels: architecture
>             Fix For: 0.10.1.0
>
>
> Today we emit one output record for each incoming message for Table / 
> Windowed Stream Aggregations. For example, say we have a sequence of 
> aggregate outputs computed from the input stream (assuming there is no agg 
> value for this key before):
> V1, V2, V3, V4, V5
> Then the aggregator will output the following sequence of Change<newValue, 
> oldValue>:
> <V1, null>, <V2, V1>, <V3, V2>, <V4, V3>, <V5, V4>
> which could cost a lot of CPU overhead in computing the intermediate results. 
> Instead, if we let the underlying state store "remember" the last 
> emitted old value, we can reduce the number of emits based on some configs. 
> More specifically, we can add one more field in the KV store engine storing 
> the last emitted old value, which only gets updated when we emit to the 
> downstream processor. For example:
> At Beginning:                 
> Store: key => empty (no agg values yet)
> V1 computed:         
> Update Both in Store: key => (V1, V1),     Emit <V1, null>
> V2 computed:         
> Update NewValue in Store: key => (V2, V1),     No Emit
> V3 computed:         
> Update NewValue in Store: key => (V3, V1),     No Emit
> V4 computed:         
> Update Both in Store: key => (V4, V4),     Emit <V4, V1>
> V5 computed:         
> Update NewValue in Store: key => (V5, V4),     No Emit
> One more thing to consider is that we need a "closing" time control on the 
> not-yet-emitted keys: when some time has elapsed (or the window is about to 
> be closed), we need to check each key for a current materialized pair that 
> has not been emitted (for example <V5, V4> in the above example).
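
The proposal in the description could be sketched roughly as follows. This is only an illustration, not Kafka Streams code: the class names (EmitSuppressingStore, and a stand-in Change class), the methods update and flush, and the emitEvery setting are all hypothetical. The description only says emits would be reduced "based on some configs", so a simple "emit after every N suppressed updates" rule is assumed here, and an in-memory map stands in for the KV store engine.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Stand-in for Change<newValue, oldValue>; not the Kafka Streams class. */
final class Change<V> {
    final V newValue;
    final V oldValue;

    Change(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Change)) return false;
        Change<?> other = (Change<?>) o;
        return Objects.equals(newValue, other.newValue)
            && Objects.equals(oldValue, other.oldValue);
    }

    @Override
    public int hashCode() { return Objects.hash(newValue, oldValue); }

    @Override
    public String toString() { return "<" + newValue + ", " + oldValue + ">"; }
}

/** Hypothetical store keeping (current value, last emitted value) per key. */
final class EmitSuppressingStore<K, V> {
    private static final class Entry<V> {
        V current;      // latest aggregate value
        V lastEmitted;  // value most recently sent downstream
        int pending;    // updates recorded since the last emit
    }

    private final Map<K, Entry<V>> entries = new LinkedHashMap<>();
    private final int emitEvery; // assumed config: emit once this many updates are pending

    EmitSuppressingStore(int emitEvery) {
        if (emitEvery < 1) throw new IllegalArgumentException("emitEvery must be >= 1");
        this.emitEvery = emitEvery;
    }

    /** Records a new aggregate; returns the Change to emit, or null if suppressed. */
    Change<V> update(K key, V newValue) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            // First value for this key: update both fields and emit <V, null>.
            e = new Entry<>();
            e.current = newValue;
            e.lastEmitted = newValue;
            entries.put(key, e);
            return new Change<>(newValue, null);
        }
        e.current = newValue; // always update the new value
        e.pending++;
        return e.pending < emitEvery ? null : emit(e);
    }

    /** "Closing" pass: emits every key whose current value was not yet emitted. */
    List<Change<V>> flush() {
        List<Change<V>> out = new ArrayList<>();
        for (Entry<V> e : entries.values()) {
            if (e.pending > 0) out.add(emit(e));
        }
        return out;
    }

    private Change<V> emit(Entry<V> e) {
        Change<V> change = new Change<>(e.current, e.lastEmitted);
        e.lastEmitted = e.current; // the old value only advances on emit
        e.pending = 0;
        return change;
    }
}
```

With emitEvery set to 3, feeding V1 through V5 for one key follows the walkthrough above: update returns <V1, null> for V1, nothing for V2 and V3, <V4, V1> for V4, and nothing for V5, after which flush returns <V5, V4>. With emitEvery set to 1, every update is emitted, which corresponds to today's behavior.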



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
