This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 6ab4891  [FLINK-15621][state][TTL] Remove deprecated option and method to disable TTL compaction filter
6ab4891 is described below

commit 6ab48912faf5355fd8d754b98ce3ea75890d2134
Author: Jiayi Liao <[email protected]>
AuthorDate: Mon May 25 12:57:51 2020 +0800

    [FLINK-15621][state][TTL] Remove deprecated option and method to disable TTL compaction filter
    
    This closes #12307.
---
 .../flink/api/common/state/StateTtlConfig.java     | 24 +----------
 .../tests/DataStreamStateTTLTestProgram.java       |  4 --
 flink-python/pyflink/datastream/state_backend.py   | 45 --------------------
 .../pyflink/datastream/tests/test_state_backend.py | 10 -----
 .../state/RocksDBKeyedStateBackendBuilder.java     | 10 +----
 .../contrib/streaming/state/RocksDBOptions.java    | 13 ------
 .../streaming/state/RocksDBStateBackend.java       | 49 ----------------------
 .../state/ttl/RocksDbTtlCompactFiltersManager.java | 13 +-----
 .../state/ttl/RocksDBTtlStateTestBase.java         |  2 -
 9 files changed, 5 insertions(+), 165 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 9ac0c39..1f42db4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -271,28 +271,6 @@ public class StateTtlConfig implements Serializable {
                /**
                 * Cleanup expired state while Rocksdb compaction is running.
                 *
-                * <p>RocksDB runs periodic compaction of state updates and 
merges them to free storage.
-                * During this process, the TTL filter checks timestamp of 
state entries and drops expired ones.
-                * The feature has to be activated in RocksDb backend firstly
-                * using the following Flink configuration option:
-                * state.backend.rocksdb.ttl.compaction.filter.enabled.
-                *
-                * <p>Due to specifics of RocksDB compaction filter,
-                * cleanup is not properly guaranteed if put and merge 
operations are used at the same time:
-                * 
https://github.com/facebook/rocksdb/blob/master/include/rocksdb/compaction_filter.h#L69
-                * It means that the TTL filter should be tested for List state 
taking into account this caveat.
-                *
-                * @deprecated Use more general configuration method {@link 
#cleanupInRocksdbCompactFilter(long)} instead
-                */
-               @Nonnull
-               @Deprecated
-               public Builder cleanupInRocksdbCompactFilter() {
-                       return cleanupInRocksdbCompactFilter(1000L);
-               }
-
-               /**
-                * Cleanup expired state while Rocksdb compaction is running.
-                *
                 * <p>RocksDB compaction filter will query current timestamp,
                 * used to check expiration, from Flink every time after 
processing {@code queryTimeAfterNumEntries} number of state entries.
                 * Updating the timestamp more often can improve cleanup speed
@@ -312,7 +290,7 @@ public class StateTtlConfig implements Serializable {
                 * Disable default cleanup of expired state in background 
(enabled by default).
                 *
                 * <p>If some specific cleanup is configured, e.g. {@link 
#cleanupIncrementally(int, boolean)} or
-                * {@link #cleanupInRocksdbCompactFilter()}, this setting does 
not disable it.
+                * {@link #cleanupInRocksdbCompactFilter(long)}, this setting 
does not disable it.
                 */
                @Nonnull
                public Builder disableCleanupInBackground() {
diff --git 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
index 21baed5..703a7c3 100644
--- 
a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.tests;
 
 import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@@ -81,9 +80,6 @@ public class DataStreamStateTTLTestProgram {
                final MonotonicTTLTimeProvider ttlTimeProvider = new 
MonotonicTTLTimeProvider();
 
                final StateBackend configuredBackend = env.getStateBackend();
-               if (configuredBackend instanceof RocksDBStateBackend) {
-                       ((RocksDBStateBackend) 
configuredBackend).enableTtlCompactionFilter();
-               }
                final StateBackend stubBackend = new 
StubStateBackend(configuredBackend, ttlTimeProvider);
                env.setStateBackend(stubBackend);
        }
diff --git a/flink-python/pyflink/datastream/state_backend.py 
b/flink-python/pyflink/datastream/state_backend.py
index d894137..d975195 100644
--- a/flink-python/pyflink/datastream/state_backend.py
+++ b/flink-python/pyflink/datastream/state_backend.py
@@ -16,8 +16,6 @@
 # limitations under the License.
 
################################################################################
 
-import warnings
-
 from abc import ABCMeta
 
 from py4j.java_gateway import get_java_class
@@ -559,49 +557,6 @@ class RocksDBStateBackend(StateBackend):
         """
         return self._j_rocks_db_state_backend.isIncrementalCheckpointsEnabled()
 
-    def is_ttl_compaction_filter_enabled(self):
-        """
-        Gets whether compaction filter to cleanup state with TTL is enabled.
-
-        :return: True if enabled, false otherwise.
-
-        .. note:: Deprecated in 1.10. Enabled by default and will be removed 
in the future.
-        """
-        warnings.warn(
-            "Deprecated in 1.10. Enabled by default and will be removed in the 
future.",
-            DeprecationWarning)
-        return self._j_rocks_db_state_backend.isTtlCompactionFilterEnabled()
-
-    def enable_ttl_compaction_filter(self):
-        """
-        Enable compaction filter to cleanup state with TTL.
-
-        .. note::
-            User can still decide in state TTL configuration in state 
descriptor
-            whether the filter is active for particular state or not.
-
-        .. note:: Deprecated in 1.10. Enabled by default and will be removed 
in the future.
-        """
-        warnings.warn(
-            "Deprecated in 1.10. Enabled by default and will be removed in the 
future.",
-            DeprecationWarning)
-        self._j_rocks_db_state_backend.enableTtlCompactionFilter()
-
-    def disable_ttl_compaction_filter(self):
-        """
-        Disable compaction filter to cleanup state with TTL.
-
-        .. note::
-            This is an advanced option and the method should only be used
-            when experiencing serious performance degradations during 
compaction in RocksDB.
-
-        .. note:: Deprecated in 1.10. Enabled by default and will be removed 
in the future.
-        """
-        warnings.warn(
-            "Deprecated in 1.10. Enabled by default and will be removed in the 
future.",
-            DeprecationWarning)
-        self._j_rocks_db_state_backend.disableTtlCompactionFilter()
-
     def set_predefined_options(self, options):
         """
         Sets the predefined options for RocksDB.
diff --git a/flink-python/pyflink/datastream/tests/test_state_backend.py 
b/flink-python/pyflink/datastream/tests/test_state_backend.py
index 60a376f..4f3249f 100644
--- a/flink-python/pyflink/datastream/tests/test_state_backend.py
+++ b/flink-python/pyflink/datastream/tests/test_state_backend.py
@@ -154,16 +154,6 @@ class RocksDBStateBackendTests(PyFlinkTestCase):
         state_backend.set_db_storage_paths(*storage_path)
         self.assertEqual(state_backend.get_db_storage_paths(), expected)
 
-    def test_get_set_ttl_compaction_filter(self):
-
-        state_backend = RocksDBStateBackend("file://var/checkpoints/")
-
-        self.assertTrue(state_backend.is_ttl_compaction_filter_enabled())
-
-        state_backend.disable_ttl_compaction_filter()
-
-        self.assertFalse(state_backend.is_ttl_compaction_filter_enabled())
-
     def test_get_set_predefined_options(self):
 
         state_backend = RocksDBStateBackend("file://var/checkpoints/")
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 70c438e..d1a0184 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -108,8 +108,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
 
        /** True if incremental checkpointing is enabled. */
        private boolean enableIncrementalCheckpointing;
-       /** True if ttl compaction filter is enabled. */
-       private boolean enableTtlCompactionFilter;
+
        private RocksDBNativeMetricOptions nativeMetricOptions;
        private int numberOfTransferingThreads;
        private long writeBatchSize = 
RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
@@ -211,11 +210,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                return this;
        }
 
-       RocksDBKeyedStateBackendBuilder<K> setEnableTtlCompactionFilter(boolean 
enableTtlCompactionFilter) {
-               this.enableTtlCompactionFilter = enableTtlCompactionFilter;
-               return this;
-       }
-
        RocksDBKeyedStateBackendBuilder<K> 
setNativeMetricOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
                this.nativeMetricOptions = nativeMetricOptions;
                return this;
@@ -254,7 +248,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                RocksDB db = null;
                AbstractRocksDBRestoreOperation restoreOperation = null;
                RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
-                       new 
RocksDbTtlCompactFiltersManager(enableTtlCompactionFilter, ttlTimeProvider);
+                       new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
 
                ResourceGuard rocksDBResourceGuard = new ResourceGuard();
                SnapshotStrategy<K> snapshotStrategy;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 9b2e082..457ca33 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -66,19 +66,6 @@ public class RocksDBOptions {
                .withDescription("The number of threads (per stateful operator) 
used to transfer (download and upload) files in RocksDBStateBackend.");
 
        /**
-        * This determines if compaction filter to cleanup state with TTL is 
enabled.
-        *
-        * @deprecated the option will be removed in the future and should only 
be used
-        * when experiencing serious performance degradations.
-        */
-       @Deprecated
-       public static final ConfigOption<Boolean> TTL_COMPACT_FILTER_ENABLED = 
ConfigOptions
-               .key("state.backend.rocksdb.ttl.compaction.filter.enabled")
-               .defaultValue(true)
-               .withDescription("This determines if compaction filter to 
cleanup state with TTL is enabled for backend. " +
-                       "Note: User can still decide in state TTL configuration 
in state descriptor " +
-                       "whether the filter is active for particular state or 
not.");
-       /**
         * The predefined settings for RocksDB DBOptions and 
ColumnFamilyOptions by Flink community.
         */
        @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index add3cf1..4de126e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -74,7 +74,6 @@ import java.util.UUID;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
-import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -141,14 +140,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        /** Thread number used to transfer (download and upload) state, default 
value: 1. */
        private int numberOfTransferThreads;
 
-       /**
-        * This determines if compaction filter to cleanup state with TTL is 
enabled.
-        *
-        * <p>Note: User can still decide in state TTL configuration in state 
descriptor
-        * whether the filter is active for particular state or not.
-        */
-       private TernaryBoolean enableTtlCompactionFilter;
-
        /** The configuration for memory settings (pool sizes, etc.). */
        private final RocksDBMemoryConfiguration memoryConfiguration;
 
@@ -276,7 +267,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
                this.numberOfTransferThreads = 
UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
                this.defaultMetricOptions = new RocksDBNativeMetricOptions();
-               this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
                this.memoryConfiguration = new RocksDBMemoryConfiguration();
                this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
        }
@@ -326,8 +316,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                } else {
                        this.writeBatchSize = original.writeBatchSize;
                }
-               this.enableTtlCompactionFilter = 
original.enableTtlCompactionFilter
-                       
.resolveUndefined(config.get(TTL_COMPACT_FILTER_ENABLED));
 
                this.memoryConfiguration = 
RocksDBMemoryConfiguration.fromOtherAndConfiguration(original.memoryConfiguration,
 config);
                this.memoryConfiguration.validate();
@@ -541,7 +529,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                        cancelStreamRegistry
                )
                        
.setEnableIncrementalCheckpointing(isIncrementalCheckpointsEnabled())
-                       
.setEnableTtlCompactionFilter(isTtlCompactionFilterEnabled())
                        
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
                        
.setNativeMetricOptions(resourceContainer.getMemoryWatcherOptions(defaultMetricOptions))
                        .setWriteBatchSize(getWriteBatchSize());
@@ -735,42 +722,6 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        }
 
        /**
-        * Gets whether incremental checkpoints are enabled for this state 
backend.
-        *
-        * @deprecated enabled by default and will be removed in the future.
-        */
-       @Deprecated
-       public boolean isTtlCompactionFilterEnabled() {
-               return 
enableTtlCompactionFilter.getOrDefault(TTL_COMPACT_FILTER_ENABLED.defaultValue());
-       }
-
-       /**
-        * Enable compaction filter to cleanup state with TTL.
-        *
-        * <p>Note: User can still decide in state TTL configuration in state 
descriptor
-        * whether the filter is active for particular state or not.
-        *
-        * @deprecated enabled by default and will be removed in the future.
-        */
-       @Deprecated
-       public void enableTtlCompactionFilter() {
-               enableTtlCompactionFilter = TernaryBoolean.TRUE;
-       }
-
-       /**
-        * Disable compaction filter to cleanup state with TTL.
-        *
-        * <p>Note: This is an advanced option and the method should only be 
used
-        * when experiencing serious performance degradations during compaction 
in RocksDB.
-        *
-        * @deprecated enabled by default and will be removed in the future.
-        */
-       @Deprecated
-       public void disableTtlCompactionFilter() {
-               enableTtlCompactionFilter = TernaryBoolean.FALSE;
-       }
-
-       /**
         * Gets the type of the priority queue state. It will fallback to the 
default value, if it is not explicitly set.
         * @return The type of the priority queue state.
         */
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
index affea5b..b1a013b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java
@@ -52,16 +52,12 @@ import java.util.LinkedHashMap;
 public class RocksDbTtlCompactFiltersManager {
        private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCompactionFilter.class);
 
