[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688015#comment-15688015
 ] 

ASF GitHub Bot commented on KAFKA-4273:
---------------------------------------

GitHub user dpoldrugo opened a pull request:

    https://github.com/apache/kafka/pull/2159

    KAFKA 4273 - Add TTL support for RocksDB

    Since the Streams DSL doesn't support fine-grained configuration of state 
stores (it uses only RocksDB), I have added a new StreamsConfig property called 
`rocksdb.ttl.sec`, which lets you set a TTL for all state stores used by 
the topology. In short, if you set the property to a value `>=1`, Streams will use 
TtlDB instead of RocksDB, and records will expire after 
the configured period.
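
    As a rough illustration of the configuration described above, the sketch below sets the proposed `rocksdb.ttl.sec` property alongside the usual Streams settings. The class name, the helper methods, the placeholder values, and the default of `0` for an unset property are assumptions made for this example and are not taken from the patch.

```java
import java.util.Properties;

// Sketch only: "rocksdb.ttl.sec" is the property proposed in this pull request,
// not a released StreamsConfig key. The class and method names are hypothetical.
final class TtlConfigSketch {
    static final String ROCKSDB_TTL_SEC = "rocksdb.ttl.sec";

    // Builds Streams properties with the given TTL (in seconds) for all state stores.
    static Properties streamsProperties(int ttlSeconds) {
        Properties props = new Properties();
        props.put("application.id", "ttl-example");       // placeholder value
        props.put("bootstrap.servers", "localhost:9092"); // placeholder value
        props.put(ROCKSDB_TTL_SEC, Integer.toString(ttlSeconds));
        return props;
    }

    // Mirrors the rule in the description: a value >= 1 selects TtlDB,
    // anything else keeps plain RocksDB. Treating "unset" as 0 is an assumption.
    static boolean ttlEnabled(Properties props) {
        int ttl = Integer.parseInt(props.getProperty(ROCKSDB_TTL_SEC, "0"));
        return ttl >= 1;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties(3600); // one hour
        System.out.println("TTL enabled: " + ttlEnabled(props));
    }
}
```

    The resulting `Properties` object would be passed to Kafka Streams like any other configuration; how the patch actually reads the value may differ from this helper.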
    
    This should help users bound their disk usage and provides a 
configuration for use cases where the data has a natural TTL/retention period, 
for example when you process data for only one hour and no longer need it in 
the state stores after that.
    
    I have added a 
[test](https://github.com/apache/kafka/compare/trunk...dpoldrugo:KAFKA-4273-ttl-support?expand=1#diff-d908a80c770d196ac823752da3b3a864R117)
 to check whether TtlDB expires records, but I can't make TtlDB expire a record 
within a reasonable window (1 minute). Do you have any suggestions on how to 
force TtlDB to expire records more quickly?
    
    Since I'm using Kafka and Kafka Streams 0.10.1.0, I have also added this 
code to the 
[0.10.1](https://github.com/dpoldrugo/kafka/tree/0.10.1-KAFKA-4273-ttl-support) 
branch, and if the review goes well I hope it can be added to the 0.10.1.1 
release.
    The patch is here: 
[KAFKA_4273_Add_TTL_support_for_RocksDB_v2.patch.txt](https://github.com/apache/kafka/files/607638/KAFKA_4273_Add_TTL_support_for_RocksDB_v2.patch.txt)
    
    **Suggestion for future work**
    Since this config/feature applies to all state stores at once, it would be nice to 
provide an API that lets users configure a TTL for each state store individually, for example 
during topology building with KStreamBuilder.
    
    Now: KStreamBuilder#table(String topic, final String storeName)
    Suggestion: KStreamBuilder#table(String topic, final String storeName, 
**_StoreOptions_ storeOptions**)
    Where **_StoreOptions_** would be something like this: `{ ttlSeconds: int }`
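
    One possible shape for the suggested options object is sketched below. `StoreOptions` does not exist in Kafka Streams; the class, its factory methods, and the three-argument `table` overload mentioned in the comment are hypothetical and only illustrate the `{ ttlSeconds: int }` idea.

```java
// Hypothetical sketch of the proposed StoreOptions; no such class exists in Kafka Streams.
final class StoreOptions {
    private final int ttlSeconds;

    private StoreOptions(int ttlSeconds) {
        this.ttlSeconds = ttlSeconds;
    }

    // Creates options with a TTL; values below 1 are rejected.
    static StoreOptions withTtlSeconds(int ttlSeconds) {
        if (ttlSeconds < 1) {
            throw new IllegalArgumentException("ttlSeconds must be >= 1");
        }
        return new StoreOptions(ttlSeconds);
    }

    // Creates options without a TTL, so records never expire (represented as 0 here).
    static StoreOptions noTtl() {
        return new StoreOptions(0);
    }

    int ttlSeconds() {
        return ttlSeconds;
    }

    boolean hasTtl() {
        return ttlSeconds >= 1;
    }
}

// Intended use with the suggested (not yet existing) overload:
//   builder.table("prices", "prices-store", StoreOptions.withTtlSeconds(48 * 3600));
```

    An immutable value object keeps the door open for further per-store settings later without changing the `table` signature again.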
    
    More details: [KAFKA-4273](https://issues.apache.org/jira/browse/KAFKA-4273)
    
    @guozhangwang @dguy @mjsax @norwood @enothereska @ijuma    - could you 
check this out?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dpoldrugo/kafka KAFKA-4273-ttl-support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2159.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2159
    
----
commit 5a3f1372daf2a0e939b246756c7e712e9ea21662
Author: dpoldrugo <davor.poldr...@infobip.com>
Date:   2016-11-22T21:01:02Z

    KAFKA 4273 - Add TTL support for RocksDB

----


> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4273
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4273
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Davor Poldrugo
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period of 48 hours. After that, 
> the data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any option 
> to define a TTL or to periodically clean up older data.
> A "hack" that I use to keep my disk usage low: I have scheduled a job to 
> periodically stop Kafka Streams instances, one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop, which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer, which reads data from the previously mentioned 
> non-compacted topic. This effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL API to define 
> a TTL for the local store?
>  * This raises another question: there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL API? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory: the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * This also leads to another question: some intermediate topics / 
> state stores may need different TTLs, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
