Hi all! On the Flink master branch, we now support state TTL for SQL mini-batch deduplication, using the incremental cleanup strategy on the heap backend (see FLINK-16581). Because I wanted to test the performance of this feature, I compiled the master branch, deployed the jar to our production environment, and ran three tests:
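For readers less familiar with incremental cleanup, here is a simplified, hypothetical Java model of the idea. It is NOT Flink's implementation. Each entry stores a last-write timestamp, and every state access also advances a global cursor over all entries, checking up to `cleanupSize` of them and removing expired ones. The name `cleanupSize` is borrowed from the first parameter of `StateTtlConfig.Builder#cleanupIncrementally`; the class and method names are made up for illustration. The point it shows is that cleanup work is done synchronously on the record-processing path, in proportion to the number of state accesses.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of TTL incremental cleanup on a heap
 * state backend. NOT Flink's code; it only mimics the general idea.
 */
class IncrementalCleanupModel {
    /** A stored value plus the time it was last written. */
    private static final class Entry {
        final String value;
        final long lastWrite;

        Entry(String value, long lastWrite) {
            this.value = value;
            this.lastWrite = lastWrite;
        }
    }

    private final Map<String, Entry> state = new LinkedHashMap<>();
    private final long ttlMillis;
    private final int cleanupSize;   // entries checked per state access (assumed name)
    private Iterator<String> cursor; // global cursor over a snapshot of all keys
    long entriesChecked = 0;         // extra work done by cleanup, for inspection

    IncrementalCleanupModel(long ttlMillis, int cleanupSize) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
    }

    private boolean expired(Entry e, long now) {
        return now - e.lastWrite >= ttlMillis;
    }

    /** Checks up to cleanupSize entries, starting a new pass over all keys when needed. */
    private void incrementalCleanup(long now) {
        for (int i = 0; i < cleanupSize; i++) {
            if (cursor == null || !cursor.hasNext()) {
                if (state.isEmpty()) {
                    return;
                }
                // Snapshot the keys so removals below cannot invalidate the iterator
                // (a simplification of this model).
                cursor = new ArrayList<>(state.keySet()).iterator();
            }
            String key = cursor.next();
            entriesChecked++;
            Entry e = state.get(key);
            if (e != null && expired(e, now)) {
                state.remove(key);
            }
        }
    }

    void put(String key, String value, long now) {
        incrementalCleanup(now);
        state.put(key, new Entry(value, now));
    }

    /** Returns null for missing or expired entries; expired entries are also removed. */
    String get(String key, long now) {
        incrementalCleanup(now);
        Entry e = state.get(key);
        if (e == null) {
            return null;
        }
        if (expired(e, now)) {
            state.remove(key);
            return null;
        }
        return e.value;
    }

    int size() {
        return state.size();
    }
}
```

With a TTL of 100 ms and `cleanupSize = 2`, three entries written at t=0 are not all gone after a single access at t=200: that access only checks two of them, and the third is removed on a later access. Every `put`/`get` pays for up to `cleanupSize` extra checks, regardless of whether anything has expired.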
1. Flink 1.9.0 release, state TTL enabled
2. Flink 1.11-SNAPSHOT, state TTL disabled
3. Flink 1.11-SNAPSHOT, state TTL enabled

The test query is as follows:

    select
      order_date,
      sum(price * amount - goods_all_fav_amt - virtual_money_amt + goods_carriage_amt) as saleP,
      sum(amount) as saleN,
      count(distinct parent_sn) as orderN,
      count(distinct user_id) as cusN
    from (
      select
        order_date, user_id, order_type, order_status, terminal, last_update_time,
        goods_all_fav_amt, goods_carriage_amt, virtual_money_amt, price, amount,
        order_quality, quality_goods_cnt, acture_goods_amt
      from (
        select *, row_number() over(partition by order_id, order_goods_id order by proctime desc) as rownum
        from dm_trd_order_goods
      )
      where rownum = 1
        and (order_type in (1,2,3,4,5) or order_status = 70)
        and terminal = 'shop'
        and price > 0
    )
    group by order_date

At runtime, this query generates two operators: Deduplication and GroupAgg. All tests use the same configuration: parallelism 20, the Kafka consumer starting from the earliest offset, and mini-batch disabled. The results are:

1. Flink 1.9.0, state TTL enabled: the test lasted 44 min; Flink received 13.74 million records, an average TPS of about 5,200/s. (Flink UI screenshots: back pressure, checkpoint)
2. Flink 1.11-SNAPSHOT, state TTL disabled: the test lasted 28 min; Flink received 8.83 million records, an average TPS of about 5,200/s. (Flink UI screenshots: back pressure, checkpoint)
3. Flink 1.11-SNAPSHOT, state TTL enabled: the test lasted 1 h 43 min; Flink received only 1.68 million records, an average TPS of about 270/s, because the Deduplication operator was under severe back pressure. For the same reason, checkpoints failed every time. (Flink UI screenshots: back pressure, checkpoint)

The main difference between the versions: in Flink 1.9.0, Deduplication state cleanup is implemented with timers, whereas 1.11-SNAPSHOT uses StateTtlConfig.
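As a sanity check, the average throughput of each run can be recomputed from the reported record counts and durations. This small Java sketch uses only the numbers above; the class and method names are made up for illustration.

```java
/** Recomputes average throughput from the (rounded) numbers reported for each run. */
final class ThroughputCheck {
    /** Average records per second over a run. */
    static double avgTps(long records, long durationSeconds) {
        return (double) records / durationSeconds;
    }

    public static void main(String[] args) {
        double v190Ttl = avgTps(13_740_000L, 44L * 60);       // ~5204.5 records/s
        double snapshotNoTtl = avgTps(8_830_000L, 28L * 60);  // ~5256.0 records/s
        double snapshotTtl = avgTps(1_680_000L, 103L * 60);   // ~271.8 records/s
        System.out.printf("1.9.0, TTL on:          %.1f/s%n", v190Ttl);
        System.out.printf("1.11-SNAPSHOT, TTL off: %.1f/s%n", snapshotNoTtl);
        System.out.printf("1.11-SNAPSHOT, TTL on:  %.1f/s%n", snapshotTtl);
        System.out.printf("slowdown vs 1.9.0:      %.1fx%n", v190Ttl / snapshotTtl);
    }
}
```

Because the record counts are rounded to tens of thousands and the durations to minutes, these figures are approximate. They agree with the ~5,200/s and ~270/s quoted above and put the slowdown at roughly 19x.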
Comparing the three tests, we can see that with state TTL disabled, 1.11-SNAPSHOT performs the same as 1.9.0 with state TTL enabled. However, with state TTL enabled in 1.11-SNAPSHOT, throughput drops nearly 20-fold (from about 5,200/s to about 270/s), so I suspect the incremental cleanup strategy causes this problem. What do you think? @azagrebin, @jark. Thanks.

lsyldliu
Zhejiang University, College of Control Science and Engineering, CSC