Hi all,
The Flink master branch now supports state TTL for SQL mini-batch
deduplication using the incremental cleanup strategy on the heap backend
(see FLINK-16581). To evaluate the performance of this feature, I compiled
the master branch, deployed the jar to our production environment, and ran
three tests:
1. Flink 1.9.0 release with state TTL enabled
2. Flink 1.11-SNAPSHOT with state TTL disabled
3. Flink 1.11-SNAPSHOT with state TTL enabled
The test SQL query is as follows:
select order_date,
       sum(price * amount - goods_all_fav_amt - virtual_money_amt
           + goods_carriage_amt) as saleP,
       sum(amount) as saleN,
       count(distinct parent_sn) as orderN,
       count(distinct user_id) as cusN
from (
    select order_date, user_id, order_type, order_status, terminal,
           last_update_time, goods_all_fav_amt, goods_carriage_amt,
           virtual_money_amt, price, amount, order_quality,
           quality_goods_cnt, acture_goods_amt
    from (
        select *,
               row_number() over (partition by order_id, order_goods_id
                                  order by proctime desc) as rownum
        from dm_trd_order_goods
    )
    where rownum = 1
      and (order_type in (1, 2, 3, 4, 5) or order_status = 70)
      and terminal = 'shop'
      and price > 0
)
group by order_date
At runtime, this query generates two stateful operators: Deduplication and
GroupAgg. All three tests used the same configuration: parallelism 20, the
Kafka consumer starting from the earliest offset, and the mini-batch function
disabled. The test results are as follows:
1. Flink 1.9.0 with state TTL enabled: the test lasted 44 min; Flink received
13.74 million records at an average of 5200 records/s (Flink UI screenshots:
back pressure, checkpoint).
2. Flink 1.11-SNAPSHOT with state TTL disabled: the test lasted 28 min; Flink
received 8.83 million records at an average of 5200 records/s (Flink UI
screenshots: back pressure, checkpoint).
3. Flink 1.11-SNAPSHOT with state TTL enabled: the test lasted 1 h 43 min, but
Flink received only 1.68 million records (average 270 records/s) because of
severe back pressure at the deduplication operator; moreover, checkpoints
always failed for the same reason (Flink UI screenshots: back pressure,
checkpoint).
The main difference is in how deduplication state cleanup is implemented:
Flink 1.9.0 uses timers, while 1.11-SNAPSHOT uses StateTtlConfig.
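For reference, a StateTtlConfig with incremental cleanup on the heap backend
looks roughly like the sketch below. The TTL duration and cleanup size here
are hypothetical illustration values, not necessarily what the deduplication
operator actually configures:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlExample {
    // Hypothetical values, for illustration only.
    static StateTtlConfig incrementalTtl() {
        return StateTtlConfig
            .newBuilder(Time.hours(1))      // state retention time (assumed)
            // Visit up to 5 state entries per state access; 'false' means
            // cleanup is not additionally run for every processed record.
            .cleanupIncrementally(5, false)
            .build();
    }
}
```

With this strategy, each state access also iterates a few entries of the
heap state to check their timestamps, which adds per-record overhead that
timer-based cleanup does not have.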
Comparing the three tests, 1.11-SNAPSHOT with state TTL disabled performs the
same as 1.9.0 with state TTL enabled. However, enabling state TTL in
1.11-SNAPSHOT degrades throughput by nearly 20x, so I suspect the incremental
cleanup strategy is the cause. What do you think? @azagrebin, @jark.
Thanks.
lsyldliu
Zhejiang University, College of Control Science and Engineering, CSC