Hi, all!

On the Flink master branch we now support state TTL for SQL mini-batch
deduplication, using the incremental cleanup strategy on the heap backend
(see FLINK-16581). I wanted to test the performance of this feature, so I
compiled the master branch code, deployed the jar to our production
environment, and ran three types of tests:

flink 1.9.0 release with state TTL enabled
flink 1.11-SNAPSHOT with state TTL disabled
flink 1.11-SNAPSHOT with state TTL enabled
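
For reference, this is roughly how state TTL is switched on and off for the
SQL job in these tests, assuming it is driven by the Table API's idle state
retention; the retention values below are placeholders, not the exact ones
used:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TtlSetupSketch {
    public static void main(String[] args) {
        // Blink planner in streaming mode, which the deduplication rewrite requires.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // "Enable state ttl": set min/max idle state retention (placeholder values).
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));

        // "Disable state ttl": a retention of 0 (the default) means state is never cleaned up.
        // tEnv.getConfig().setIdleStateRetentionTime(Time.milliseconds(0), Time.milliseconds(0));
    }
}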

The test query SQL is as follows:

select order_date,
       sum(price * amount - goods_all_fav_amt - virtual_money_amt + goods_carriage_amt) as saleP,
       sum(amount) as saleN,
       count(distinct parent_sn) as orderN,
       count(distinct user_id) as cusN
from (
    select order_date, user_id,
           order_type, order_status, terminal, last_update_time, goods_all_fav_amt,
           goods_carriage_amt, virtual_money_amt, price, amount,
           order_quality, quality_goods_cnt, acture_goods_amt
    from (
        select *,
               row_number() over (partition by order_id, order_goods_id
                                  order by proctime desc) as rownum
        from dm_trd_order_goods
    )
    where rownum = 1
      and (order_type in (1, 2, 3, 4, 5) or order_status = 70)
      and terminal = 'shop'
      and price > 0
)
group by order_date


At runtime this query generates two operators, Deduplication and GroupAgg.
The configuration is identical across the three tests: parallelism is 20, the
Kafka consumer starts from the earliest offset, and the mini-batch function is
disabled (a configuration sketch follows the results below). The test results
are as follows:

flink 1.9.0 with state TTL enabled: the test lasted 44 min and Flink received
13.74 million records, an average throughput of 5200 records/s (Flink UI
screenshots: back pressure, checkpoint).
flink 1.11-SNAPSHOT with state TTL disabled: the test lasted 28 min and Flink
received 8.83 million records, an average throughput of 5200 records/s (Flink
UI screenshots: back pressure, checkpoint).
flink 1.11-SNAPSHOT with state TTL enabled: the test lasted 1 h 43 min and
Flink received only 1.68 million records, an average throughput of 270
records/s, because of serious back pressure at the deduplication operator;
checkpoints also kept failing for the same reason (Flink UI screenshots: back
pressure, checkpoint).
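
As mentioned above, here is a sketch of the shared test configuration, using
the 1.11 package names; the actual DDL/properties of the Kafka source in the
tests may differ:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SharedTestConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(20); // same parallelism in all three tests

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Mini-batch is disabled in all three tests.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.enabled", "false");

        // The Kafka source reads from the earliest offset, e.g. via
        // FlinkKafkaConsumer#setStartFromEarliest() (source definition omitted).
    }
}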

The main implementation difference is that deduplication state cleanup in
flink 1.9.0 uses timers, while the 1.11-SNAPSHOT version uses StateTtlConfig.
Comparing the three tests, 1.11-SNAPSHOT with state TTL disabled performs the
same as 1.9.0 with state TTL enabled. However, with state TTL enabled in
1.11-SNAPSHOT, throughput drops nearly 20x (from 5200/s to 270/s), so I think
the incremental cleanup strategy causes this problem. What do you think about
it? @azagrebin, @jark.
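
For reference, the incremental cleanup setup on the heap backend looks roughly
like the sketch below; this is my reading of the mechanism, and the TTL and
cleanup parameters are illustrative, not necessarily what the deduplication
operator actually uses:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

public class IncrementalCleanupSketch {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(24)) // illustrative TTL
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Incremental cleanup: check up to 10 entries per state access;
                // 'false' means no additional sweep per processed record.
                .cleanupIncrementally(10, false)
                .build();

        ValueStateDescriptor<Boolean> seenDesc =
                new ValueStateDescriptor<>("dedup-seen", Types.BOOLEAN);
        seenDesc.enableTimeToLive(ttlConfig);
    }
}

Unlike a registered timer, which fires once per key at expiration time, this
strategy piggybacks expiration checks on every state access, which is where I
suspect the overhead comes from.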

Thanks.

lsyldliu

Zhejiang University, College of Control Science and Engineering, CSC
