[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-33881:
-----------------------------------

    Assignee: Jinzhong Li

> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-33881
>                 URL: https://issues.apache.org/jira/browse/FLINK-33881
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Jinzhong Li
>            Assignee: Jinzhong Li
>            Priority: Minor
>         Attachments: image-2023-12-19-21-25-21-446.png, 
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull -> 
> elementSerializer.copy(ttlValue)'  consumes a lot of cpu resources.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that for TtlListState#getUnexpiredOrNull, if none of the elements 
> have expired, it still needs to copy all the elements and update the whole 
> list/map in TtlIncrementalCleanup#runCleanup();
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull by:
> 1)find the first expired element index in the list;
> 2)If not found, return to the original list;
> 3)If found, then constrct the unexpire list (puts the previous elements into 
> the list), and go through the subsequent elements, adding expired elements 
> into the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> 
> ttlValues) {
>     //.......
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     if (firstExpireIndex == -1) {
>         return ttlValues;  //return the original ttlValues
>     }
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (i < firstExpireIndex) {
>             // unexpired.add(ttlValues.get(i));
>             unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>         }
>         if (i > firstExpireIndex) {
>             if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>                 // unexpired.add(ttlValues.get(i));
>                 unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>             }
>         }
>     }
>     //  .....
> } {code}
> *In this way, the extra iteration overhead is actually very very small, but 
> the benefit when there are no expired elements is significant.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to