[ https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805124#comment-17805124 ]
Zakelly Lan commented on FLINK-33881:
-------------------------------------
This is great! Well done.
> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> ----------------------------------------------------------------------------
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Jinzhong Li
> Assignee: Jinzhong Li
> Priority: Minor
> Labels: pull-request-available
> Attachments: image-2023-12-19-21-25-21-446.png,
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull ->
> elementSerializer.copy(ttlValue)' consumes a lot of CPU resources.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that even if none of the elements have expired,
> TtlListState#getUnexpiredOrNull still needs to copy all the elements and
> update the whole list/map in TtlIncrementalCleanup#runCleanup().
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull by:
> 1) finding the index of the first expired element in the list;
> 2) if none is found, returning the original list;
> 3) if one is found, constructing the unexpired list (adding the preceding
> elements to it), then going through the subsequent elements and adding the
> unexpired ones to the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
>     //.......
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     if (firstExpireIndex == -1) {
>         return ttlValues; // return the original ttlValues
>     }
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (i < firstExpireIndex) {
>             // unexpired.add(ttlValues.get(i));
>             unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>         }
>         if (i > firstExpireIndex) {
>             if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>                 // unexpired.add(ttlValues.get(i));
>                 unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>             }
>         }
>     }
>     // .....
> }
> {code}
> *This way, the extra iteration overhead is very small, while the benefit
> when no elements have expired is significant.*
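
The three steps above can be tried out in isolation with a small self-contained sketch. This is only an illustration, not Flink's actual code: TimestampedValue, expired, and unexpiredOrSame are hypothetical stand-ins for TtlValue, TtlUtils.expired, and getUnexpiredOrNull, and the copier parameter stands in for elementSerializer.copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class LazyTtlFilter {

    /** Hypothetical stand-in for a TTL-wrapped value: a user value plus its last-access time. */
    static final class TimestampedValue<T> {
        final T value;
        final long lastAccessTimestamp;

        TimestampedValue(T value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Hypothetical expiry check (assumption); overflow handling is omitted for brevity. */
    static boolean expired(TimestampedValue<?> v, long ttl, long now) {
        return v.lastAccessTimestamp + ttl <= now;
    }

    /**
     * Returns the input list itself when nothing has expired (no copies made);
     * otherwise returns a new list holding copies of the unexpired elements.
     */
    static <T> List<TimestampedValue<T>> unexpiredOrSame(
            List<TimestampedValue<T>> values,
            long ttl,
            long now,
            UnaryOperator<TimestampedValue<T>> copier) {
        // Step 1: find the index of the first expired element.
        int firstExpired = -1;
        for (int i = 0; i < values.size(); i++) {
            if (expired(values.get(i), ttl, now)) {
                firstExpired = i;
                break;
            }
        }
        // Step 2: nothing expired, so hand back the original list untouched.
        if (firstExpired == -1) {
            return values;
        }
        // Step 3: elements before firstExpired are known to be unexpired;
        // elements after it still need an expiry check.
        List<TimestampedValue<T>> unexpired = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            if (i == firstExpired) {
                continue;
            }
            TimestampedValue<T> v = values.get(i);
            if (i < firstExpired || !expired(v, ttl, now)) {
                unexpired.add(copier.apply(v));
            }
        }
        return unexpired;
    }
}
```

In this sketch a caller can compare the result with the input by reference (result == input) to tell whether anything expired, and skip the state update when the two are the same instance.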