azagrebin commented on a change in pull request #8459: [FLINK-12476] [State
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r287733029
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
##########
@@ -376,39 +391,57 @@ public StateTtlConfig build() {
private static final long serialVersionUID =
1373998465131443873L;
}
- final EnumMap<Strategies, CleanupStrategy> strategies = new
EnumMap<>(Strategies.class);
+ private final EnumMap<Strategies, CleanupStrategy> strategies =
new EnumMap<>(Strategies.class);
- public void activate(Strategies strategy) {
+ private void activate(Strategies strategy) {
activate(strategy, EMPTY_STRATEGY);
}
- public void activate(Strategies strategy, CleanupStrategy
config) {
+ private void activate(Strategies strategy, CleanupStrategy
config) {
strategies.put(strategy, config);
}
public boolean inFullSnapshot() {
return
strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
+ public boolean isCleanupInBackground() {
+ return isCleanupInBackground;
+ }
+
+ private void setCleanupInBackground(boolean
cleanupInBackground) {
+ isCleanupInBackground = cleanupInBackground;
+ }
+
@Nullable
public IncrementalCleanupStrategy
getIncrementalCleanupStrategy() {
- return (IncrementalCleanupStrategy)
strategies.get(Strategies.INCREMENTAL_CLEANUP);
+ if (isCleanupInBackground()) {
+ return (IncrementalCleanupStrategy)
strategies.getOrDefault(Strategies.INCREMENTAL_CLEANUP,
DEFAULT_INCREMENTAL_CLEANUP_STRATEGY);
+ } else {
+ return (IncrementalCleanupStrategy)
strategies.get(Strategies.INCREMENTAL_CLEANUP);
+ }
}
public boolean inRocksdbCompactFilter() {
- return
strategies.containsKey(Strategies.ROCKSDB_COMPACTION_FILTER);
+ return getRocksdbCompactFilterCleanupStrategy() != null;
}
@Nullable
public RocksdbCompactFilterCleanupStrategy
getRocksdbCompactFilterCleanupStrategy() {
- return (RocksdbCompactFilterCleanupStrategy)
strategies.get(Strategies.ROCKSDB_COMPACTION_FILTER);
+ if (isCleanupInBackground()) {
+ return (RocksdbCompactFilterCleanupStrategy)
strategies.getOrDefault(Strategies.ROCKSDB_COMPACTION_FILTER,
DEFAULT_ROCKSDB_COMPACT_FILTER_CLEANUP_STRATEGY);
+ } else {
+ return (RocksdbCompactFilterCleanupStrategy)
strategies.get(Strategies.ROCKSDB_COMPACTION_FILTER);
+ }
}
}
/** Configuration of cleanup strategy while taking the full snapshot.
*/
public static class IncrementalCleanupStrategy implements
CleanupStrategies.CleanupStrategy {
private static final long serialVersionUID =
3109278696501988780L;
+ public static final IncrementalCleanupStrategy
DEFAULT_INCREMENTAL_CLEANUP_STRATEGY = new IncrementalCleanupStrategy(10, true);
Review comment:
DEFAULT_INCREMENTAL_CLEANUP_STRATEGY does not need to be public; it can be at least package-private.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services