JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281
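To make the scenario in the quoted thread below concrete, this is roughly the shape of one of the steps in question on 0.10.1 (a sketch only: topic names, serdes, and the aggregation itself are illustrative placeholders, not our actual job):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> configs =
        builder.stream(Serdes.String(), Serdes.String(), "config-topic");

    // Repartition, then aggregate. With the KIP-63 record cache, the KTable's
    // updates are forwarded downstream when the cache flushes (on commit),
    // not as each input record is processed.
    KTable<String, Long> counts = configs
        .through(Serdes.String(), Serdes.String(), "config-repartition")
        .groupByKey(Serdes.String(), Serdes.String())
        .aggregate(
            () -> 0L,                      // Initializer: start each key at zero
            (key, value, agg) -> agg + 1L, // Aggregator: illustrative count
            Serdes.Long(),
            "config-agg-store");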
On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor <gfo...@gmail.com> wrote:
> I went ahead and did some more testing, and it feels to me one option
> for resolving this issue is having a method on KGroupedStream which
> can be used to configure whether the operations on it
> (reduce/aggregate) will forward immediately or not. I did a quick
> patch and was able to determine that if the records are forwarded
> immediately, it resolves the issue I am seeing. Having it be done on a
> per-KGroupedStream basis would provide maximum flexibility.
>
> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
>> I'm hitting what seems to be a serious issue (at least, for us) with
>> the changes brought about in KIP-63. In our job, we have a number of
>> steps in the topology where we perform a repartition and aggregation
>> on topics that require low latency. These topics have very low
>> message volume but require subsecond latency for the aggregations to
>> complete, since they are configuration data that drive the rest of
>> the job and need to be applied immediately.
>>
>> In 0.10.0, we performed a through (for repartitioning) and
>> aggregateBy, and this resulted in minimal latency: the aggregateBy
>> would just result in a consumer attached to the output of the
>> through, and the processor would consume + aggregate messages
>> immediately, passing them to the next step in the topology.
>>
>> However, in 0.10.1 the aggregateBy API is no longer available and it
>> is necessary to pivot the data through a groupByKey and then
>> aggregate(). The problem is that this mechanism results in the
>> intermediate KTable state store storing the data as usual, but the
>> data is not forwarded downstream until the next store flush (due to
>> the use of ForwardingCacheFlushListener instead of calling forward()
>> while processing the record).
>>
>> As noted in KIP-63 and as I saw in the code, the flush interval of
>> state stores is commit.interval.ms. For us, this has been tuned to a
>> few seconds, and since we have a number of these aggregations in our
>> job sequentially, this now results in many seconds of latency in the
>> worst case for a tuple to travel through our topology.
>>
>> It seems too inflexible to have the flush interval always be the
>> same as the commit interval across all aggregates. For certain
>> aggregations which are idempotent under message reprocessing, being
>> able to flush more often than the commit interval seems like a very
>> important option when lower latency is required. It would still make
>> sense to flush on every commit as well, but having an additional
>> configuration to set the maximum time between state store flushes
>> seems like it would solve our problem.
>>
>> In our case, we'd set our flush interval to a few hundred ms.
>> Ideally, we would really prefer to be able to disable interval-based
>> flushing altogether (and just put + forward all processed records)
>> for certain KTables that are low volume, latency sensitive, and
>> idempotent under message reprocessing.
>>
>> Thanks for any help! Right now the only option seems to be for us to
>> radically lower the commit interval and accept any leftover latency,
>> but unless we can find a sweet spot this may be a blocker for us
>> moving to 0.10.1.
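For reference, the stopgap mentioned at the end of the quoted message is just tightening the commit interval in StreamsConfig, along these lines (the values here are illustrative, not a recommendation):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-low-latency-job"); // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");          // placeholder
    // Caches (and therefore downstream forwarding for these KTables) flush on
    // commit, so a lower commit interval reduces the worst-case latency, at
    // the cost of committing offsets and flushing every state store that often.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

This applies the same interval to every store and every task, which is exactly the inflexibility described above.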