[ https://issues.apache.org/jira/browse/FLINK-9938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565227#comment-16565227 ]
ASF GitHub Bot commented on FLINK-9938:
---------------------------------------
StefanRRichter commented on a change in pull request #6460: [FLINK-9938] Clean up full snapshot from expired state with TTL
URL: https://github.com/apache/flink/pull/6460#discussion_r206829146
##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -1408,22 +1427,68 @@ public int numStateEntries() {
 			return count;
 		}
+	/** {@link RocksDBMergeIterator} with state filtering. */
+	static final class FilteringRocksDBMergeIterator extends RocksDBMergeIterator {
+		private final List<StateSnapshotFilter<?>> stateSnapshotFilters;
+		private byte[] nextFilteredValue;
+
+		FilteringRocksDBMergeIterator(
+			List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+			int keyGroupPrefixByteCount,
+			List<StateSnapshotFilter<?>> stateSnapshotFilters) {
+			super(kvStateIterators, keyGroupPrefixByteCount);
+			Preconditions.checkNotNull(stateSnapshotFilters);
+			this.stateSnapshotFilters = stateSnapshotFilters;
+			nextFiltered();
+		}
+
+		public void next() {
+			nextUnfiltered();
+			nextFiltered();
+		}
+
+		private void nextFiltered() {
+			if (!isValid()) {
+				return;
+			}
+			byte[] value = currentSubIterator.getIterator().value();
+			Optional<byte[]> filteredValue = stateSnapshotFilters.get(kvStateId()).filterSerialized(value);
+			while (!filteredValue.isPresent()) {
+				nextUnfiltered();
+				if (isValid()) {
+					value = currentSubIterator.getIterator().value();
+					filteredValue = stateSnapshotFilters.get(kvStateId()).filterSerialized(value);
Review comment:
This looks to me like the filtering rather belongs around each of the
sub-iterators (`RocksDBKeyedStateBackend.MergeIterator`), because there is a
1:1 mapping from filter to `kvStateId`. That means you can probably also
keep `RocksDBMergeIterator` unchanged.
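A minimal sketch of what the suggestion could look like, assuming simplified stand-ins rather than Flink's real classes: `KvIterator` plays the role of a single per-state RocksDB iterator, `SnapshotFilter` mirrors only the `filterSerialized(byte[])` call visible in the diff, and `ArrayKvIterator` / `FilteringKvIterator` are hypothetical names. Each sub-iterator is wrapped once with the filter for its own `kvStateId`, so skipping of dropped entries happens below the merge logic.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for one per-state RocksDB iterator (not Flink's RocksIteratorWrapper). */
interface KvIterator {
    boolean isValid();
    byte[] key();
    byte[] value();
    void next();
}

/** Hypothetical filter mirroring the filterSerialized(byte[]) call in the diff. */
interface SnapshotFilter {
    /** Returns the (possibly transformed) value, or empty if the entry must be skipped. */
    Optional<byte[]> filterSerialized(byte[] value);
}

/** In-memory KvIterator over parallel key/value arrays, for illustration only. */
final class ArrayKvIterator implements KvIterator {
    private final byte[][] keys;
    private final byte[][] values;
    private int pos = 0;

    ArrayKvIterator(byte[][] keys, byte[][] values) {
        this.keys = keys;
        this.values = values;
    }

    public boolean isValid() { return pos < keys.length; }
    public byte[] key() { return keys[pos]; }
    public byte[] value() { return values[pos]; }
    public void next() { pos++; }
}

/**
 * Wraps one sub-iterator together with the single filter for its kvStateId,
 * so a merge iterator on top only ever sees entries that survive filtering.
 */
final class FilteringKvIterator implements KvIterator {
    private final KvIterator delegate;
    private final SnapshotFilter filter;
    private byte[] filteredValue; // null once the delegate is exhausted

    FilteringKvIterator(KvIterator delegate, SnapshotFilter filter) {
        this.delegate = delegate;
        this.filter = filter;
        skipFiltered();
    }

    /** Advances the delegate until it is exhausted or the filter keeps the current entry. */
    private void skipFiltered() {
        filteredValue = null;
        while (delegate.isValid()) {
            Optional<byte[]> v = filter.filterSerialized(delegate.value());
            if (v.isPresent()) {
                filteredValue = v.get();
                return;
            }
            delegate.next();
        }
    }

    public boolean isValid() { return filteredValue != null; }
    public byte[] key() { return delegate.key(); }
    public byte[] value() { return filteredValue; }

    public void next() {
        delegate.next();
        skipFiltered();
    }
}
```

With this shape the filter lookup by `kvStateId` happens once per sub-iterator at construction time instead of on every `next()`, and the merge iterator needs no filtering-specific subclass.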
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> State TTL cleanup during full state scan upon checkpointing
> -----------------------------------------------------------
>
> Key: FLINK-9938
> URL: https://issues.apache.org/jira/browse/FLINK-9938
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.0
> Reporter: Andrey Zagrebin
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We can try to piggyback on the full state scan that backends perform during
> certain checkpoint processes, check TTL expiration for every entry, and evict
> expired entries to speed up cleanup.
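A minimal sketch of the per-entry TTL check such a scan could apply, under loudly stated assumptions: `TtlSnapshotFilter` is a hypothetical class, each serialized value is assumed to start with an 8-byte big-endian last-access timestamp in milliseconds followed by the user value, and an entry is treated as expired once `lastAccess + ttl <= now`. It only drops expired entries from what the scan emits; it does not delete them from live state.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.LongSupplier;

/**
 * Hypothetical snapshot filter that drops expired TTL entries while a full
 * snapshot scans state. Assumed layout: 8-byte big-endian last-access
 * timestamp (ms), then the user value bytes.
 */
final class TtlSnapshotFilter {
    private final long ttlMillis;
    private final LongSupplier clock; // injectable for testing

    TtlSnapshotFilter(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Returns the value unchanged if still alive, or empty if it has expired. */
    Optional<byte[]> filterSerialized(byte[] value) {
        // Reads the assumed timestamp prefix; fails if value is shorter than 8 bytes.
        long lastAccess = ByteBuffer.wrap(value, 0, Long.BYTES).getLong();
        boolean expired = lastAccess + ttlMillis <= clock.getAsLong();
        return expired ? Optional.empty() : Optional.of(value);
    }

    /** Serializes (timestamp, payload) in the layout assumed above; for illustration only. */
    static byte[] serialize(long lastAccess, byte[] payload) {
        return ByteBuffer.allocate(Long.BYTES + payload.length)
            .putLong(lastAccess)
            .put(payload)
            .array();
    }

    /** One pass over all values, keeping only those that pass the TTL check. */
    List<byte[]> scan(List<byte[]> values) {
        List<byte[]> kept = new ArrayList<>();
        for (byte[] v : values) {
            filterSerialized(v).ifPresent(kept::add);
        }
        return kept;
    }
}
```

Because the snapshot already visits every entry, the extra cost is one timestamp decode and comparison per entry rather than a separate cleanup pass.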
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)