I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
I'm hitting what seems to be a serious issue (at least, for us) with
the changes brought about in KIP-63. In our job, we have a number of
steps in the topology where we perform a repartition and aggregation
on topics that require low latency. These topics have a very low
message volume but require subsecond latency for the aggregations to
complete since they are configuration data that drive the rest of the
job and need to be applied immediately.

In 0.10.0, we performed a through() (for repartitioning) followed by an
aggregateByKey(), and this gave us minimal latency: the aggregateByKey()
simply attached a consumer to the output of the through(), and the
processor would consume and aggregate each message immediately, passing
it on to the next step in the topology.
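
For reference, the 0.10.0 pattern looked roughly like this (topic, store,
and type names are just placeholders, the aggregation logic is collapsed
to strings, and the signatures are from memory):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> configs =
        builder.stream(Serdes.String(), Serdes.String(), "config-updates");

    // through() repartitions via an intermediate topic; aggregateByKey()
    // attaches a consumer to it, and each record is aggregated and
    // forwarded downstream as soon as it is consumed.
    KTable<String, String> configTable = configs
        .through(Serdes.String(), Serdes.String(), "config-updates-repartitioned")
        .aggregateByKey(
            () -> "",                       // Initializer (placeholder)
            (key, value, agg) -> value,     // Aggregator (placeholder logic)
            Serdes.String(),
            Serdes.String(),
            "config-store");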

However, in 0.10.1 the aggregateByKey() API is no longer available, and
it is necessary to pivot the data through a groupByKey() followed by
aggregate(). The problem is that with this mechanism the intermediate
KTable state store still stores the data as usual, but the data is not
forwarded downstream until the next store flush (because
ForwardingCacheFlushListener is now used instead of calling forward()
while each record is processed).
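
The closest 0.10.1 equivalent, as far as I can tell, is the following
(same placeholder names as above):

    // In 0.10.1 the aggregation hangs off a KGroupedStream. The result
    // goes into the cached state store, but it is only forwarded
    // downstream when the cache is flushed, not once per record.
    KTable<String, String> configTable = configs
        .through(Serdes.String(), Serdes.String(), "config-updates-repartitioned")
        .groupByKey(Serdes.String(), Serdes.String())
        .aggregate(
            () -> "",                       // Initializer (placeholder)
            (key, value, agg) -> value,     // Aggregator (placeholder logic)
            Serdes.String(),
            "config-store");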

As noted in KIP-63 and as I saw in the code, the flush interval of the
state stores is commit.interval.ms. For us, this has been tuned to a few
seconds, and since our job chains a number of these aggregations
sequentially, it can now take many seconds in the worst case for a tuple
to travel through our topology.
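
For context, the relevant bit of our config is essentially just this
(the 3000 ms value is illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // commit.interval.ms now also bounds how often the state store caches
    // are flushed, and therefore how often aggregation results are
    // forwarded downstream.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);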

It seems too inflexible to have the flush interval always be the same
as the commit interval across all aggregates. For aggregations that are
idempotent under message reprocessing, being able to flush more often
than the commit interval seems like a very important option when lower
latency is required. It would still make sense to flush on every commit
as well, but an additional configuration setting the maximum time
between state store flushes seems like it would solve our problem.

In our case, we'd set our flush interval to a few hundred ms. Ideally,
we would really prefer to be able to disable interval-based flushing
altogether (and just put + forward every processed record) for certain
KTables that are low volume, latency sensitive, and idempotent under
message reprocessing.
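
Something along these lines is what I have in mind (the property name is
entirely made up; no such config exists today):

    // Hypothetical setting, ideally configurable per store:
    props.put("state.store.flush.interval.ms", 200);  // flush at most every 200 ms
    // and for the low-volume, reprocessing-idempotent KTables:
    props.put("state.store.flush.interval.ms", 0);    // forward every record immediately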

Thanks for any help! Right now it seems the only option is for us to
radically lower the commit interval and accept whatever latency remains,
but unless we can find a sweet spot this may be a blocker for our move
to 0.10.1.
