[ 
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jinzhong Li updated FLINK-33881:
--------------------------------
    Description: 
In some scenarios, 'TtlListState#getUnexpiredOrNull -> 
elementSerializer.copy(ttlValue)'  consumes a lot of cpu resources.

!image-2023-12-19-21-25-21-446.png|width=529,height=119!

I found that for TtlListState#getUnexpiredOrNull, if none of the elements have 
expired, it still needs to copy all the elements and update the whole list/map 
in TtlIncrementalCleanup#runCleanup();

!image-2023-12-19-21-26-43-518.png|width=505,height=266!

I think we could optimize TtlListState#getUnexpiredOrNull by:
1)find the first expired element index in the list;
2)If not found, return to the original list;
3)If found, then constrct the unexpire list (puts the previous elements into 
the list), and go through the subsequent elements, adding expired elements into 
the list.
{code:java}
public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> 
ttlValues) {
    //.......
    int firstExpireIndex = -1;
    for (int i = 0; i < ttlValues.size(); i++) {
        if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
            firstExpireIndex = i;
            break;
        }
    }
    if (firstExpireIndex == -1) {
        return ttlValues;  //return the original ttlValues
    }
    List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
    for (int i = 0; i < ttlValues.size(); i++) {
        if (i < firstExpireIndex) {
            unexpired.add(ttlValues.get(i));
        }
        if (i > firstExpireIndex) {
            if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
                unexpired.add(ttlValues.get(i));
            }
        }
    }
    //  .....
} {code}
In this way, the extra iteration overhead is actually very very small, but the 
benefit when there are no expired elements is significant.

  was:
In some scenarios, 'TtlListState#getUnexpiredOrNull -> 
elementSerializer.copy(ttlValue)'  consumes a lot of cpu resources.

!image-2023-12-19-21-25-21-446.png|width=444,height=100!

I found that for TtlListState#getUnexpiredOrNull, if none of the elements have 
expired, it still needs to copy all the elements and update the whole list/map 
in TtlIncrementalCleanup#runCleanup();

!image-2023-12-19-21-26-43-518.png|width=317,height=167!

I think we could optimize TtlListState#getUnexpiredOrNull by:
1)find the first expired element index in the list;
2)If not found, return to the original list;
3)If found, then constrct the unexpire list (puts the previous elements into 
the list), and go through the subsequent elements, adding expired elements into 
the list.
{code:java}
public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> 
ttlValues) {
    //.......
    int firstExpireIndex = -1;
    for (int i = 0; i < ttlValues.size(); i++) {
        if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
            firstExpireIndex = i;
            break;
        }
    }
    if (firstExpireIndex == -1) {
        return ttlValues;  //return the original ttlValues
    }
    List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
    for (int i = 0; i < ttlValues.size(); i++) {
        if (i < firstExpireIndex) {
            unexpired.add(ttlValues.get(i));
        }
        if (i > firstExpireIndex) {
            if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
                unexpired.add(ttlValues.get(i));
            }
        }
    }
    //  .....
} {code}
In this way, the extra iteration overhead is actually very very small, but the 
benefit when there are no expired elements is significant.


> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-33881
>                 URL: https://issues.apache.org/jira/browse/FLINK-33881
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: Jinzhong Li
>            Priority: Minor
>         Attachments: image-2023-12-19-21-25-21-446.png, 
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull -> 
> elementSerializer.copy(ttlValue)'  consumes a lot of cpu resources.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that for TtlListState#getUnexpiredOrNull, if none of the elements 
> have expired, it still needs to copy all the elements and update the whole 
> list/map in TtlIncrementalCleanup#runCleanup();
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull by:
> 1)find the first expired element index in the list;
> 2)If not found, return to the original list;
> 3)If found, then constrct the unexpire list (puts the previous elements into 
> the list), and go through the subsequent elements, adding expired elements 
> into the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> 
> ttlValues) {
>     //.......
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     if (firstExpireIndex == -1) {
>         return ttlValues;  //return the original ttlValues
>     }
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (i < firstExpireIndex) {
>             unexpired.add(ttlValues.get(i));
>         }
>         if (i > firstExpireIndex) {
>             if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>                 unexpired.add(ttlValues.get(i));
>             }
>         }
>     }
>     //  .....
> } {code}
> In this way, the extra iteration overhead is actually very very small, but 
> the benefit when there are no expired elements is significant.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to