[https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027841#comment-16027841]
Ayush Verma edited comment on KAFKA-4273 at 5/28/17 3:30 PM:
-------------------------------------------------------------
I have a use case where I am storing some aggregates for flights (e.g. minimum
fares, trends) against the key origin_destination_departureDate. Naturally,
the key for a flight whose departure date has passed (< today) should expire.
I do not see a way to achieve this elegantly with the existing functionality.
RocksDB has a fixed TTL, so it cannot be done that way either. I will have to
implement a custom store.
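A minimal sketch of such a custom store, written in plain Java so it runs without Kafka on the classpath. It assumes the key format origin_destination_departureDate uses ISO dates (yyyy-MM-dd) and uses a minimum-fare aggregate as the example; the class and method names are hypothetical. Expiry here is driven by the date inside the key rather than by write time, which is what a single store-wide TTL cannot express.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (plain Java, not the Kafka StateStore API): aggregates keyed by
// "ORIGIN_DESTINATION_yyyy-MM-dd" that are purged once the departure date has passed.
public class DepartureDateStore {
    // key -> minimum fare seen so far for that flight key
    private final Map<String, Double> minFares = new HashMap<>();

    // Keeps the lower of the existing and the new fare (a simple "minimum fare" aggregate).
    public void put(String key, double fare) {
        minFares.merge(key, fare, Double::min);
    }

    public Double get(String key) {
        return minFares.get(key);
    }

    public int size() {
        return minFares.size();
    }

    // Extracts the departure date from the last "_"-separated part of the key.
    static LocalDate departureDate(String key) {
        return LocalDate.parse(key.substring(key.lastIndexOf('_') + 1));
    }

    // Removes every entry whose departure date is before 'today'; returns how many were removed.
    public int purgeExpired(LocalDate today) {
        int before = minFares.size();
        minFares.keySet().removeIf(key -> departureDate(key).isBefore(today));
        return before - minFares.size();
    }

    public static void main(String[] args) {
        DepartureDateStore store = new DepartureDateStore();
        store.put("AMS_JFK_2017-05-27", 412.0);
        store.put("AMS_JFK_2017-05-28", 398.5);
        store.put("AMS_JFK_2017-05-28", 420.0); // higher fare, minimum stays 398.5
        store.put("LHR_SFO_2017-06-02", 655.0);
        int removed = store.purgeExpired(LocalDate.of(2017, 5, 28));
        System.out.println(removed + " expired, " + store.size() + " left"); // prints "1 expired, 2 left"
    }
}
```

In a Kafka Streams application the same purge could, for example, iterate a KeyValueStore with all() and remove expired keys with delete() from a periodically scheduled punctuation; that wiring is not shown here.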
was (Author: dexter007):
I have a use case where I am storing some aggregates for flights (e.g. minimum
fares, trends) against the key origin_destination_departureDate. Naturally,
the key for a flight whose departure date has passed (< today) should expire.
I do not see a way to achieve this elegantly with the existing functionality.
> Streams DSL - Add TTL / retention period support for intermediate topics and
> state stores
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.1
> Reporter: Davor Poldrugo
>
> Hi!
> I'm using the Streams DSL (0.10.0.1), which as far as I know can only use
> RocksDB for local state; it's not configurable.
> In my use case the data has a TTL / retention period of 48 hours. After that,
> the data can be discarded.
> I join two topics, "messages" and "prices", using a windowed inner join.
> The two intermediate Kafka topics for this join are named:
> * messages-prices-join-this-changelog
> * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams and I don't
> want to keep data forever, I have altered them to not use compaction. Right
> now my RocksDB state stores grow indefinitely, and I have no option to
> define a TTL or to periodically clean up older data.
> A "hack" that I use to keep my disk usage low: I have scheduled a job to
> periodically stop Kafka Streams instances, one at a time. This triggers a
> rebalance, and partitions migrate to other instances. When the instance is
> started again, there's another rebalance, and sometimes this instance starts
> processing partitions that it wasn't processing before the stop. This leads
> to the RocksDB state stores of the partitions it no longer processes being
> deleted (after state.cleanup.delay.ms). In the next rebalance the local store
> is recreated by a restore consumer, which reads data from the non-compacted
> topic mentioned above. This effectively gives a "hacked TTL support" in the
> Kafka Streams DSL.
> Questions:
> * Do you think it would be reasonable to add support in the DSL API to define
> a TTL for local stores?
> * Which opens another question: there are use cases that don't need the
> intermediate topics to be created as "compact". Could this also be added to
> the DSL API? Maybe only this could be added, and the same flag should also be
> used for the RocksDB TTL. Of course, in this case another config would be
> mandatory: the retention period or TTL for the intermediate topics and the
> state stores. I saw there is a new cleanup.policy, compact_and_delete,
> added with KAFKA-4015.
> * Which also leads to another question: maybe some intermediate topics /
> state stores need different TTLs, so it's not as simple as that. But after
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
> * https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
> * https://github.com/facebook/rocksdb/wiki/Time-to-Live
> * https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212
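The 48-hour retention and the wish to "periodically clean the older data" in the issue description can be sketched with time segments: records are bucketed by timestamp into fixed-length segments, and a whole segment is dropped once all of it is older than stream time minus the retention period. This is a simplified in-memory illustration in plain Java, similar in spirit to (but not the implementation of) Kafka Streams' segmented window stores; the 12-hour segment interval and all names are assumptions.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified sketch of segment-based retention (hypothetical names, in-memory only).
public class SegmentedRetentionStore {
    private final long segmentIntervalMs;
    private final long retentionMs;
    // segment id (floor(timestamp / interval)) -> records written into that segment
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    // Highest record timestamp seen so far ("stream time").
    private long streamTimeMs = Long.MIN_VALUE;

    public SegmentedRetentionStore(Duration segmentInterval, Duration retention) {
        this.segmentIntervalMs = segmentInterval.toMillis();
        this.retentionMs = retention.toMillis();
    }

    public void put(String key, String value, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        if (timestampMs < streamTimeMs - retentionMs) {
            return; // already outside the retention period: drop it
        }
        long segmentId = Math.floorDiv(timestampMs, segmentIntervalMs);
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
        dropExpiredSegments();
    }

    // A segment is dropped only when all of it is older than streamTime - retention,
    // so a record can live for up to retention + one segment interval.
    private void dropExpiredSegments() {
        long firstLiveSegment = Math.floorDiv(streamTimeMs - retentionMs, segmentIntervalMs);
        segments.headMap(firstLiveSegment).clear();
    }

    // Returns the value from the most recent live segment that contains the key, or null.
    public String get(String key) {
        for (Map<String, String> segment : segments.descendingMap().values()) {
            String value = segment.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        SegmentedRetentionStore store =
            new SegmentedRetentionStore(Duration.ofHours(12), Duration.ofHours(48));
        store.put("message-1", "a", 0L);
        store.put("message-2", "b", 30 * hour);
        store.put("message-3", "c", 61 * hour); // stream time 61h: data before 13h is expired
        System.out.println(store.segmentCount() + " " + store.get("message-1")); // prints "2 null"
    }
}
```

Dropping whole segments keeps cleanup cheap (no per-key scan), at the cost of keeping some data slightly longer than the retention period.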
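For the second question in the description (intermediate topics that should not be purely "compact"), the retention of a topic comes down to two topic-level configs, cleanup.policy and retention.ms. The plain-Java sketch below builds them for the 48-hour case. It assumes a Kafka release that includes KAFKA-4015, where the combined policy is written as the comma-separated list compact,delete; the helper name is hypothetical.

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: topic-level configs for an intermediate / changelog topic whose
// data should only be kept for a bounded retention period.
public class IntermediateTopicConfig {
    static Map<String, String> retentionConfig(Duration retention, boolean alsoCompact) {
        Map<String, String> config = new TreeMap<>(); // sorted for stable printing
        // "delete" removes log segments older than retention.ms; "compact,delete" combines
        // both policies: the log is compacted and old segments are deleted as well.
        config.put("cleanup.policy", alsoCompact ? "compact,delete" : "delete");
        config.put("retention.ms", Long.toString(retention.toMillis()));
        return config;
    }

    public static void main(String[] args) {
        // prints {cleanup.policy=compact,delete, retention.ms=172800000}
        System.out.println(retentionConfig(Duration.ofHours(48), true));
    }
}
```

Such a map could be applied to an existing topic with the kafka-configs tool or, in later Kafka Streams releases, passed to a store's changelog via Materialized#withLoggingEnabled(Map).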
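The RocksDB Time-to-Live page linked in the description documents TTL as a single value given when the database is opened: each write is timestamped, and expired entries are removed only during compaction, so Get and iterators may still return them until then. The plain-Java model below mimics those semantics, under that reading of the wiki, to show why a store-wide TTL fits the 48-hour case but not per-key deadlines such as a departure date. It is an illustration, not the TtlDB API; all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of RocksDB TTL semantics (NOT the RocksDB/TtlDB API): a single TTL is
// fixed when the store is opened, each value carries its write time, and expired values
// are removed only when compaction runs, so reads in between can still return them.
public class TtlSemanticsModel {
    private static final class Stamped {
        final String value;
        final long writtenAtSec;

        Stamped(String value, long writtenAtSec) {
            this.value = value;
            this.writtenAtSec = writtenAtSec;
        }
    }

    private final long ttlSeconds; // one TTL for every key, chosen at "open" time
    private final Map<String, Stamped> data = new HashMap<>();

    public TtlSemanticsModel(long ttlSeconds) {
        this.ttlSeconds = ttlSeconds;
    }

    public void put(String key, String value, long nowSec) {
        data.put(key, new Stamped(value, nowSec));
    }

    // Reads do not check expiry, mirroring the documented TTL behaviour.
    public String get(String key) {
        Stamped s = data.get(key);
        return s == null ? null : s.value;
    }

    // An entry is expired when writtenAt + ttl < now; only compaction removes it.
    public void compact(long nowSec) {
        data.values().removeIf(s -> s.writtenAtSec + ttlSeconds < nowSec);
    }

    public static void main(String[] args) {
        long ttl = 48L * 3600; // 48 hours, in seconds
        TtlSemanticsModel store = new TtlSemanticsModel(ttl);
        store.put("price-1", "9.99", 0L);
        System.out.println(store.get("price-1")); // prints "9.99": expired later, but not compacted
        store.compact(ttl + 1);
        System.out.println(store.get("price-1")); // prints "null"
    }
}
```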