-       /** Enables RocksDb compaction filter for State with TTL. */
-       private final boolean enableTtlCompactionFilter;
-
        private final TtlTimeProvider ttlTimeProvider;
 
        /** Registered compaction filter factories. */
        private final LinkedHashMap<String, FlinkCompactionFilterFactory> 
compactionFilterFactories;
 
-       public RocksDbTtlCompactFiltersManager(boolean 
enableTtlCompactionFilter, TtlTimeProvider ttlTimeProvider) {
-               this.enableTtlCompactionFilter = enableTtlCompactionFilter;
+       public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider) 
{
                this.ttlTimeProvider = ttlTimeProvider;
                this.compactionFilterFactories = new LinkedHashMap<>();
        }
@@ -70,7 +66,7 @@ public class RocksDbTtlCompactFiltersManager {
                @Nonnull RegisteredStateMetaInfoBase metaInfoBase,
                @Nonnull ColumnFamilyOptions options) {
 
-               if (enableTtlCompactionFilter && metaInfoBase instanceof 
RegisteredKeyValueStateBackendMetaInfo) {
+               if (metaInfoBase instanceof 
RegisteredKeyValueStateBackendMetaInfo) {
                        RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = 
(RegisteredKeyValueStateBackendMetaInfo) metaInfoBase;
                        if 
(TtlStateFactory.TtlSerializer.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer()))
 {
                                
createAndSetCompactFilterFactory(metaInfoBase.getName(), options);
@@ -109,11 +105,6 @@ public class RocksDbTtlCompactFiltersManager {
                        TypeSerializer<?> stateSerializer) {
                StateTtlConfig ttlConfig = stateDesc.getTtlConfig();
                if (ttlConfig.isEnabled() && 
ttlConfig.getCleanupStrategies().inRocksdbCompactFilter()) {
-                       if (!enableTtlCompactionFilter) {
-                               LOG.warn("Cannot configure RocksDB TTL 
compaction filter for state <{}>: " +
-                                       "feature is disabled for the state 
backend.", stateDesc.getName());
-                               return;
-                       }
                        FlinkCompactionFilterFactory compactionFilterFactory = 
compactionFilterFactories.get(stateDesc.getName());
                        Preconditions.checkNotNull(compactionFilterFactory);
                        long ttl = ttlConfig.getTtl().toMilliseconds();
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java
index 4129c00..fcc90c1 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/ttl/RocksDBTtlStateTestBase.java
@@ -38,7 +38,6 @@ import org.rocksdb.RocksDBException;
 
 import java.io.IOException;
 
-import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.TTL_COMPACT_FILTER_ENABLED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
@@ -70,7 +69,6 @@ public abstract class RocksDBTtlStateTestBase extends 
TtlStateTestBase {
                }
                RocksDBStateBackend backend = new RocksDBStateBackend(new 
FsStateBackend(checkpointPath), enableIncrementalCheckpointing);
                Configuration config = new Configuration();
-               config.setBoolean(TTL_COMPACT_FILTER_ENABLED, true);
                backend = backend.configure(config, 
Thread.currentThread().getContextClassLoader());
                backend.setDbStoragePath(dbPath);
                return backend;

Reply via email to