Ah thanks so much for the insights -- we should be in a position to profile
the new library against real data in the next week or so so I'll let you
know how it goes.

On Oct 11, 2016 6:26 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

> Hello Greg,
>
> I can share some context of KIP-63 here:
>
> 1. Like Eno mentioned, we believe RocksDB's own mem-table is already
> optimizing a large portion of IO access for its write performance, and
> adding an extra caching layer on top of that was mainly for saving ser-de
> costs (note that you still need to ser / deser key-value objects into bytes
> when interacting with RocksDB). Although it may further help IO, it is not
> the main motivation.
>
> 2. As part of KIP-63 Bill helped investigating the pros / cons of such
> object caching (https://issues.apache.org/jira/browse/KAFKA-3973), and our
> conclusion based on that is, although it saves serde costs, it also makes
> memory management very hard in the long run, with caching based on
> num.records, not num.bytes. And when you have an OOM in one of the
> instances, it may well result in cascading failures from rebalances and
> task migration. Ideally, we want to have some restrict memory bound for
> better capacity planning and integration with cluster resource managers
> (see
> https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+
> Management+in+Kafka+Streams
> for more details).
>
> 3. So as part of KIP-63, we removed object-oriented caching and replaced
> with bytes caches, and in addition add the RocksDBConfigSetter to allow
> users to configure their RocksDB to tune for their write /
> space amplifications for IO.
>
>
> With that, I think shutting off caching for your case should not degrading
> the performance too much assuming RocksDB itself can already do a good job
> in terms of write access, it may add extra serde costs though depending
> your use case (originally it is like 1000 records per cache, so roughly
> speaking you are saving those many serde calls per store). But if you do
> observe significant performance degradation I'd personally love to learn
> more and help on that end.
>
>
> Guozhang
>
>
>
>
>
> On Tue, Oct 11, 2016 at 10:10 AM, Greg Fodor <gfo...@gmail.com> wrote:
>
> > Thanks Eno -- my understanding is that cache is already enabled to be
> > 100MB per rocksdb so it should be on already, but I'll check. I was
> > wondering if you could shed some light on the changes between 0.10.0
> > and 0.10.1 -- in 0.10.0 there was an intermediate cache within
> > RocksDbStore -- presumably this was there to improve performance,
> > despite there still being a lower level cache managed by rocksdb. Can
> > you shed some light why this cache was needed in 0.10.0? If it sounds
> > like our use case won't warrant the same need then we might be OK.
> >
> > Overall however, this is really problematic for us, since we will have
> > to turn off caching for effectively all of our jobs. The way our
> > system works is that we have a number of jobs running kafka streams
> > that are configured via database tables we change via our web stack.
> > For example, when we want to tell our jobs to begin processing data
> > for a user, we insert a record for that user into the database which
> > gets passed via kafka connect to a kafka topic. The kafka streams job
> > is consuming this topic, does some basic group by operations and
> > repartitions on it, and joins it against other data streams so that it
> > knows what users should be getting processed.
> >
> > So fundamentally we have two types of aggregations: the typical case
> > that was I think the target for the optimizations in KIP-63, where
> > latency is less critical since we are counting and emitting counts for
> > analysis, etc. And the other type of aggregation is where we are doing
> > simple transformations on data coming from the database in a way to
> > configure the live behavior of the job. Latency here is very
> > sensitive: users expect the job to react and start sending data for a
> > user immediately after the database records are changed.
> >
> > So as you can see, since this is the paradigm we use to operate jobs,
> > we're in a bad position if we ever want to take advantage of the work
> > in KIP-63. All of our jobs are set up to work in this way, so we will
> > either have to maintain our fork or will have to shut off caching for
> > all of our jobs, neither of which sounds like a very good path.
> >
> > On Tue, Oct 11, 2016 at 4:16 AM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> > > Hi Greg,
> > >
> > > An alternative would be to set up RocksDB's cache, while keeping the
> > streams cache to 0. That might give you what you need, especially if you
> > can work with RocksDb and don't need to change the store.
> > >
> > > For example, here is how to set the Block Cache size to 100MB and the
> > Write Buffer size to 32MB
> > >
> > > https://github.com/facebook/rocksdb/wiki/Block-Cache <
> > https://github.com/facebook/rocksdb/wiki/Block-Cache>
> > > https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer
> <
> > https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer>
> > >
> > > They can override these settings by creating an impl of
> > RocksDBConfigSetter and setting StreamsConfig.ROCKSDB_CONFIG_
> SETTER_CLASS_CONFIG
> > in Kafka Streams.
> > >
> > > Hope this helps,
> > > Eno
> > >
> > >> On 10 Oct 2016, at 18:19, Greg Fodor <gfo...@gmail.com> wrote:
> > >>
> > >> Hey Eno, thanks for the suggestion -- understood that my patch is not
> > >> something that could be accepted given the API change, I posted it to
> > help
> > >> make the discussion concrete and because i needed a workaround.
> (Likely
> > >> we'll maintain this patch internally so we can move forward with the
> new
> > >> version, since the consumer heartbeat issue is something we really
> need
> > >> addressed.)
> > >>
> > >> Looking at the code, it seems that setting the cache size to zero will
> > >> disable all caching. However, the previous version of Kafka Streams
> had
> > a
> > >> local cache within the RocksDBStore to reduce I/O. If we were to set
> the
> > >> cache size to zero, my guess is we'd see a large increase in I/O
> > relative
> > >> to the previous version since we would no longer have caching of any
> > kind
> > >> even intra-store. By the looks of it there isn't an easy way to
> > replicate
> > >> the same caching behavior as the old version of Kafka Streams in the
> new
> > >> system without increasing latency, but maybe I'm missing something.
> > >>
> > >>
> > >> On Oct 10, 2016 3:10 AM, "Eno Thereska" <eno.there...@gmail.com>
> wrote:
> > >>
> > >>> Hi Greg,
> > >>>
> > >>> Thanks for trying 0.10.1. The best option you have for your specific
> > app
> > >>> is to simply turn off caching by setting the cache size to 0. That
> > should
> > >>> give you the old behaviour:
> > >>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
> > BUFFERING_CONFIG,
> > >>> 0L);
> > >>>
> > >>> Your PR is an alternative, but it requires changing the APIs and
> would
> > >>> require a KIP.
> > >>>
> > >>> Thanks
> > >>> Eno
> > >>>
> > >>>> On 9 Oct 2016, at 23:49, Greg Fodor <gfo...@gmail.com> wrote:
> > >>>>
> > >>>> JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281
> > >>>>
> > >>>> On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor <gfo...@gmail.com>
> wrote:
> > >>>>> I went ahead and did some more testing, and it feels to me one
> option
> > >>>>> for resolving this issue is having a method on KGroupedStream which
> > >>>>> can be used to configure if the operations on it (reduce/aggregate)
> > >>>>> will forward immediately or not. I did a quick patch and was able
> to
> > >>>>> determine that if the records are forwarded immediately it resolves
> > >>>>> the issue I am seeing. Having it be done on a per-KGroupedStream
> > basis
> > >>>>> would provide maximum flexibility.
> > >>>>>
> > >>>>> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor <gfo...@gmail.com>
> wrote:
> > >>>>>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs
> and
> > >>>>>> I'm hitting what seems to be a serious issue (at least, for us)
> with
> > >>>>>> the changes brought about in KIP-63. In our job, we have a number
> of
> > >>>>>> steps in the topology where we perform a repartition and
> aggregation
> > >>>>>> on topics that require low latency. These topics have a very low
> > >>>>>> message volume but require subsecond latency for the aggregations
> to
> > >>>>>> complete since they are configuration data that drive the rest of
> > the
> > >>>>>> job and need to be applied immediately.
> > >>>>>>
> > >>>>>> In 0.10.0, we performed a through (for repartitioning) and
> > aggregateBy
> > >>>>>> and this resulted in minimal latency as the aggregateBy would just
> > >>>>>> result in a consumer attached to the output of the through and the
> > >>>>>> processor would consume + aggregate messages immediately passing
> > them
> > >>>>>> to the next step in the topology.
> > >>>>>>
> > >>>>>> However, in 0.10.1 the aggregateBy API is no longer available and
> it
> > >>>>>> is necessary to pivot the data through a groupByKey and then
> > >>>>>> aggregate(). The problem is that this mechanism results in the
> > >>>>>> intermediate KTable state store storing the data as usual, but the
> > >>>>>> data is not forwarded downstream until the next store flush. (Due
> to
> > >>>>>> the use of ForwardingCacheFlushListener instead of calling
> forward()
> > >>>>>> during the process of the record.)
> > >>>>>>
> > >>>>>> As noted in KIP-63 and as I saw in the code, the flush interval of
> > >>>>>> state stores is commit.interval.ms. For us, this has been tuned
> to
> > a
> > >>>>>> few seconds, and since we have a number of these aggregations in
> our
> > >>>>>> job sequentially, this now results in many seconds of latency in
> the
> > >>>>>> worst case for a tuple to travel through our topology.
> > >>>>>>
> > >>>>>> It seems too inflexible to have the flush interval always be the
> > same
> > >>>>>> as the commit interval across all aggregates. For certain
> > aggregations
> > >>>>>> which are idempotent regardless of messages being reprocessed,
> being
> > >>>>>> able to flush more often than the commit interval seems like a
> very
> > >>>>>> important option when lower latency is required. It would still
> make
> > >>>>>> sense to flush every commit as well, but having an additional
> > >>>>>> configuration to set the maximum time between state store flushes
> > >>>>>> seems like it would solve our problem.
> > >>>>>>
> > >>>>>> In our case, we'd set our flush interval to a few hundred ms.
> > Ideally,
> > >>>>>> we would really prefer to be able to disable interval based
> flushing
> > >>>>>> altogether (and just put + forward all processed records) for
> > certain
> > >>>>>> KTables that are low volume, latency sensitive, and which are
> > >>>>>> idempotent under message reprocessing.
> > >>>>>>
> > >>>>>> Thanks for any help! Right now the only option it seems is for us
> to
> > >>>>>> radically lower the commit interval and accept any leftover
> latency,
> > >>>>>> but unless we can find a sweet spot this may be a blocker for us
> to
> > >>>>>> moving to 0.10.1.
> > >>>
> > >>>
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to