[ 
https://issues.apache.org/jira/browse/FLINK-9938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568390#comment-16568390
 ] 

ASF GitHub Bot commented on FLINK-9938:
---------------------------------------

StefanRRichter commented on a change in pull request #6460: [FLINK-9938] Clean 
up full snapshot from expired state with TTL
URL: https://github.com/apache/flink/pull/6460#discussion_r207590480
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##########
 @@ -1650,6 +1679,54 @@ public void close() {
                }
        }
 
+       private static final class TransformingRocksIteratorWrapper extends 
RocksIteratorWrapper {
+               @Nonnull
+               private final StateSnapshotTransformer<byte[]> 
stateSnapshotTransformer;
+               private byte[] current;
+
+               public TransformingRocksIteratorWrapper(
+                       @Nonnull RocksIterator iterator,
+                       @Nonnull StateSnapshotTransformer<byte[]> 
stateSnapshotTransformer) {
+                       super(iterator);
+                       this.stateSnapshotTransformer = 
stateSnapshotTransformer;
+               }
+
+               @Override
+               public void seekToFirst() {
+                       super.seekToFirst();
+                       filterOrTransform(super::next);
+               }
+
+               @Override
+               public void seekToLast() {
+                       super.seekToLast();
+                       filterOrTransform(super::prev);
+               }
+
+               @Override
+               public void next() {
+                       super.next();
+                       filterOrTransform(super::next);
+               }
+
+               @Override
+               public void prev() {
+                       super.prev();
+                       filterOrTransform(super::prev);
+               }
+
+               private void filterOrTransform(Runnable advance) {
+                       while (isValid() && (current = 
stateSnapshotTransformer.filterOrTransform(super.value())) == null) {
 
 Review comment:
   I assume that `super.value()` always returns not `null`? Otherwise this loop 
could be problematic.
   
   Furthermore, if the iterator became invalid, current is not update and still 
accessible. I think the use of this iterator always respects to check `valid` 
first, but we might reset current or throw an exception in the invalid case? 
Could use some marker `byte[0]` to mark the exception case if you think it 
makes sense.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> State TTL cleanup during full state scan upon checkpointing
> -----------------------------------------------------------
>
>                 Key: FLINK-9938
>                 URL: https://issues.apache.org/jira/browse/FLINK-9938
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> We can try to piggyback full state scan during certain checkpoint processes 
> in backends, check TTL expiration for every entry and evict expired to speed 
> up cleanup.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to