JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281

On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor <gfo...@gmail.com> wrote:
> I went ahead and did some more testing, and it feels to me that one
> option for resolving this issue is a method on KGroupedStream which
> can be used to configure whether the operations on it
> (reduce/aggregate) forward their results immediately or not. I did a
> quick patch and was able to confirm that forwarding the records
> immediately resolves the issue I am seeing. Doing this on a
> per-KGroupedStream basis would provide maximum flexibility.
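>
> Roughly the shape I have in mind (the method name here is purely
> hypothetical and does not exist in the current API; the serdes, types,
> and store names are placeholders):
>
>   KGroupedStream<String, Config> grouped =
>       configStream.groupByKey(Serdes.String(), configSerde);
>
>   // hypothetical knob: forward aggregation results downstream as soon
>   // as each record is processed, instead of waiting for the cache
>   // flush that happens on commit
>   KTable<String, ConfigState> table = grouped
>       .forwardImmediately(true)
>       .aggregate(ConfigState::new,
>                  (key, value, agg) -> agg.update(value),
>                  configStateSerde, "config-state-store");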
>
> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
>> I'm hitting what seems to be a serious issue (at least, for us) with
>> the changes brought about in KIP-63. In our job, we have a number of
>> steps in the topology where we perform a repartition and aggregation
>> on topics that require low latency. These topics have a very low
>> message volume but require subsecond latency for the aggregations to
>> complete since they are configuration data that drive the rest of the
>> job and need to be applied immediately.
>>
>> In 0.10.0, we performed a through (for repartitioning) followed by an
>> aggregateBy, and this resulted in minimal latency: the aggregateBy
>> just attached a consumer to the output of the through, and the
>> processor would consume and aggregate messages immediately, passing
>> them on to the next step in the topology.
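>>
>> Roughly what that looked like for us (from memory, so the exact
>> 0.10.0 signatures may be slightly off; the topic, serde, and store
>> names are placeholders):
>>
>>   KStream<String, Config> configs =
>>       builder.stream(Serdes.String(), configSerde, "config-events");
>>
>>   KTable<String, ConfigState> configState = configs
>>       .through(Serdes.String(), configSerde, "config-events-repartitioned")
>>       .aggregateByKey(ConfigState::new,
>>                       (key, value, agg) -> agg.update(value),
>>                       Serdes.String(), configStateSerde, "config-state-store");
>>
>> Each record was put to the store and forwarded downstream as part of
>> processing it, so latency stayed low.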
>>
>> However, in 0.10.1 the aggregateBy API is no longer available and it
>> is necessary to pivot the data through a groupByKey and then
>> aggregate(). The problem is that with this mechanism the intermediate
>> KTable state store stores the data as usual, but the data is not
>> forwarded downstream until the next store flush (due to the use of
>> ForwardingCacheFlushListener instead of calling forward() while
>> processing the record).
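>>
>> For reference, the equivalent on 0.10.1 looks roughly like this (same
>> placeholder names as in the snippet above):
>>
>>   KTable<String, ConfigState> configState = configs
>>       .through(Serdes.String(), configSerde, "config-events-repartitioned")
>>       .groupByKey(Serdes.String(), configSerde)
>>       .aggregate(ConfigState::new,
>>                  (key, value, agg) -> agg.update(value),
>>                  configStateSerde, "config-state-store");
>>
>>   // the aggregate result is written to the (now caching) state store,
>>   // but only forwarded downstream when the cache flushes, via
>>   // ForwardingCacheFlushListener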
>>
>> As noted in KIP-63 and as I saw in the code, the flush interval of
>> state stores is commit.interval.ms. For us, this has been tuned to a
>> few seconds, and since we have a number of these aggregations in our
>> job sequentially, this now results in many seconds of latency in the
>> worst case for a tuple to travel through our topology.
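>>
>> For reference, that is this setting (the value here is just
>> illustrative of what we run with today):
>>
>>   Properties props = new Properties();
>>   // the commit interval also controls how often the state store
>>   // caches are flushed, and therefore how often aggregation results
>>   // are forwarded downstream
>>   props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);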
>>
>> It seems too inflexible to have the flush interval always be the same
>> as the commit interval across all aggregates. For certain aggregations
>> which are idempotent under message reprocessing, being able to flush
>> more often than the commit interval seems like a very important option
>> when lower latency is required. It would still make
>> sense to flush every commit as well, but having an additional
>> configuration to set the maximum time between state store flushes
>> seems like it would solve our problem.
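>>
>> In other words, something along these lines (the config name is
>> purely hypothetical and does not exist today; reusing the props from
>> the snippet above):
>>
>>   // hypothetical: cap the maximum time between state store cache
>>   // flushes, independent of the commit interval
>>   props.put("statestore.flush.interval.ms", 200);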
>>
>> In our case, we'd set our flush interval to a few hundred ms. Ideally,
>> we would really prefer to be able to disable interval based flushing
>> altogether (and just put + forward all processed records) for certain
>> KTables that are low volume, latency sensitive, and which are
>> idempotent under message reprocessing.
>>
>> Thanks for any help! Right now the only option, it seems, is for us
>> to radically lower the commit interval and accept whatever latency
>> remains, but unless we can find a sweet spot this may be a blocker
>> for us moving to 0.10.1.
