This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 6ab4891 [FLINK-15621][state][TTL] Remove deprecated option and method to disable TTL compaction filter
6ab4891 is described below
commit 6ab48912faf5355fd8d754b98ce3ea75890d2134
Author: Jiayi Liao <[email protected]>
AuthorDate: Mon May 25 12:57:51 2020 +0800
[FLINK-15621][state][TTL] Remove deprecated option and method to disable TTL compaction filter
This closes #12307.
---
.../flink/api/common/state/StateTtlConfig.java | 24 +----------
.../tests/DataStreamStateTTLTestProgram.java | 4 --
flink-python/pyflink/datastream/state_backend.py | 45 --------------------
.../pyflink/datastream/tests/test_state_backend.py | 10 -----
.../state/RocksDBKeyedStateBackendBuilder.java | 10 +----
.../contrib/streaming/state/RocksDBOptions.java | 13 ------
.../streaming/state/RocksDBStateBackend.java | 49 ----------------------
.../state/ttl/RocksDbTtlCompactFiltersManager.java | 13 +-----
.../state/ttl/RocksDBTtlStateTestBase.java | 2 -
9 files changed, 5 insertions(+), 165 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 9ac0c39..1f42db4 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -271,28 +271,6 @@ public class StateTtlConfig implements Serializable {
/**
* Cleanup expired state while Rocksdb compaction is running.
*
- * <p>RocksDB runs periodic compaction of state updates and
merges them to free storage.
- * During this process, the TTL filter checks timestamp of
state entries and drops expired ones.
- * The feature has to be activated in RocksDb backend firstly
- * using the following Flink configuration option:
- * state.backend.rocksdb.ttl.compaction.filter.enabled.
- *
- * <p>Due to specifics of RocksDB compaction filter,
- * cleanup is not properly guaranteed if put and merge
operations are used at the same time:
- *
https://github.com/facebook/rocksdb/blob/master/include/rocksdb/compaction_filter.h#L69
- * It means that the TTL filter should be tested for List state
taking into account this caveat.
- *
- * @deprecated Use more general configuration method {@link
#cleanupInRocksdbCompactFilter(long)} instead
- */
- @Nonnull
- @Deprecated
- public Builder cleanupInRocksdbCompactFilter() {
- return cleanupInRocksdbCompactFilter(1000L);
- }
-
- /**
- * Cleanup expired state while Rocksdb compaction is running.
- *
* <p>RocksDB compaction filter will query current timestamp,
* used to check expiration, from Flink every time after
processing {@code queryTimeAfterNumEntries} number of state entries.
* Updating the timestamp more often can improve cleanup speed
@@ -312,7 +290,7 @@ public class StateTtlConfig implements Serializable {
* Disable default cleanup of expired state in background
(enabled by default).
*
* <p>If some specific cleanup is configured, e.g. {@link
#cleanupIncrementally(int, boolean)} or
- * {@link #cleanupInRocksdbCompactFilter()}, this setting does
not disable it.
+ * {@link #cleanupInRocksdbCompactFilter(long)}, this setting
does not disable it.
*/
@Nonnull
public Builder disableCleanupInBackground() {
diff --git
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
index 21baed5..703a7c3 100644
---
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
+++
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.tests;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@@ -81,9 +80,6 @@ public class DataStreamStateTTLTestProgram {
final MonotonicTTLTimeProvider ttlTimeProvider = new
MonotonicTTLTimeProvider();
final StateBackend configuredBackend = env.getStateBackend();
- if (configuredBackend instanceof RocksDBStateBackend) {
- ((RocksDBStateBackend)
configuredBackend).enableTtlCompactionFilter();
- }
final StateBackend stubBackend = new
StubStateBackend(configuredBackend, ttlTimeProvider);
env.setStateBackend(stubBackend);
}
diff --git a/flink-python/pyflink/datastream/state_backend.py
b/flink-python/pyflink/datastream/state_backend.py
index d894137..d975195 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -16,8 +16,6 @@
# limitations under the License.
################################################################################
-import warnings
-
from abc import ABCMeta
from py4j.java_gateway import get_java_class
@@ -559,49 +557,6 @@ class RocksDBStateBackend(StateBackend):
"""
return self._j_rocks_db_state_backend.isIncrementalCheckpointsEnabled()
- def is_ttl_compaction_filter_enabled(self):
- """
- Gets whether compaction filter to cleanup state with TTL is enabled.
-
- :return: True if enabled, false otherwise.
-
- .. note:: Deprecated in 1.10. Enabled by default and will be removed
in the future.
- """
- warnings.warn(
- "Deprecated in 1.10. Enabled by default and will be removed in the
future.",
- DeprecationWarning)
- return self._j_rocks_db_state_backend.isTtlCompactionFilterEnabled()
-
- def enable_ttl_compaction_filter(self):
- """
- Enable compaction filter to cleanup state with TTL.
-
- .. note::
- User can still decide in state TTL configuration in state
descriptor
- whether the filter is active for particular state or not.
-
- .. note:: Deprecated in 1.10. Enabled by default and will be removed
in the future.
- """
- warnings.warn(
- "Deprecated in 1.10. Enabled by default and will be removed in the
future.",
- DeprecationWarning)
- self._j_rocks_db_state_backend.enableTtlCompactionFilter()
-
- def disable_ttl_compaction_filter(self):
- """
- Disable compaction filter to cleanup state with TTL.
-
- .. note::
- This is an advanced option and the method should only be used
- when experiencing serious performance degradations during
compaction in RocksDB.
-
- .. note:: Deprecated in 1.10. Enabled by default and will be removed
in the future.
- """
- warnings.warn(
- "Deprecated in 1.10. Enabled by default and will be removed in the
future.",
- DeprecationWarning)
- self._j_rocks_db_state_backend.disableTtlCompactionFilter()
-
def set_predefined_options(self, options):
"""
Sets the predefined options for RocksDB.
diff --git a/flink-python/pyflink/datastream/tests/test_state_backend.py
b/flink-python/pyflink/datastream/tests/test_state_backend.py
index 60a376f..4f3249f 100644
--- a/flink-python/pyflink/datastream/tests/test_state_backend.py
+++ b/flink-python/pyflink/datastream/tests/test_state_backend.py
@@ -154,16 +154,6 @@ class RocksDBStateBackendTests(PyFlinkTestCase):
state_backend.set_db_storage_paths(*storage_path)
self.assertEqual(state_backend.get_db_storage_paths(), expected)
- def test_get_set_ttl_compaction_filter(self):
-
- state_backend = RocksDBStateBackend("file://var/checkpoints/")
-
- self.assertTrue(state_backend.is_ttl_compaction_filter_enabled())
-
- state_backend.disable_ttl_compaction_filter()
-
- self.assertFalse(state_backend.is_ttl_compaction_filter_enabled())
-
def test_get_set_predefined_options(self):
state_backend = RocksDBStateBackend("file://var/checkpoints/")
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 70c438e..d1a0184 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -108,8 +108,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
/** True if incremental checkpointing is enabled. */
private boolean enableIncrementalCheckpointing;
- /** True if ttl compaction filter is enabled. */
- private boolean enableTtlCompactionFilter;
+
private RocksDBNativeMetricOptions nativeMetricOptions;
private int numberOfTransferingThreads;
private long writeBatchSize =
RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
@@ -211,11 +210,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
return this;
}
- RocksDBKeyedStateBackendBuilder<K> setEnableTtlCompactionFilter(boolean
enableTtlCompactionFilter) {
- this.enableTtlCompactionFilter = enableTtlCompactionFilter;
- return this;
- }
-
RocksDBKeyedStateBackendBuilder<K>
setNativeMetricOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
this.nativeMetricOptions = nativeMetricOptions;
return this;
@@ -254,7 +248,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
- new
RocksDbTtlCompactFiltersManager(enableTtlCompactionFilter, ttlTimeProvider);
+ new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
ResourceGuard rocksDBResourceGuard = new ResourceGuard();
SnapshotStrategy<K> snapshotStrategy;
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 9b2e082..457ca33 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -66,19 +66,6 @@ public class RocksDBOptions {
.withDescription("The number of threads (per stateful operator)
used to transfer (download and upload) files in RocksDBStateBackend.");
/**
- * This determines if compaction filter to cleanup state with TTL is
enabled.
- *
- * @deprecated the option will be removed in the future and should only
be used
- * when experiencing serious performance degradations.
- */
- @Deprecated
- public static final ConfigOption<Boolean> TTL_COMPACT_FILTER_ENABLED =
ConfigOptions
- .key("state.backend.rocksdb.ttl.compaction.filter.enabled")
- .defaultValue(true)
- .withDescription("This determines if compaction filter to
cleanup state with TTL is enabled for backend. " +
- "Note: User can still decide in state TTL configuration
in state descriptor " +
- "whether the filter is active for particular state or
not.");
- /**
* The predefined settings for RocksDB DBOptions and
ColumnFamilyOptions by Flink community.
*/
@Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index add3cf1..4de126e 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -74,7 +74,6 @@ import java.util.UUID;
import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
import static
org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
import static
org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
-import static
org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -141,14 +140,6 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
/** Thread number used to transfer (download and upload) state, default
value: 1. */
private int numberOfTransferThreads;
- /**
- * This determines if compaction filter to cleanup state with TTL is
enabled.
- *
- * <p>Note: User can still decide in state TTL configuration in state
descriptor
- * whether the filter is active for particular state or not.
- */
- private TernaryBoolean enableTtlCompactionFilter;
-
/** The configuration for memory settings (pool sizes, etc.). */
private final RocksDBMemoryConfiguration memoryConfiguration;
@@ -276,7 +267,6 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
this.enableIncrementalCheckpointing =
enableIncrementalCheckpointing;
this.numberOfTransferThreads =
UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
- this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
this.memoryConfiguration = new RocksDBMemoryConfiguration();
this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
}
@@ -326,8 +316,6 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
} else {
this.writeBatchSize = original.writeBatchSize;
}
- this.enableTtlCompactionFilter =
original.enableTtlCompactionFilter
-
.resolveUndefined(config.get(TTL_COMPACT_FILTER_ENABLED));
this.memoryConfiguration =
RocksDBMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration,
config);
this.memoryConfiguration.validate();
@@ -541,7 +529,6 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
cancelStreamRegistry
)
.setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
-
.setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled())
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
.setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(defaultMetricOptions))
.setWriteBatchSize(getWriteBatchSize());
@@ -735,42 +722,6 @@ public class RocksDBStateBackend extends
AbstractStateBackend implements Configu
}
/**
- * Gets whether incremental checkpoints are enabled for this state
backend.
- *
- * @deprecated enabled by default and will be removed in the future.
- */
- @Deprecated
- public boolean isTtlCompactionFilterEnabled() {
- return
enableTtlCompactionFilter.getOrDefault(TTL_COMPACT_FILTER_ENABLED.defaultValue());
- }
-
- /**
- * Enable compaction filter to cleanup state with TTL.
- *
- * <p>Note: User can still decide in state TTL configuration in state
descriptor
- * whether the filter is active for particular state or not.
- *
- * @deprecated enabled by default and will be removed in the future.
- */
- @Deprecated
- public void enableTtlCompactionFilter() {
- enableTtlCompactionFilter = TernaryBoolean.TRUE;
- }
-
- /**
- * Disable compaction filter to cleanup state with TTL.
- *
- * <p>Note: This is an advanced option and the method should only be
used
- * when experiencing serious performance degradations during compaction
in RocksDB.
- *
- * @deprecated enabled by default and will be removed in the future.
- */
- @Deprecated
- public void disableTtlCompactionFilter() {
- enableTtlCompactionFilter = TernaryBoolean.FALSE;
- }
-
- /**
* Gets the type of the priority queue state. It will fallback to the
default value, if it is not explicitly set.
* @return The type of the priority queue state.
*/
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
index affea5b..b1a013b 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
@@ -52,16 +52,12 @@ import java.util.LinkedHashMap;
public class RocksDbTtlCompactFiltersManager {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkCompactionFilter.class);
- /** Enables RocksDb compaction filter for State with TTL. */
- private final boolean enableTtlCompactionFilter;
-
private final TtlTimeProvider ttlTimeProvider;
/** Registered compaction filter factories. */
private final LinkedHashMap<String, FlinkCompactionFilterFactory>
compactionFilterFactories;
- public RocksDbTtlCompactFiltersManager(boolean
enableTtlCompactionFilter, TtlTimeProvider ttlTimeProvider) {
- this.enableTtlCompactionFilter = enableTtlCompactionFilter;
+ public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider)
{
this.ttlTimeProvider = ttlTimeProvider;
this.compactionFilterFactories = new LinkedHashMap<>();
}
@@ -70,7 +66,7 @@ public class RocksDbTtlCompactFiltersManager {
@Nonnull RegisteredStateMetaInfoBase metaInfoBase,
@Nonnull ColumnFamilyOptions options) {
- if (enableTtlCompactionFilter && metaInfoBase instanceof
RegisteredKeyValueStateBackendMetaInfo) {
+ if (metaInfoBase instanceof
RegisteredKeyValueStateBackendMetaInfo) {
RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase =
(RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
if
(TtlStateFactory.TtlSerializer.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer()))
{
createAndSetCompactFilterFactory(metaInfoBase.getName(), options);
@@ -109,11 +105,6 @@ public class RocksDbTtlCompactFiltersManager {
TypeSerializer<?> stateSerializer) {
StateTtlConfig ttlConfig = stateDesc.getTtlConfig();
if (ttlConfig.isEnabled() &&
ttlConfig.getCleanupStrategies().inRocksdbCompactFilter()) {
- if (!enableTtlCompactionFilter) {
- LOG.warn("Cannot configure RocksDB TTL
compaction filter for state <{}>: " +
- "feature is disabled for the state
backend.", stateDesc.getName());
- return;
- }
FlinkCompactionFilterFactory compactionFilterFactory =
compactionFilterFactories.get(stateDesc.getName());
Preconditions.checkNotNull(compactionFilterFactory);
long ttl = ttlConfig.getTtl().toMilliseconds();
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java
index 4129c00..fcc90c1 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java
@@ -38,7 +38,6 @@ import org.rocksdb.RocksDBException;
import java.io.IOException;
-import static
org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -70,7 +69,6 @@ public abstract class RocksDBTtlStateTestBase extends
TtlStateTestBase {
}
RocksDBStateBackend backend = new RocksDBStateBackend(new
FsStateBackend(checkpointPath), enableIncrementalCheckpointing);
Configuration config = new Configuration();
- config.setBoolean(TTL_COMPACT_FILTER_ENABLED, true);
backend = backend.configure(config,
Thread.currentThread().getContextClassLoader());
backend.setDbStoragePath(dbPath);
return backend;