[
https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jinzhong Li updated FLINK-33881:
--------------------------------
Description:
In some scenarios, 'TtlListState#getUnexpiredOrNull ->
elementSerializer.copy(ttlValue)' consumes a lot of cpu resources.
!image-2023-12-19-21-25-21-446.png|width=529,height=119!
I found that for TtlListState#getUnexpiredOrNull, if none of the elements have
expired, it still needs to copy all the elements and update the whole list/map
in TtlIncrementalCleanup#runCleanup();
!image-2023-12-19-21-26-43-518.png|width=505,height=266!
I think we could optimize TtlListState#getUnexpiredOrNull by:
1)find the first expired element index in the list;
2)If not found, return to the original list;
3)If found, then constrct the unexpire list (puts the previous elements into
the list), and go through the subsequent elements, adding expired elements into
the list.
{code:java}
public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>>
ttlValues) {
//.......
int firstExpireIndex = -1;
for (int i = 0; i < ttlValues.size(); i++) {
if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
firstExpireIndex = i;
break;
}
}
if (firstExpireIndex == -1) {
return ttlValues; //return the original ttlValues
}
List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
for (int i = 0; i < ttlValues.size(); i++) {
if (i < firstExpireIndex) {
unexpired.add(ttlValues.get(i));
}
if (i > firstExpireIndex) {
if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
unexpired.add(ttlValues.get(i));
}
}
}
// .....
} {code}
*In this way, the extra iteration overhead is actually very very small, but the
benefit when there are no expired elements is significant.*
was:
In some scenarios, 'TtlListState#getUnexpiredOrNull ->
elementSerializer.copy(ttlValue)' consumes a lot of cpu resources.
!image-2023-12-19-21-25-21-446.png|width=529,height=119!
I found that for TtlListState#getUnexpiredOrNull, if none of the elements have
expired, it still needs to copy all the elements and update the whole list/map
in TtlIncrementalCleanup#runCleanup();
!image-2023-12-19-21-26-43-518.png|width=505,height=266!
I think we could optimize TtlListState#getUnexpiredOrNull by:
1)find the first expired element index in the list;
2)If not found, return to the original list;
3)If found, then constrct the unexpire list (puts the previous elements into
the list), and go through the subsequent elements, adding expired elements into
the list.
{code:java}
public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>>
ttlValues) {
//.......
int firstExpireIndex = -1;
for (int i = 0; i < ttlValues.size(); i++) {
if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
firstExpireIndex = i;
break;
}
}
if (firstExpireIndex == -1) {
return ttlValues; //return the original ttlValues
}
List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
for (int i = 0; i < ttlValues.size(); i++) {
if (i < firstExpireIndex) {
unexpired.add(ttlValues.get(i));
}
if (i > firstExpireIndex) {
if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
unexpired.add(ttlValues.get(i));
}
}
}
// .....
} {code}
In this way, the extra iteration overhead is actually very very small, but the
benefit when there are no expired elements is significant.
> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> ----------------------------------------------------------------------------
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Jinzhong Li
> Priority: Minor
> Attachments: image-2023-12-19-21-25-21-446.png,
> image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull ->
> elementSerializer.copy(ttlValue)' consumes a lot of cpu resources.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that for TtlListState#getUnexpiredOrNull, if none of the elements
> have expired, it still needs to copy all the elements and update the whole
> list/map in TtlIncrementalCleanup#runCleanup();
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull by:
> 1)find the first expired element index in the list;
> 2)If not found, return to the original list;
> 3)If found, then constrct the unexpire list (puts the previous elements into
> the list), and go through the subsequent elements, adding expired elements
> into the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>>
> ttlValues) {
> //.......
> int firstExpireIndex = -1;
> for (int i = 0; i < ttlValues.size(); i++) {
> if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
> firstExpireIndex = i;
> break;
> }
> }
> if (firstExpireIndex == -1) {
> return ttlValues; //return the original ttlValues
> }
> List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
> for (int i = 0; i < ttlValues.size(); i++) {
> if (i < firstExpireIndex) {
> unexpired.add(ttlValues.get(i));
> }
> if (i > firstExpireIndex) {
> if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
> unexpired.add(ttlValues.get(i));
> }
> }
> }
> // .....
> } {code}
> *In this way, the extra iteration overhead is actually very very small, but
> the benefit when there are no expired elements is significant.*
--
This message was sent by Atlassian Jira
(v8.20.10#820010)