[ https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804975#comment-17804975 ]
Jinzhong Li commented on FLINK-33881:
-------------------------------------
[~Zakelly] [~masteryhx]
I have opened a PR; could you help review it?
>>> "And actually I think there may be some value if we could make sure it is safe to do shallow copy."
I don't think it is safe to do a shallow copy.
The copy-on-write mechanism in the HeapStateBackend only deep-copies the
StateMapEntry wrapper, not the user object inside it. The ListState elements
therefore have to be deep-copied so that the task thread and the snapshot
thread never access the same object concurrently.
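To make the shallow-copy concern concrete, here is a small single-threaded Java sketch. It is not Flink code: `Counter`, `shallowCopy` and `deepCopy` are hypothetical names. Copying only the outer container (list or wrapper) leaves both copies pointing at the same element objects, so an in-place update on one side is visible through the other; a deep copy isolates them.

```java
import java.util.ArrayList;
import java.util.List;

final class ShallowVsDeepCopyDemo {

    // Hypothetical mutable user object held in list state (not a Flink class).
    static final class Counter {
        long value;

        Counter(long value) {
            this.value = value;
        }
    }

    // Shallow copy: a new list object, but the same element instances.
    static List<Counter> shallowCopy(List<Counter> source) {
        return new ArrayList<>(source);
    }

    // Deep copy: a new list holding a fresh copy of every element.
    static List<Counter> deepCopy(List<Counter> source) {
        List<Counter> copy = new ArrayList<>(source.size());
        for (Counter c : source) {
            copy.add(new Counter(c.value));
        }
        return copy;
    }

    // Takes a "snapshot" of the live list, then mutates the live element in
    // place (as a task thread might) and returns what the snapshot now sees.
    static long valueSeenBySnapshot(boolean deep) {
        List<Counter> live = new ArrayList<>();
        live.add(new Counter(1));
        List<Counter> snapshot = deep ? deepCopy(live) : shallowCopy(live);
        live.get(0).value = 42;
        return snapshot.get(0).value;
    }

    public static void main(String[] args) {
        System.out.println("shallow copy sees " + valueSeenBySnapshot(false)); // 42
        System.out.println("deep copy sees " + valueSeenBySnapshot(true));     // 1
    }
}
```

With the shallow copy the "snapshot" observes the later mutation (42); with the deep copy it keeps the original value (1). In the real backend the two sides run on different threads, which turns this sharing into a data race.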
> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> ----------------------------------------------------------------------------
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Jinzhong Li
> Assignee: Jinzhong Li
> Priority: Minor
> Labels: pull-request-available
> Attachments: image-2023-12-19-21-25-21-446.png,
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull ->
> elementSerializer.copy(ttlValue)' consumes a lot of CPU.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that in TtlListState#getUnexpiredOrNull, even if none of the elements
> have expired, every element is still copied, and the whole list/map is then
> updated in TtlIncrementalCleanup#runCleanup():
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull as follows:
> 1) Find the index of the first expired element in the list.
> 2) If there is none, return the original list.
> 3) Otherwise, construct the unexpired list (adding the elements before that
> index), then go through the subsequent elements, adding only the unexpired
> ones to the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
>     //.......
>     // Step 1: find the index of the first expired element, if any.
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     // Step 2: nothing has expired, so return the original list without copying.
>     if (firstExpireIndex == -1) {
>         return ttlValues;
>     }
>     // Step 3: elements before firstExpireIndex are known to be unexpired;
>     // elements after it still have to be checked one by one.
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < firstExpireIndex; i++) {
>         // Shallow alternative: unexpired.add(ttlValues.get(i));
>         unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>     }
>     for (int i = firstExpireIndex + 1; i < ttlValues.size(); i++) {
>         if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             // Shallow alternative: unexpired.add(ttlValues.get(i));
>             unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>         }
>     }
>     // .....
> } {code}
> *In this way, each element is still checked for expiration only once, so the
> extra iteration overhead is very small, while the benefit when no elements
> have expired (no copies and no update of the list) is significant.*
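The following is a minimal, self-contained Java sketch of steps 1) to 3) outside Flink. `Entry`, `expired(...)` and the `copier` parameter are hypothetical stand-ins for `TtlValue<T>`, `TtlUtils.expired(...)` and `elementSerializer.copy(...)`. The expiry rule (an element expires once `lastAccessTimestamp + ttl <= now`) and returning `null` when every element has expired are assumptions for illustration, not taken from the Flink source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class UnexpiredFilterSketch {

    // Hypothetical stand-in for TtlValue<T>: a value plus its last-access timestamp.
    static final class Entry {
        final String value;
        final long lastAccessTimestamp;

        Entry(String value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    // Hypothetical expiry check (assumed rule: expired once lastAccess + ttl <= now).
    static boolean expired(Entry e, long ttl, long now) {
        return e.lastAccessTimestamp + ttl <= now;
    }

    // Returns the original list if nothing has expired (no copies made), otherwise a
    // new list of copied unexpired elements, or null if every element has expired.
    static List<Entry> getUnexpiredOrNull(
            List<Entry> values, long ttl, long now, UnaryOperator<Entry> copier) {
        int firstExpired = -1;
        for (int i = 0; i < values.size(); i++) {
            if (expired(values.get(i), ttl, now)) {
                firstExpired = i;
                break;
            }
        }
        if (firstExpired == -1) {
            return values; // fast path: no copies, caller can skip the state update
        }
        List<Entry> unexpired = new ArrayList<>(values.size());
        for (int i = 0; i < firstExpired; i++) {
            unexpired.add(copier.apply(values.get(i))); // already known to be unexpired
        }
        for (int i = firstExpired + 1; i < values.size(); i++) {
            Entry e = values.get(i);
            if (!expired(e, ttl, now)) {
                unexpired.add(copier.apply(e));
            }
        }
        return unexpired.isEmpty() ? null : unexpired;
    }

    public static void main(String[] args) {
        int[] copies = {0};
        UnaryOperator<Entry> copier = e -> {
            copies[0]++;
            return new Entry(e.value, e.lastAccessTimestamp);
        };
        List<Entry> fresh = List.of(new Entry("a", 100), new Entry("b", 100));
        List<Entry> result = getUnexpiredOrNull(fresh, 50, 120, copier);
        System.out.println((result == fresh) + ", copies=" + copies[0]); // true, copies=0

        List<Entry> mixed = List.of(new Entry("a", 100), new Entry("b", 10), new Entry("c", 100));
        List<Entry> kept = getUnexpiredOrNull(mixed, 50, 120, copier);
        System.out.println(kept.size() + ", copies=" + copies[0]); // 2, copies=2
    }
}
```

Counting calls to the copier makes the claimed benefit visible: when nothing has expired, the input list is returned as-is and no element is copied, whereas with one expired element in the middle only the two surviving elements are copied.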
--
This message was sent by Atlassian Jira
(v8.20.10#820010)