[ 
https://issues.apache.org/jira/browse/FLINK-10473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17067738#comment-17067738
 ] 

dalongliu edited comment on FLINK-10473 at 3/26/20, 2:51 PM:
-------------------------------------------------------------

Hi [~azagrebin], regarding the pull request you submitted: when I debug 
TtlStateTestBase#testIncrementalCleanup, execution never enters the 
*{color:#FF0000}for loop{color}* of the TtlIncrementalCleanup#runCleanup method 
on state access:
{code:java}
private void runCleanup() {
    int entryNum = 0;
    Collection<StateEntry<K, N, S>> nextEntries;
    while (entryNum < cleanupSize
            && stateIterator.hasNext()
            && !(nextEntries = stateIterator.nextEntries()).isEmpty()) {
        for (StateEntry<K, N, S> state : nextEntries) {
            S cleanState = ttlState.getUnexpiredOrNull(state.getState());
            if (cleanState == null) {
                stateIterator.remove(state);
            } else if (cleanState != state.getState()) {
                stateIterator.update(state, cleanState);
            }
        }
        entryNum += nextEntries.size();
    }
}
{code}
So it looks to me as if the incremental cleanup strategy does not take effect, 
and state is only cleaned up when getWithTtlCheckAndUpdate is called.

I want to use this feature to support state TTL for SQL mini-batch 
deduplication, so I first need to understand how incremental cleanup works. 
Could you describe the incremental cleanup process in detail?


> State TTL incremental cleanup using Heap backend key iterator
> -------------------------------------------------------------
>
>                 Key: FLINK-10473
>                 URL: https://issues.apache.org/jira/browse/FLINK-10473
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / State Backends
>    Affects Versions: 1.7.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> This feature enables lazy background cleanup of state with time-to-live in 
> the keyed state backend that stores state on the JVM heap. The idea is to 
> keep a global lazy state iterator with loose consistency. Every time a state 
> value for some key is accessed or a record is processed, the iterator is 
> advanced, the TTL of the iterated state entries is checked, and the expired 
> entries are cleaned up. When the iterator reaches the end of the state 
> storage, it just starts over. This way the state with TTL is regularly 
> cleaned up to prevent ever-growing memory consumption. The caveat of this 
> cleanup strategy is that if state is not accessed and no records are 
> processed, the accumulated expired state still occupies the storage.
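
The mechanism in the description above can be sketched roughly as follows. 
This is a toy model in plain Java, not Flink's implementation: the class 
IncrementalTtlCleanupSketch, its methods, and the use of a sorted map with a 
"last visited key" cursor in place of the heap backend's state iterator are 
all assumptions made for illustration. Time is passed in explicitly so the 
behaviour is deterministic.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of TTL state with incremental cleanup; hypothetical, not Flink code. */
public class IncrementalTtlCleanupSketch {

    /** A stored value together with the timestamp of its last update. */
    private static final class TtlValue {
        final String value;
        final long lastUpdateTs;

        TtlValue(String value, long lastUpdateTs) {
            this.value = value;
            this.lastUpdateTs = lastUpdateTs;
        }
    }

    private final TreeMap<String, TtlValue> entries = new TreeMap<>();
    private final long ttlMillis;
    private final int cleanupSize;

    // Stand-in for the global lazy iterator: remembers where the last
    // cleanup round stopped. It tolerates concurrent puts and removes,
    // which is the "loose consistency" part of this sketch.
    private String lastVisitedKey;

    public IncrementalTtlCleanupSketch(long ttlMillis, int cleanupSize) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
    }

    private boolean isExpired(TtlValue v, long now) {
        return now - v.lastUpdateTs >= ttlMillis;
    }

    /** Writes a value, then advances the cleanup cursor. */
    public void put(String key, String value, long now) {
        entries.put(key, new TtlValue(value, now));
        runCleanup(now);
    }

    /** Reads a value (null if absent or expired), then advances the cleanup cursor. */
    public String get(String key, long now) {
        TtlValue v = entries.get(key);
        String result = null;
        if (v != null) {
            if (isExpired(v, now)) {
                entries.remove(key); // the accessed entry itself is checked on read
            } else {
                result = v.value;
            }
        }
        runCleanup(now);
        return result;
    }

    /** Visits at most cleanupSize entries and removes the expired ones. */
    private void runCleanup(long now) {
        for (int visited = 0; visited < cleanupSize; visited++) {
            Map.Entry<String, TtlValue> next = lastVisitedKey == null
                    ? entries.firstEntry()
                    : entries.higherEntry(lastVisitedKey);
            if (next == null) {
                // End of storage (or empty map): start over on the next access.
                lastVisitedKey = null;
                return;
            }
            lastVisitedKey = next.getKey();
            if (isExpired(next.getValue(), now)) {
                entries.remove(next.getKey());
            }
        }
    }

    public int size() {
        return entries.size();
    }
}
```

In this model, expired entries for keys that are never read again are still 
removed, but only a bounded number per access, and only while some access is 
happening. With no calls to get or put, nothing is cleaned up, which matches 
the caveat stated at the end of the description.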



