[ 
https://issues.apache.org/jira/browse/KAFKA-6953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492043#comment-16492043
 ] 

Flavio Stutz commented on KAFKA-6953:
-------------------------------------

"... but even as of today, you can do a global aggregation IMHO."
*We can surely do global aggregation using Interactive Queries or a 
single-partition topic. This is what you mean, right?*


"This really depends on the use case. I think there are two scenarios:
 # You cannot pre-aggregate (for example computing median)...
 # You can pre-aggregate (for example, sum, average, max)...

Does this make sense?"

*Yes. The proposed feature would be useful in scenarios where you can 
pre-aggregate.* 
*It could also be useful in scenarios where you accept skipping some data in 
order to avoid unnecessary pressure. For example, in signal processing I have 
faced situations where a lot of data was coming in, but I needed to process 
only 'temporal snapshots' of it, creating a temporal normalization barrier. In 
our web analytics case, we can use this to avoid processing every click of a 
user who is rapidly clicking on various items when I am only interested in the 
last click of a series of 5 clicks within the last 5 seconds. For this I could 
keep the last clicked item in a local store and, every 5s, the scheduled KTable 
would trigger a more complex Graph for processing the last clicked item. In 
this case, the scheduled KTable source could emit tuples only from the local 
store, not from the stores of all instances.*
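
A minimal sketch of the "last click" snapshot described above, in plain Java with no Kafka dependency. The class name `LastClickSnapshot` and its methods are hypothetical and only illustrate the idea: every click overwrites the previous one for the same user, and a timer (every 5s in the example above) drains the latest values into the downstream Graph.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: keeps only the latest
// click per user and hands out a snapshot when a timer fires.
final class LastClickSnapshot {
    private final Map<String, String> lastClickByUser = new HashMap<>();

    // Called for every incoming click; an older click of the same user is overwritten.
    synchronized void record(String userId, String itemId) {
        lastClickByUser.put(userId, itemId);
    }

    // Called by the schedule: returns the latest click per user seen since
    // the previous snapshot, then clears the local state.
    synchronized Map<String, String> snapshot() {
        Map<String, String> out = new HashMap<>(lastClickByUser);
        lastClickByUser.clear();
        return out;
    }
}
```

With this shape, five rapid clicks by one user between two snapshots result in a single downstream tuple carrying the fifth item.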


"The REST proxy works well, but I need to maintain an external database for the 
final aggregation. It would be great if I could use KStream capabilities for 
this job.

Not sure what you mean by this."

*I was describing a scenario where you achieve global aggregation by exposing a 
REST proxy in each application instance, so that you can query each instance 
using interactive queries to get the pre-aggregation results of each local 
store and send this data to an external database, which performs the final 
global aggregation. Using an external element requires more code if you want to 
put the globally aggregated value back into Kafka Streams for further 
processing, as happens in our case because we use the globally aggregated value 
in other Graphs.*
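
The final aggregation step in this scenario can be sketched as follows, assuming each instance reports a partial result as a (sum, count) pair, which is enough to derive a sum, a count, or an average. `PartialAggregate` is a hypothetical type used only for illustration. How the partials are fetched (REST proxy, interactive queries) is deliberately left out.

```java
import java.util.List;

// Hypothetical pre-aggregated result reported by one application instance.
final class PartialAggregate {
    final long sum;
    final long count;

    PartialAggregate(long sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // Combining two partials is associative, so the merge order does not matter.
    PartialAggregate merge(PartialAggregate other) {
        return new PartialAggregate(sum + other.sum, count + other.count);
    }

    // The global average is derived from the merged sum and count,
    // never by averaging the per-instance averages.
    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    // Final global aggregation over the partials collected from all instances.
    static PartialAggregate mergeAll(List<PartialAggregate> partials) {
        PartialAggregate total = new PartialAggregate(0, 0);
        for (PartialAggregate p : partials) {
            total = total.merge(p);
        }
        return total;
    }
}
```

A median cannot be merged this way, which is why it falls into the "cannot pre-aggregate" scenario quoted earlier.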


"We are always happy to get contributions :) However, we should first agree on 
a design...."
*Sure, that's why I am trying to make sense here :)*

 

> [Streams] Schedulable KTable as Graph source (for minimizing aggregation 
> pressure)
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6953
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Flavio Stutz
>            Priority: Major
>
> === PROBLEM ===
> We have faced the following scenario/problem in a lot of situations with 
> KStreams:
>    - Huge incoming data being processed by numerous application instances
>    - Need to aggregate, or count the overall data as a single value 
> (something like "count the total number of messages that have been processed 
> among all distributed instances")
>    - The challenge here is to manage this kind of situation without any 
> bottlenecks. We don't need the overall aggregation of all instances' states at 
> each processed message, so it is possible to store the partial aggregations 
> on local stores and, from time to time, query those states and aggregate them, 
> avoiding bottlenecks.
> Some ways we KNOW wouldn't work because of bottlenecks:
>     - Sink every instance's local counter/aggregation result to a Topic with a 
> single partition so that we could have another Graph with a single instance 
> that could aggregate all results
>          - In this case, if I had 500 instances processing 1000 messages/s each 
> (with no bottlenecks), I would have a single-partition topic receiving 500k 
> messages/s, all of which my single aggregating instance would have to process 
> (IMPOSSIBLE bottleneck)
> === TRIALS ===
> These are some ways we managed to do this:
>    - Expose a REST endpoint so that Prometheus could extract local metrics of 
> each application instance's state stores and then calculate the total count 
> on Prometheus using queries
>          - we don't like this much because we believe KStreams was meant to 
> INPUT and OUTPUT data using Kafka Topics for simplicity and power
>    - Create a scheduled Punctuate at the end of the Graph so that we can 
> query (using getAllMetadata) all other instances' state store counters, sum 
> them all and then publish to another Kafka Topic from time to time.
>           - For this to work we created a way so that only one application 
> instance's Punctuate algorithm would perform the calculations (something like 
> a master election through instance ids and metadata)
> === PROPOSAL ===
> Create a new DSL Source with the following characteristics:
>    - Source parameters: "scheduled time" (using a cron-like config), "state 
> store name", bool "from all application instances"
>    - Behavior: At the desired time, query all K,V tuples from the state store 
> and source those messages to the Graph
>           - If "from all application instances" is true, query the tuples 
> from all application instances' state stores and source them all, concatenated
>    - This is a way to create a "timed aggregation barrier" to avoid 
> bottlenecks. With this we could enhance the ability of KStreams to better 
> handle the CAP Theorem characteristics, so that one could choose to have 
> Consistency over Availability.
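
The behavior proposed above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not an existing Kafka Streams API: `StoreSnapshotSource`, `fire`, and `total` are hypothetical names, in-memory maps stand in for the local and remote state stores, and the cron-like trigger is left to the caller.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed scheduled source. Plain maps stand in
// for state stores; a real implementation would have to read remote stores
// over the network.
final class StoreSnapshotSource {
    private final Map<String, Long> localStore;
    private final List<Map<String, Long>> remoteStores;

    StoreSnapshotSource(Map<String, Long> localStore, List<Map<String, Long>> remoteStores) {
        this.localStore = localStore;
        this.remoteStores = remoteStores;
    }

    // Invoked at the scheduled time: emits all K,V tuples of the local store,
    // followed by those of every other instance when fromAllInstances is true.
    List<Map.Entry<String, Long>> fire(boolean fromAllInstances) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        emit(localStore, out);
        if (fromAllInstances) {
            for (Map<String, Long> remote : remoteStores) {
                emit(remote, out);
            }
        }
        return out;
    }

    // Copies each entry so later store updates do not change emitted tuples.
    private static void emit(Map<String, Long> store, List<Map.Entry<String, Long>> out) {
        for (Map.Entry<String, Long> e : store.entrySet()) {
            out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
    }

    // Example downstream step: sum the partial counters emitted at the barrier.
    static long total(List<Map.Entry<String, Long>> tuples) {
        long sum = 0;
        for (Map.Entry<String, Long> e : tuples) {
            sum += e.getValue();
        }
        return sum;
    }
}
```

In the 500-instance example from the problem statement, each firing of such a source would hand the downstream Graph one counter tuple per instance (500 tuples), instead of the 500k messages/s that a single-partition topic would receive.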



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
