[
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602159#comment-15602159
]
Davor Poldrugo commented on KAFKA-4273:
---------------------------------------
Is this possibly resolved in 0.10.1.0 by the Streams config
*rocksdb.config.setter*?
http://kafka.apache.org/documentation#streamsconfigs
Description of the config: _A Rocks DB config setter class that implements the
RocksDBConfigSetter interface_
Interface is here:
https://github.com/apache/kafka/blob/0.10.1.0/streams/src/main/java/org/apache/kafka/streams/state/RocksDBConfigSetter.java
> Streams DSL - Add TTL / retention period support for intermediate topics and
> state stores
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Davor Poldrugo
> Assignee: Guozhang Wang
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After that,
> the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
> * messages-prices-join-this-changelog
> * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't
> want to keep data forever, I have altered them to not use compaction. Right
> now my RocksDB state stores grow indefinitely, and I don't have any options
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to
> periodically stop Kafka Streams instances - one at a time. This triggers a
> rebalance, and partitions migrate to other instances. When the instance is
> started again, there's another rebalance, and sometimes this instance starts
> processing partitions that it wasn't processing before the stop - which leads to
> deletion of the RocksDB state store for those partitions
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated
> with a restore consumer - which reads data from - as previously mentioned - a
> non compacted topic. And this effectively leads to a "hacked TTL support" in
> Kafka Streams DSL.
> Questions:
> * Do you think it would be reasonable to add support in the DSL API to define
> a TTL for the local store?
> * Which opens another question - there are use cases which don't need the
> intermediate topics to be created as "compact". Could this also be added to
> the DSL API? Maybe only this could be added, and this flag should also be
> used for the RocksDB TTL. Of course in this case another config would be
> mandatory - the retention period or TTL for the intermediate topics and the
> state stores. I saw there is a new cleanup.policy - compact_and_delete -
> added with KAFKA-4015.
> * Which also leads to another question, maybe some intermediate topics /
> state stores need a different TTL, so it's not as simple as that. But after
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
> * https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
> * https://github.com/facebook/rocksdb/wiki/Time-to-Live
> * https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212
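To make the requested semantics concrete, here is a minimal sketch in plain Java of a store that discards entries once they are older than a retention period. It uses no Kafka Streams or RocksDB APIs: the class TtlStore and its methods put, get, purgeExpired and size are hypothetical names made up for this illustration, and the caller passes in the current time explicitly so the behavior is easy to follow. It is an assumption-laden simplification, not how RocksDB's TtlDB works internally; in particular this sketch hides an expired entry on read, which may be stricter than what a compaction-based TTL gives you.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical illustration only; not a Kafka Streams or RocksDB API. */
final class TtlStore<K, V> {

    /** A stored value together with the time it was written. */
    private static final class Entry<T> {
        final T value;
        final long writtenAtMs;

        Entry(T value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long ttlMs;
    private final Map<K, Entry<V>> entries = new HashMap<>();

    TtlStore(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Writes (or overwrites) a value and records the write time. */
    void put(K key, V value, long nowMs) {
        entries.put(key, new Entry<>(value, nowMs));
    }

    /** Returns the value, or null if the key is absent or already expired. */
    V get(K key, long nowMs) {
        Entry<V> e = entries.get(key);
        if (e == null || isExpired(e, nowMs)) {
            return null;
        }
        return e.value;
    }

    /** Physically removes expired entries; returns how many were removed. */
    int purgeExpired(long nowMs) {
        int removed = 0;
        Iterator<Entry<V>> it = entries.values().iterator();
        while (it.hasNext()) {
            if (isExpired(it.next(), nowMs)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /** Number of entries still held, including expired ones not yet purged. */
    int size() {
        return entries.size();
    }

    private boolean isExpired(Entry<V> e, long nowMs) {
        return nowMs - e.writtenAtMs >= ttlMs;
    }
}
```

For the 48 hour retention described in the issue, such a store would be created with new TtlStore<>(java.util.concurrent.TimeUnit.HOURS.toMillis(48)), and purgeExpired would be called periodically to reclaim space, which is roughly the effect the scheduled restart "hack" achieves indirectly.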