[https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805110#comment-17805110]
Jinzhong Li commented on FLINK-33881:
-------------------------------------
[~Zakelly] I have run the performance tests with TtlListStateBenchmark. The
results show that this optimization improves TtlListState performance by about
27% overall (the total score rises from 25138.14 to 32035.68).
|Benchmark|Param: backendType|Param: expiredOption|Param: stateVisibility|Param: updateType|Score {color:#FF0000}without{color} optimization|Score {color:#FF0000}with{color} optimization|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|686.100001|902.399321|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|713.428757|906.389711|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|720.125366|902.577086|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|736.226185|881.717185|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|733.647731|894.295796|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|721.162458|901.177007|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|731.355353|890.06144|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|726.943995|909.319816|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|212.313443|261.96109|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|211.863397|258.266859|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|215.67765|259.52777|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|208.505043|263.768959|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|201.646868|257.422463|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|214.599971|267.296024|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|207.070109|263.741483|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|213.35357|265.783929|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|536.94471|688.041948|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|538.545608|678.766982|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|530.422217|688.008454|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|540.517729|690.383004|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|538.089004|695.018344|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|532.924439|690.375419|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|513.289823|715.811655|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|534.967481|724.869126|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|630.801404|822.861466|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|495.627707|678.028432|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|645.222027|780.880865|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|513.539414|674.461302|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|645.480934|763.709594|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|512.678829|673.243958|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|646.656774|822.529003|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|518.734052|664.093519|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|592.612052|778.660071|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|507.275622|655.986257|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|597.026859|814.902181|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|500.038208|695.082031|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|593.968845|783.100913|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|503.457165|654.044131|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|592.170287|722.38563|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|498.257928|652.923734|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|557.020907|683.122647|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|556.828811|696.510575|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|554.269049|670.339559|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|547.803797|708.379692|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|562.205645|700.56704|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|553.76725|683.665343|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|536.061935|679.364527|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|556.908654|719.853693|
|Total| | | | |25138.13506|32035.67703|
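The "about 27%" figure is simply the ratio of the two Total values. The short Java check below recomputes it and, for comparison, the gain on the first listAdd and listAddAll rows, which shows that individual rows differ from the aggregate. The class and method names are illustrative only, and it assumes a higher score is better, as the comment implies.

{code:java}
/** Recomputes relative improvements from scores copied out of the table above. */
public class BenchmarkSpeedup {

    /** Relative change from "before" to "after", in percent (assumes higher is better). */
    static double speedupPercent(double before, double after) {
        return (after / before - 1.0) * 100.0;
    }

    public static void main(String[] args) {
        // Total row: sum of all 48 scores without / with the optimization.
        System.out.printf("Total:                  %.1f%%%n",
                speedupPercent(25138.13506, 32035.67703)); // prints 27.4%
        // Two individual rows (HEAP, Expire3PercentPerIteration, NeverReturnExpired, OnCreateAndWrite).
        System.out.printf("listAdd (first row):    %.1f%%%n",
                speedupPercent(686.100001, 902.399321)); // prints 31.5%
        System.out.printf("listAddAll (first row): %.1f%%%n",
                speedupPercent(212.313443, 261.96109)); // prints 23.4%
    }
}
{code}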
> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> ----------------------------------------------------------------------------
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Jinzhong Li
> Assignee: Jinzhong Li
> Priority: Minor
> Labels: pull-request-available
> Attachments: image-2023-12-19-21-25-21-446.png, image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull ->
> elementSerializer.copy(ttlValue)' consumes a significant amount of CPU.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that even when none of the elements have expired,
> TtlListState#getUnexpiredOrNull still copies all of them, and
> TtlIncrementalCleanup#runCleanup() then updates the whole list/map.
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull as follows:
> 1) Find the index of the first expired element in the list.
> 2) If there is none, return the original list.
> 3) Otherwise, construct the unexpired list (adding the elements before that
> index), then go through the subsequent elements, adding the unexpired ones
> to the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
>     //.......
>     // 1) Find the index of the first expired element, if any.
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     // 2) Nothing has expired: return the original list, no copies needed.
>     if (firstExpireIndex == -1) {
>         return ttlValues;
>     }
>     // 3) Elements before firstExpireIndex are known to be unexpired;
>     //    elements after it still have to be checked.
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < firstExpireIndex; i++) {
>         // unexpired.add(ttlValues.get(i));
>         unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>     }
>     for (int i = firstExpireIndex + 1; i < ttlValues.size(); i++) {
>         if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             // unexpired.add(ttlValues.get(i));
>             unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>         }
>     }
>     // .....
> }
> {code}
> *With this approach, the extra iteration overhead is very small, while the
> benefit when no elements have expired is significant.*
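To make the trade-off concrete, here is a simplified, self-contained Java model of the proposal. It is not Flink code. `TimestampedValue`, `isExpired` and `copy` are hypothetical stand-ins for `TtlValue`, `TtlUtils.expired` and `elementSerializer.copy`. The expiry rule (`lastAccessTimestamp + ttl <= now`) is an assumption, and so is returning `null` when every element has expired, which is inferred from the method name. The sketch counts defensive copies for a copy-everything version (a simplified model of the behavior described in the issue) and for the proposed early-exit version.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of getUnexpiredOrNull; names are illustrative, not Flink APIs. */
public class UnexpiredFilterSketch {

    /** Stand-in for TtlValue: a user value plus its last-access timestamp. */
    static final class TimestampedValue<T> {
        final T value;
        final long lastAccessTimestamp;

        TimestampedValue(T value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Number of defensive copies made, so the two strategies can be compared. */
    static int copies = 0;

    /** Assumed expiry rule, standing in for TtlUtils.expired. */
    static <T> boolean isExpired(TimestampedValue<T> v, long ttl, long now) {
        return v.lastAccessTimestamp + ttl <= now;
    }

    /** Stand-in for elementSerializer.copy; counts every call. */
    static <T> TimestampedValue<T> copy(TimestampedValue<T> v) {
        copies++;
        return new TimestampedValue<>(v.value, v.lastAccessTimestamp);
    }

    /** Copy-everything model: always builds a new list of copied unexpired elements. */
    static <T> List<TimestampedValue<T>> naive(List<TimestampedValue<T>> values, long ttl, long now) {
        List<TimestampedValue<T>> unexpired = new ArrayList<>(values.size());
        for (TimestampedValue<T> v : values) {
            if (!isExpired(v, ttl, now)) {
                unexpired.add(copy(v));
            }
        }
        return unexpired.isEmpty() ? null : unexpired;
    }

    /** Proposed strategy: return the original list instance when nothing has expired. */
    static <T> List<TimestampedValue<T>> optimized(List<TimestampedValue<T>> values, long ttl, long now) {
        int firstExpireIndex = -1;
        for (int i = 0; i < values.size(); i++) {
            if (isExpired(values.get(i), ttl, now)) {
                firstExpireIndex = i;
                break;
            }
        }
        if (firstExpireIndex == -1) {
            return values; // same instance, zero copies
        }
        List<TimestampedValue<T>> unexpired = new ArrayList<>(values.size());
        for (int i = 0; i < firstExpireIndex; i++) {
            unexpired.add(copy(values.get(i))); // already checked: unexpired
        }
        for (int i = firstExpireIndex + 1; i < values.size(); i++) {
            if (!isExpired(values.get(i), ttl, now)) {
                unexpired.add(copy(values.get(i)));
            }
        }
        return unexpired.isEmpty() ? null : unexpired;
    }

    public static void main(String[] args) {
        long ttl = 100;
        long now = 1_000;
        List<TimestampedValue<String>> fresh = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            fresh.add(new TimestampedValue<>("v" + i, now - 10)); // none expired
        }

        copies = 0;
        List<TimestampedValue<String>> a = naive(fresh, ttl, now);
        System.out.println("naive:     same instance=" + (a == fresh) + ", copies=" + copies);

        copies = 0;
        List<TimestampedValue<String>> b = optimized(fresh, ttl, now);
        System.out.println("optimized: same instance=" + (b == fresh) + ", copies=" + copies);
    }
}
{code}

With five unexpired elements, `main` prints `naive:     same instance=false, copies=5` followed by `optimized: same instance=true, copies=0`. In this sketch every element is still checked for expiry exactly once. The only extra work is the second pass that copies the already-checked prefix, and that pass happens only when something has actually expired.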