[ 
https://issues.apache.org/jira/browse/FLINK-9938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565231#comment-16565231
 ] 

ASF GitHub Bot commented on FLINK-9938:
---------------------------------------

StefanRRichter commented on a change in pull request #6460: [FLINK-9938] Clean up full snapshot from expired state with TTL
URL: https://github.com/apache/flink/pull/6460#discussion_r206831159
 
 

 ##########
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##########
 @@ -1408,22 +1427,68 @@ public int numStateEntries() {
                return count;
        }
 
+       /** {@link RocksDBMergeIterator} with state filtering. */
+       static final class FilteringRocksDBMergeIterator extends RocksDBMergeIterator {
+               private final List<StateSnapshotFilter<?>> stateSnapshotFilters;
+               private byte[] nextFilteredValue;
+
+               FilteringRocksDBMergeIterator(
+                       List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+                       int keyGroupPrefixByteCount,
+                       List<StateSnapshotFilter<?>> stateSnapshotFilters) {
+                       super(kvStateIterators, keyGroupPrefixByteCount);
+                       Preconditions.checkNotNull(stateSnapshotFilters);
+                       this.stateSnapshotFilters = stateSnapshotFilters;
+                       nextFiltered();
+               }
+
+               public void next() {
+                       nextUnfiltered();
+                       nextFiltered();
+               }
+
+               private void nextFiltered() {
+                       if (!isValid()) {
+                               return;
+                       }
+                       byte[] value = currentSubIterator.getIterator().value();
+                       Optional<byte[]> filteredValue = stateSnapshotFilters.get(kvStateId()).filterSerialized(value);
+                       while (!filteredValue.isPresent()) {
+                               nextUnfiltered();
+                               if (isValid()) {
+                                       value = currentSubIterator.getIterator().value();
+                                       filteredValue = stateSnapshotFilters.get(kvStateId()).filterSerialized(value);
+                               } else {
+                                       break;
+                               }
+                       }
+                       filteredValue.ifPresent(bytes -> nextFilteredValue = bytes);
+               }
+
+               private void nextUnfiltered() {
+                       super.next();
+               }
 
+               @Override
+               public byte[] value() {
+                       return nextFilteredValue;
+               }
+       }
 
        /**
         * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
         * The resulting iteration sequence is ordered by (key-group, kv-state).
         */
        @VisibleForTesting
-       static final class RocksDBMergeIterator implements AutoCloseable {
+       static class RocksDBMergeIterator implements AutoCloseable {
 
 Review comment:
   All changes from here to line 1503 are unrelated. While I am totally fine 
with cleanup, it would be easier to review if you could keep all the 
cleanup/beautification in separate commits.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> State TTL cleanup during full state scan upon checkpointing
> -----------------------------------------------------------
>
>                 Key: FLINK-9938
>                 URL: https://issues.apache.org/jira/browse/FLINK-9938
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> We can try to piggyback on the full state scan that certain checkpoint 
> processes in the backends already perform, check TTL expiration for every 
> entry, and evict expired entries to speed up cleanup.
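
The idea in the description can be illustrated with a minimal, self-contained
sketch. This is not the code from the pull request: the class TtlFilteringScan,
the TtlValue layout (value plus last-update timestamp), and the expiry rule
(lastUpdate + ttl <= now) are assumptions made only for illustration. It shows
a full scan that passes each entry through a filter and keeps only the entries
that have not expired, so the snapshot never contains expired state.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch: drop expired entries while a snapshot scans all state. */
final class TtlFilteringScan {

    /** A stored value plus the timestamp of its last update (assumed layout). */
    static final class TtlValue {
        final String value;
        final long lastUpdateMillis;

        TtlValue(String value, long lastUpdateMillis) {
            this.value = value;
            this.lastUpdateMillis = lastUpdateMillis;
        }
    }

    /** Returns the entry if it is still alive at nowMillis, otherwise empty. */
    static Optional<TtlValue> filter(TtlValue entry, long ttlMillis, long nowMillis) {
        // Assumed expiry rule: an entry expires once its TTL has fully elapsed.
        boolean expired = entry.lastUpdateMillis + ttlMillis <= nowMillis;
        return expired ? Optional.empty() : Optional.of(entry);
    }

    /** Walks the full scan once and collects only the non-expired entries. */
    static List<TtlValue> snapshot(Iterator<TtlValue> fullScan, long ttlMillis, long nowMillis) {
        List<TtlValue> kept = new ArrayList<>();
        while (fullScan.hasNext()) {
            // Expired entries are skipped here, so they never reach the snapshot.
            filter(fullScan.next(), ttlMillis, nowMillis).ifPresent(kept::add);
        }
        return kept;
    }
}
```

Because the filter runs inside a scan that the checkpoint performs anyway, the
cleanup adds one check per entry rather than a separate pass over the state.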



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
