Moved this KIP into status "inactive". Feel free to resume it at any time.
-Matthias

On 7/15/18 6:55 PM, Matthias J. Sax wrote:
> I think it would make a lot of sense to provide a simple DSL abstraction.
>
> Something like:
>
> KStream stream = ...
> KTable count = stream.count();
>
> The missing groupBy() or groupByKey() call indicates a global counting
> operation. The JavaDocs should highlight the impact.
>
> One open question is what key we want to use for the result KTable.
>
> Also, the details about optional parameters like `Materialized` need to
> be discussed in detail.
>
> -Matthias
>
> On 7/6/18 2:43 PM, Guozhang Wang wrote:
>> That's a lot of email exchanges for me to catch up on :)
>>
>> My original proposed alternative solution indeed relies on
>> pre-aggregating before sending to the single-partition topic, so that the
>> traffic on that single-partition topic would not be huge (I called it
>> partial-aggregate, but the intent was the same).
>>
>> What I was thinking is that, given such a scenario could be common, if
>> we've decided to go down this route, should we provide a new API that wraps
>> John's proposed topology (right now with KIP-328 users still need to
>> leverage this trick manually):
>>
>> ----------
>>
>> final KStream<String, String> siteEvents = builder.stream("/site-events");
>>
>> final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(
>>     /* generate KeyValue(key, 1) for the pre-aggregate */);
>>
>> final KTable<Integer, Long> countsByPartition =
>>     keyedByPartition.groupByKey().count(); /* pre-aggregate */
>>
>> final KGroupedTable<String, Long> singlePartition =
>>     countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>>     /* send the suppressed pre-aggregate values to the single-partition topic */
>>
>> final KTable<String, Long> totalCount =
>>     singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r);
>>     /* read from the single-partition topic, reduce over the data */
>>
>> ----------
>>
>> Note that if we wrap them all into a new operator, users would need to
>> provide two functions: one for the aggregate and one for the final "reduce"
>> (in my previous email I called it the merger function, but the intent is the
>> same).
>>
>> Guozhang
>>
>> On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <j...@confluent.io> wrote:
>>
>>> Ok, I didn't get quite as far as I hoped, and several things are far from
>>> ready, but here's what I have so far:
>>> https://github.com/apache/kafka/pull/5337
>>>
>>> The "unit" test works, and is a good example of how you should expect it to
>>> behave:
>>> https://github.com/apache/kafka/pull/5337/files#diff-2fdec52b9cc3d0e564f0c12a199bed77
>>>
>>> I have one working integration test, but it's slow going getting the timing
>>> right, so no promises of any kind ;)
>>>
>>> Let me know what you think!
>>>
>>> Thanks,
>>> -John
>>>
>>> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hey Flávio,
>>>>
>>>> Thanks! I haven't got anything usable yet, but I'm working on it now. I'm
>>>> hoping to push up my branch by the end of the day.
>>>>
>>>> I don't know if you've seen it, but Streams actually already has something
>>>> like this, in the form of caching on materialized stores. If you pass in a
>>>> "Materialized.withCachingEnabled()", you should be able to get a POC
>>>> working by setting the max cache size pretty high and setting the commit
>>>> interval for your desired rate:
>>>> https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html#streams-developer-guide-memory-management
>>>> .
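For reference, a minimal, self-contained sketch of the caching approach John
describes here. The topic name, store name, cache size, and commit interval are
illustrative assumptions, not from the thread; the configs and the Materialized
API are the released Streams ones, nothing from KIP-328:

----------
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class CachingPoc {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "caching-poc");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // a large record cache plus a long commit interval rate-limits
        // downstream updates: at most one update per key per commit interval
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);

        final StreamsBuilder builder = new StreamsBuilder();
        // count events per key, buffering updates in the cache until commit
        builder.stream("site-events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts")
                                  .withCachingEnabled());

        new KafkaStreams(builder.build(), props).start();
    }
}
----------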
>>>>
>>>> There are a couple of cases in joins and whatnot where it doesn't work,
>>>> but for the aggregations we discussed, it should. The reason for KIP-328 is
>>>> to provide finer control and hopefully a more straightforward API.
>>>>
>>>> Let me know if that works, and I'll drop a message in here when I create
>>>> the draft PR for KIP-328. I'd really appreciate your feedback.
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Wed, Jul 4, 2018 at 10:17 PM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
>>>>
>>>>> John, that was fantastic, man!
>>>>> Have you built any custom implementation of your KIP on your machine so
>>>>> that I could test it out here? I wish I could test it out.
>>>>> If you need any help implementing this feature, please tell me.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> -Flávio Stutz
>>>>>
>>>>> On 2018/07/03 18:04:52, John Roesler <j...@confluent.io> wrote:
>>>>>> Hi Flávio,
>>>>>> Thanks! I think we can actually do this, but the API could be better.
>>>>>> I've included Java code below, but I'll copy and modify your example so
>>>>>> we're on the same page.
>>>>>>
>>>>>> EXERCISE 1:
>>>>>> - The case is "total counting of events for a huge website"
>>>>>> - Tasks from Application A will have something like:
>>>>>>     .stream(/site-events)
>>>>>>     .transform( re-key s.t. the new key is the partition id )
>>>>>>     .groupByKey() // you have to do this before count
>>>>>>     .count()
>>>>>>     // you explicitly published to a one-partition topic here, but it's
>>>>>>     // actually sufficient just to re-group onto one key. You could name
>>>>>>     // and pre-create the intermediate topic here, but you don't need a
>>>>>>     // separate application for the final aggregation.
>>>>>>     .groupBy((partitionId, partialCount) -> new KeyValue("ALL", partialCount))
>>>>>>     .aggregate(sum up the partialCounts)
>>>>>>     .publish(/counter-total)
>>>>>>
>>>>>> I've left out the suppressions, but they would go right after the count()
>>>>>> and the aggregate().
>>>>>>
>>>>>> With this program, you don't have to worry about the double-aggregation you
>>>>>> mentioned in the last email. The KTable produced by the first count() will
>>>>>> maintain the correct count per partition. If the value changes for any
>>>>>> partition, it'll emit a retraction of the old value and then the new value
>>>>>> downstream, so that the final aggregation can update itself properly.
>>>>>>
>>>>>> I think we can optimize both the execution and the programmability by adding
>>>>>> a "global aggregation" concept. But in principle, it seems like this usage
>>>>>> of the current API will support your use case.
>>>>>>
>>>>>> Once again, though, this is just to present an alternative. I haven't done
>>>>>> the math on whether your proposal would be more efficient.
>>>>>>
>>>>>> Thanks,
>>>>>> -John
>>>>>>
>>>>>> Here's the same algorithm written in Java:
>>>>>>
>>>>>> final KStream<String, String> siteEvents = builder.stream("/site-events");
>>>>>>
>>>>>> // here we re-key the events so that the key is actually the partition id.
>>>>>> // we don't need the value to do a count, so I just set it to "1".
>>>>>> final KStream<Integer, Integer> keyedByPartition =
>>>>>>     siteEvents.transform(() -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>>>>>>         private ProcessorContext context;
>>>>>>
>>>>>>         @Override
>>>>>>         public void init(final ProcessorContext context) {
>>>>>>             this.context = context;
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public KeyValue<Integer, Integer> transform(final String key, final String value) {
>>>>>>             return new KeyValue<>(context.partition(), 1);
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public void close() {} // nothing to clean up
>>>>>>     });
>>>>>>
>>>>>> // Note that we can't do "count()" on a KStream; we have to group it first.
>>>>>> // I'm grouping by the key, so it will produce the count for each key.
>>>>>> // Since the key is actually the partition id, it will produce the
>>>>>> // pre-aggregated count per partition.
>>>>>> // Note that the result is a KTable<PartitionId, Count>. It'll always
>>>>>> // contain the most recent count for each partition.
>>>>>> final KTable<Integer, Long> countsByPartition =
>>>>>>     keyedByPartition.groupByKey().count();
>>>>>>
>>>>>> // Now we get ready for the final roll-up. We re-group all the constituent counts.
>>>>>> final KGroupedTable<String, Long> singlePartition =
>>>>>>     countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>>>>>>
>>>>>> final KTable<String, Long> totalCount =
>>>>>>     singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r);
>>>>>>
>>>>>> totalCount.toStream().foreach((k, v) -> {
>>>>>>     // k is always "ALL"
>>>>>>     // v is always the most recent total value
>>>>>>     System.out.println("The total event count is: " + v);
>>>>>> });
>>>>>>
>>>>>> On Tue, Jul 3, 2018 at 9:21 AM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
>>>>>>
>>>>>>> Great feature you have there!
>>>>>>>
>>>>>>> I'll try to exercise here how we would achieve the same functional
>>>>>>> objectives using your KIP:
>>>>>>>
>>>>>>> EXERCISE 1:
>>>>>>> - The case is "total counting of events for a huge website"
>>>>>>> - Tasks from Application A will have something like:
>>>>>>>     .stream(/site-events)
>>>>>>>     .count()
>>>>>>>     .publish(/single-partitioned-topic-with-count-partials)
>>>>>>> - The published messages will be, for example:
>>>>>>>     ["counter-task1", 2345]
>>>>>>>     ["counter-task2", 8495]
>>>>>>>     ["counter-task3", 4839]
>>>>>>> - A single task from Application B will have something like:
>>>>>>>     .stream(/single-partitioned-topic-with-count-partials)
>>>>>>>     .aggregate(by messages whose key starts with "counter")
>>>>>>>     .publish(/counter-total)
>>>>>>> - FAIL HERE. How would I know the overall number of partitions? Two
>>>>>>>   partials from the same task may arrive before the other tasks' partials,
>>>>>>>   and they might be aggregated twice.
>>>>>>>
>>>>>>> I tried to think about using GlobalKTables, but I didn't find an easy way
>>>>>>> to aggregate the keys from that table. Do you have any clue?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> -Flávio Stutz
>>>>>>>
>>>>>>> /partial-counters-to-single-partitioned-topic
>>>>>>>
>>>>>>> On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
>>>>>>>> Hi Flávio,
>>>>>>>>
>>>>>>>> Thanks for the KIP. I apologize that I'm arriving late to the
>>>>>>>> discussion. I've tried to catch up, but I might have missed some nuances.
>>>>>>>>
>>>>>>>> Regarding KIP-328, the idea is to add the ability to suppress intermediate
>>>>>>>> results from all KTables, not just windowed ones. I think this could
>>>>>>>> support your use case in combination with the strategy that Guozhang
>>>>>>>> proposed of having one or more pre-aggregation steps that ultimately push
>>>>>>>> into a single-partition topic for final aggregation. Suppressing
>>>>>>>> intermediate results would solve the problem you noted that today
>>>>>>>> pre-aggregating doesn't do much to staunch the flow of updates.
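At the time of this thread KIP-328 was still in progress; for reference, a
sketch of how that suppression looks with the suppress() operator as it
eventually shipped (Kafka 2.1), applied to the countsByPartition/totalCount
example quoted above. The 30-second limit is an arbitrary illustration:

----------
import java.time.Duration;
import org.apache.kafka.streams.kstream.Suppressed;

// rate-limit the per-partition pre-aggregate: at most one update per key
// every 30 seconds flows on to the single-partition step
final KTable<Integer, Long> throttledCounts =
    keyedByPartition.groupByKey().count()
        .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30),
                                            Suppressed.BufferConfig.unbounded()));

// the final roll-up can be throttled the same way before publishing
final KTable<String, Long> totalCount =
    throttledCounts
        .groupBy((key, value) -> new KeyValue<>("ALL", value))
        .reduce((l, r) -> l + r, (l, r) -> l - r)
        .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30),
                                            Suppressed.BufferConfig.unbounded()));
----------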
>>>>>>>>
>>>>>>>> I'm not sure if this would be good enough for you overall; I just wanted
>>>>>>>> to clarify the role of KIP-328.
>>>>>>>> In particular, the solution you mentioned is to have the downstream KTables
>>>>>>>> actually query the upstream ones to compute their results. I'm not sure
>>>>>>>> whether it's more efficient to do these queries on a schedule, or to have
>>>>>>>> the upstream tables emit their results on the same schedule.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <flaviost...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> From what I understood, that KIP is related to how KStreams will handle
>>>>>>>>> KTable updates in windowed scenarios to optimize resource usage.
>>>>>>>>> I couldn't see any specific relation to this KIP. Had you?
>>>>>>>>>
>>>>>>>>> -Flávio Stutz
>>>>>>>>>
>>>>>>>>> On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io> wrote:
>>>>>>>>>> Flavio,
>>>>>>>>>>
>>>>>>>>>> thanks for cleaning up the KIP number collision.
>>>>>>>>>>
>>>>>>>>>> With regard to KIP-328
>>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
>>>>>>>>>> I am wondering how both relate to each other?
>>>>>>>>>>
>>>>>>>>>> Any thoughts?
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
>>>>>>>>>>> Just copying a follow-up from another thread to here (sorry about
>>>>>>>>>>> the mess):
>>>>>>>>>>>
>>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
>>>>>>>>>>> Date: 2018/06/25 22:24:17
>>>>>>>>>>> List: dev@kafka.apache.org
>>>>>>>>>>>
>>>>>>>>>>> Flávio, thanks for creating this KIP.
>>>>>>>>>>>
>>>>>>>>>>> I think this "single-aggregation" use case is common enough that we
>>>>>>>>>>> should consider how to efficiently support it: for example, for KSQL,
>>>>>>>>>>> which is built on top of Streams, we've seen lots of query statements
>>>>>>>>>>> whose result is expected to be a single row indicating the "total
>>>>>>>>>>> aggregate" etc. See https://github.com/confluentinc/ksql/issues/430
>>>>>>>>>>> for details.
>>>>>>>>>>>
>>>>>>>>>>> I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
>>>>>>>>>>> but I'm wondering if we have discussed the option of supporting it in a
>>>>>>>>>>> "pre-aggregate" manner: that is, we do partial aggregates on parallel
>>>>>>>>>>> tasks, and then send the partially aggregated values via a single topic
>>>>>>>>>>> partition for the final aggregate, to reduce the traffic on that single
>>>>>>>>>>> partition and hence the final aggregate workload.
>>>>>>>>>>>
>>>>>>>>>>> Of course, for non-commutative aggregates we'd probably need to provide
>>>>>>>>>>> another API in addition to aggregate, like the `merge` function for
>>>>>>>>>>> session-based aggregates, to let users customize the operation of merging
>>>>>>>>>>> two partial aggregates into a single partial aggregate. What are its pros
>>>>>>>>>>> and cons compared with the current proposal?
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
>>>>>>>>>>>> Hey, guys, I've just created a new KIP about creating a new DSL graph
>>>>>>>>>>>> source for realtime partitioned consolidations.
>>>>>>>>>>>>
>>>>>>>>>>>> We have faced the following scenario/problem in a lot of situations
>>>>>>>>>>>> with KStreams:
>>>>>>>>>>>>   - Huge incoming data being processed by numerous application instances
>>>>>>>>>>>>   - Need to aggregate different fields whose records span all topic
>>>>>>>>>>>>     partitions (something like "total amount spent by people aged > 30 yrs"
>>>>>>>>>>>>     when processing a topic partitioned by userid).
>>>>>>>>>>>>
>>>>>>>>>>>> The challenge here is to manage this kind of situation without any
>>>>>>>>>>>> bottlenecks. We don't need the "global aggregation" to be processed at
>>>>>>>>>>>> each incoming message. In a scenario of 500 instances, each handling 1k
>>>>>>>>>>>> messages/s, any single point of aggregation (single-partitioned topics,
>>>>>>>>>>>> global tables or external databases) would create a bottleneck of 500k
>>>>>>>>>>>> messages/s for single-threaded/CPU elements.
>>>>>>>>>>>>
>>>>>>>>>>>> For this scenario, it is possible to store the partial aggregations in
>>>>>>>>>>>> local stores and, from time to time, query those states and aggregate
>>>>>>>>>>>> them as a single value, avoiding bottlenecks. This is a way to create a
>>>>>>>>>>>> "timed aggregation barrier".
>>>>>>>>>>>>
>>>>>>>>>>>> If we leverage this kind of built-in feature, we could greatly enhance
>>>>>>>>>>>> the ability of KStreams to handle the CAP theorem characteristics, so
>>>>>>>>>>>> that one could choose to have Consistency over Availability when needed.
>>>>>>>>>>>>
>>>>>>>>>>>> We started this discussion with Matthias J. Sax here:
>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6953
>>>>>>>>>>>>
>>>>>>>>>>>> If you want to see more, go to KIP-326 at:
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
>>>>>>>>>>>>
>>>>>>>>>>>> -Flávio Stutz
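For reference, the "timed aggregation barrier" described in the announcement
above can be prototyped with the Processor API that already exists. A minimal
sketch, with hypothetical names throughout (the store "partial-counts", its
wiring via Topology#addProcessor/#addStateStore, and the one-minute interval
are illustrative assumptions, not part of the KIP):

----------
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Each task counts its own share of the input into a local store; a wall-clock
// punctuation forwards the task's single partial count downstream once a
// minute, instead of once per record -- the "timed aggregation barrier".
public class PartialCountProcessor extends AbstractProcessor<String, String> {
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, Long>) context.getStateStore("partial-counts");
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // sum this task's partial counts and emit a single value
            long sum = 0;
            try (final KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Long> next = it.next();
                    sum += next.value;
                }
            }
            // key the partial by task id so the single-partition final
            // aggregator can tell the partials apart
            context().forward("task-" + context().taskId(), sum);
        });
    }

    @Override
    public void process(final String key, final String value) {
        final Long count = store.get(key);
        store.put(key, count == null ? 1L : count + 1);
    }
}
----------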