[ 
https://issues.apache.org/jira/browse/FLINK-9938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565223#comment-16565223
 ] 

ASF GitHub Bot commented on FLINK-9938:
---------------------------------------

StefanRRichter commented on a change in pull request #6460: [FLINK-9938] Clean up full snapshot from expired state with TTL
URL: https://github.com/apache/flink/pull/6460#discussion_r206818193
 
 

 ##########
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ##########
 @@ -267,4 +279,33 @@ public void addAll(List<V> values) {
                        ((ListStateDescriptor<E>) stateDesc).getElementSerializer(),
                        backend);
        }
+
+       static class StateSnapshotFilterWrapper<T> implements StateSnapshotFilter<List<T>> {
+               private final StateSnapshotFilter<List<T>> originalFilter;
+               private final TypeSerializer<T> elementSerializer;
+               private final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(128);
+
+               StateSnapshotFilterWrapper(StateSnapshotFilter<List<T>> originalFilter, TypeSerializer<T> elementSerializer) {
+                       this.originalFilter = originalFilter;
+                       this.elementSerializer = elementSerializer;
+               }
+
+               @Override
+               @Nonnull
+               public Optional<List<T>> filter(@Nonnull List<T> value) {
+                       return originalFilter.filter(value);
+               }
+
+               @Override
+               @Nonnull
+               public Optional<byte[]> filterSerialized(@Nonnull byte[] value) {
+                       try {
+                               Optional<List<T>> filteredValue = filter(deserializeList(value, elementSerializer));
 
 Review comment:
   I am sure there is a good reason, but I still want to ask: why do we need to
deserialize the whole list and then filter? At least in theory, couldn't we use
the filter that operates on `byte[]` here? I think it might actually work, but
it is entirely possible that I am overlooking something. The same question
obviously applies to map state.
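
To make the question concrete, here is a minimal sketch of what filtering
directly on the serialized bytes could look like. It is only an illustration
under stated assumptions: the per-element layout (`int` payload length, `long`
timestamp, payload bytes), the class `SerializedListFilterSketch`, and both
method names are hypothetical and are not the actual RocksDB list state format
or any Flink API. The point is that only the timestamp of each element is read,
and surviving elements are copied as raw bytes without deserializing payloads.

```java
import java.nio.ByteBuffer;

class SerializedListFilterSketch {

    // Hypothetical layout per element (an assumption, not Flink's format):
    // [int payloadLength][long timestamp][payload bytes]
    static byte[] encode(long[] timestamps, byte[][] payloads) {
        int size = 0;
        for (byte[] payload : payloads) {
            size += Integer.BYTES + Long.BYTES + payload.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (int i = 0; i < payloads.length; i++) {
            buf.putInt(payloads[i].length).putLong(timestamps[i]).put(payloads[i]);
        }
        return buf.array();
    }

    // Keeps every element with timestamp + ttlMillis > nowMillis.
    // Payloads are never decoded; unexpired elements are copied byte for byte.
    static byte[] filterSerialized(byte[] value, long ttlMillis, long nowMillis) {
        ByteBuffer in = ByteBuffer.wrap(value);
        ByteBuffer out = ByteBuffer.allocate(value.length);
        while (in.hasRemaining()) {
            int start = in.position();
            int payloadLength = in.getInt();
            long timestamp = in.getLong();
            int elementLength = Integer.BYTES + Long.BYTES + payloadLength;
            if (timestamp + ttlMillis > nowMillis) {
                out.put(value, start, elementLength);
            }
            // Skip over the payload to the start of the next element.
            in.position(start + elementLength);
        }
        byte[] result = new byte[out.position()];
        out.flip();
        out.get(result);
        return result;
    }
}
```

Whether this applies here depends on whether the real serialized list lets
element boundaries and timestamps be located without the element serializer,
which is exactly the open question above.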

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> State TTL cleanup during full state scan upon checkpointing
> -----------------------------------------------------------
>
>                 Key: FLINK-9938
>                 URL: https://issues.apache.org/jira/browse/FLINK-9938
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> We can try to piggyback full state scan during certain checkpoint processes 
> in backends, check TTL expiration for every entry and evict expired to speed 
> up cleanup.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
