This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 38fe9c0  [FLINK-26560][state] Make the threshold of the overlap 
fraction of incremental restoring configurable
38fe9c0 is described below

commit 38fe9c0b34ebec376aa4c13d88859dc021346b04
Author: fredia <[email protected]>
AuthorDate: Wed Mar 16 12:43:24 2022 +0800

    [FLINK-26560][state] Make the threshold of the overlap fraction of 
incremental restoring configurable
    
    This closes #19106.
---
 .../rocksdb_configurable_configuration.html        |  6 +++
 .../state/EmbeddedRocksDBStateBackend.java         | 21 +++++++-
 .../state/RocksDBConfigurableOptions.java          | 12 ++++-
 .../state/RocksDBIncrementalCheckpointUtils.java   | 58 ++++++++++------------
 .../state/RocksDBKeyedStateBackendBuilder.java     | 11 +++-
 .../RocksDBIncrementalRestoreOperation.java        |  7 ++-
 .../RocksDBIncrementalCheckpointUtilsTest.java     | 22 +++++---
 .../state/RocksDBStateBackendConfigTest.java       |  4 ++
 8 files changed, 99 insertions(+), 42 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html 
b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
index d6bb28f..180a3bd 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
@@ -93,6 +93,12 @@
             <td>The maximum size of RocksDB's file used for information 
logging. If the log files becomes larger than this, a new file will be created. 
If 0, all logs will be written to one log file. The default maximum file size 
is '25MB'. </td>
         </tr>
         <tr>
+            
<td><h5>state.backend.rocksdb.restore-overlap-fraction-threshold</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Double</td>
+            <td>The threshold of the overlap fraction between the handle's 
key-group range and target key-group range. When restore base DB, only the 
handle which overlap fraction greater than or equal to *threshold* has a chance 
to be an initial handle.</td>
+        </tr>
+        <tr>
             <td><h5>state.backend.rocksdb.thread.num</h5></td>
             <td style="word-wrap: break-word;">2</td>
             <td>Integer</td>
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index bd906e2..6e6a9c9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -77,6 +77,7 @@ import java.util.Random;
 import java.util.UUID;
 
 import static org.apache.flink.configuration.description.TextElement.text;
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
 import static 
org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
@@ -112,6 +113,8 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
 
     private static final long UNDEFINED_WRITE_BATCH_SIZE = -1;
 
+    private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1;
+
     // ------------------------------------------------------------------------
 
     // -- configuration values, set in the application / configuration
@@ -167,6 +170,11 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
      */
     private long writeBatchSize;
 
+    /**
+     * The threshold of the overlap fraction between the handle's key-group 
range and target
+     * key-group range.
+     */
+    private double overlapFractionThreshold;
     // ------------------------------------------------------------------------
 
     /** Creates a new {@code EmbeddedRocksDBStateBackend} for storing local 
state. */
@@ -194,6 +202,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
         this.defaultMetricOptions = new RocksDBNativeMetricOptions();
         this.memoryConfiguration = new RocksDBMemoryConfiguration();
         this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
+        this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
     }
 
     /**
@@ -280,6 +289,15 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
 
         // configure latency tracking
         latencyTrackingConfigBuilder = 
original.latencyTrackingConfigBuilder.configure(config);
+
+        // configure overlap fraction threshold
+        overlapFractionThreshold =
+                original.overlapFractionThreshold == 
UNDEFINED_OVERLAP_FRACTION_THRESHOLD
+                        ? config.get(RESTORE_OVERLAP_FRACTION_THRESHOLD)
+                        : original.overlapFractionThreshold;
+        checkArgument(
+                overlapFractionThreshold >= 0 && this.overlapFractionThreshold 
<= 1,
+                "Overlap fraction threshold of restoring should be between 0 
and 1");
     }
 
     // ------------------------------------------------------------------------
@@ -479,7 +497,8 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                         
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
                         .setNativeMetricOptions(
                                 
resourceContainer.getMemoryWatcherOptions(defaultMetricOptions))
-                        .setWriteBatchSize(getWriteBatchSize());
+                        .setWriteBatchSize(getWriteBatchSize())
+                        .setOverlapFractionThreshold(overlapFractionThreshold);
         return builder.build();
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 8e6e389..c914736 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -255,6 +255,15 @@ public class RocksDBConfigurableOptions implements 
Serializable {
                             "If true, RocksDB will use block-based filter 
instead of full filter, this only take effect when bloom filter is used. "
                                     + "The default value is 'false'.");
 
+    public static final ConfigOption<Double> 
RESTORE_OVERLAP_FRACTION_THRESHOLD =
+            key("state.backend.rocksdb.restore-overlap-fraction-threshold")
+                    .doubleType()
+                    .defaultValue(0.75)
+                    .withDescription(
+                            "The threshold of the overlap fraction between the 
handle's key-group range and target key-group range. "
+                                    + "When restore base DB, only the handle 
which overlap fraction greater than or equal to *threshold* "
+                                    + "has a chance to be an initial handle.");
+
     static final ConfigOption<?>[] CANDIDATE_CONFIGS =
             new ConfigOption<?>[] {
                 // configurable DBOptions
@@ -278,7 +287,8 @@ public class RocksDBConfigurableOptions implements 
Serializable {
                 BLOCK_CACHE_SIZE,
                 USE_BLOOM_FILTER,
                 BLOOM_FILTER_BITS_PER_KEY,
-                BLOOM_FILTER_BLOCK_BASED_MODE
+                BLOOM_FILTER_BLOCK_BASED_MODE,
+                RESTORE_OVERLAP_FRACTION_THRESHOLD
             };
 
     private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 579fef2..4f73ff8 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -33,39 +33,32 @@ import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-import java.util.function.BiFunction;
 
 /** Utils for RocksDB Incremental Checkpoint. */
 public class RocksDBIncrementalCheckpointUtils {
-
-    /**
-     * The threshold of the overlap fraction of the handle's key-group range 
with target key-group
-     * range to be an initial handle.
-     */
-    private static final double OVERLAP_FRACTION_THRESHOLD = 0.75;
-
     /**
      * Evaluates state handle's "score" regarding to the target range when 
choosing the best state
-     * handle to init the initial db for recovery, if the overlap fraction is 
less than {@link
-     * #OVERLAP_FRACTION_THRESHOLD}, then just return {@code Score.MIN} to 
mean the handle has no
-     * chance to be the initial handle.
+     * handle to init the initial db for recovery, if the overlap fraction is 
less than
+     * overlapFractionThreshold, then just return {@code Score.MIN} to mean 
the handle has no chance
+     * to be the initial handle.
      */
-    private static final BiFunction<KeyedStateHandle, KeyGroupRange, Score> 
STATE_HANDLE_EVALUATOR =
-            (stateHandle, targetKeyGroupRange) -> {
-                final KeyGroupRange handleKeyGroupRange = 
stateHandle.getKeyGroupRange();
-                final KeyGroupRange intersectGroup =
-                        
handleKeyGroupRange.getIntersection(targetKeyGroupRange);
-
-                final double overlapFraction =
-                        (double) intersectGroup.getNumberOfKeyGroups()
-                                / handleKeyGroupRange.getNumberOfKeyGroups();
-
-                if (overlapFraction < OVERLAP_FRACTION_THRESHOLD) {
-                    return Score.MIN;
-                }
-
-                return new Score(intersectGroup.getNumberOfKeyGroups(), 
overlapFraction);
-            };
+    private static Score stateHandleEvaluator(
+            KeyedStateHandle stateHandle,
+            KeyGroupRange targetKeyGroupRange,
+            double overlapFractionThreshold) {
+        final KeyGroupRange handleKeyGroupRange = 
stateHandle.getKeyGroupRange();
+        final KeyGroupRange intersectGroup =
+                handleKeyGroupRange.getIntersection(targetKeyGroupRange);
+
+        final double overlapFraction =
+                (double) intersectGroup.getNumberOfKeyGroups()
+                        / handleKeyGroupRange.getNumberOfKeyGroups();
+
+        if (overlapFraction < overlapFractionThreshold) {
+            return Score.MIN;
+        }
+        return new Score(intersectGroup.getNumberOfKeyGroups(), 
overlapFraction);
+    }
 
     /**
      * Score of the state handle, intersect group range is compared first, and 
then compare the
@@ -193,8 +186,8 @@ public class RocksDBIncrementalCheckpointUtils {
     }
 
     /**
-     * Choose the best state handle according to the {@link 
#STATE_HANDLE_EVALUATOR} to init the
-     * initial db.
+     * Choose the best state handle according to the {@link 
#stateHandleEvaluator(KeyedStateHandle,
+     * KeyGroupRange, double)} to init the initial db.
      *
      * @param restoreStateHandles The candidate state handles.
      * @param targetKeyGroupRange The target key group range.
@@ -203,12 +196,15 @@ public class RocksDBIncrementalCheckpointUtils {
     @Nullable
     public static KeyedStateHandle chooseTheBestStateHandleForInitial(
             @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
-            @Nonnull KeyGroupRange targetKeyGroupRange) {
+            @Nonnull KeyGroupRange targetKeyGroupRange,
+            double overlapFractionThreshold) {
 
         KeyedStateHandle bestStateHandle = null;
         Score bestScore = Score.MIN;
         for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
-            Score handleScore = STATE_HANDLE_EVALUATOR.apply(rawStateHandle, 
targetKeyGroupRange);
+            Score handleScore =
+                    stateHandleEvaluator(
+                            rawStateHandle, targetKeyGroupRange, 
overlapFractionThreshold);
             if (handleScore.compareTo(bestScore) > 0) {
                 bestStateHandle = rawStateHandle;
                 bestScore = handleScore;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index d8114e6..8706a90 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -76,6 +76,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.Function;
 
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -119,6 +120,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
             
RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
 
     private RocksDB injectedTestDB; // for testing
+    private double overlapFractionThreshold = 
RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue();
     private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for 
testing
     private RocksDBStateUploader injectRocksDBStateUploader; // for testing
 
@@ -254,6 +256,12 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
         return this;
     }
 
+    RocksDBKeyedStateBackendBuilder<K> setOverlapFractionThreshold(
+            double overlapFractionThreshold) {
+        this.overlapFractionThreshold = overlapFractionThreshold;
+        return this;
+    }
+
     private static void checkAndCreateDirectory(File directory) throws 
IOException {
         if (directory.exists()) {
             if (!directory.isDirectory()) {
@@ -464,7 +472,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
                     restoreStateHandles,
                     ttlCompactFiltersManager,
                     writeBatchSize,
-                    optionsContainer.getWriteBufferManagerCapacity());
+                    optionsContainer.getWriteBufferManagerCapacity(),
+                    overlapFractionThreshold);
         } else if (priorityQueueStateType
                 == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) {
             return new RocksDBHeapTimersFullRestoreOperation<>(
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 32bb661..657b6f2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -99,6 +99,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements 
RocksDBRestoreOper
     private long lastCompletedCheckpointId;
     private UUID backendUID;
     private final long writeBatchSize;
+    private final double overlapFractionThreshold;
 
     private boolean isKeySerializerCompatibilityChecked;
 
@@ -120,7 +121,8 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
             @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
             @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nonnegative long writeBatchSize,
-            Long writeBufferManagerCapacity) {
+            Long writeBufferManagerCapacity,
+            double overlapFractionThreshold) {
         this.rocksHandle =
                 new RocksDBHandle(
                         kvStateInformation,
@@ -136,6 +138,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         this.lastCompletedCheckpointId = -1L;
         this.backendUID = UUID.randomUUID();
         this.writeBatchSize = writeBatchSize;
+        this.overlapFractionThreshold = overlapFractionThreshold;
         this.restoreStateHandles = restoreStateHandles;
         this.cancelStreamRegistry = cancelStreamRegistry;
         this.keyGroupRange = keyGroupRange;
@@ -284,7 +287,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         // Prepare for restore with rescaling
         KeyedStateHandle initialHandle =
                 
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        restoreStateHandles, keyGroupRange);
+                        restoreStateHandles, keyGroupRange, 
overlapFractionThreshold);
 
         // Init base DB instance
         if (initialHandle != null) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 4ed4f75..ff6d854 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -95,31 +96,38 @@ public class RocksDBIncrementalCheckpointUtilsTest extends 
TestLogger {
         when(keyedStateHandle3.getKeyGroupRange()).thenReturn(new 
KeyGroupRange(8, 12));
         keyedStateHandles.add(keyedStateHandle3);
 
-        // this should choose no one handle.
         Assert.assertNull(
                 
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        keyedStateHandles, new KeyGroupRange(3, 5)));
+                        keyedStateHandles,
+                        new KeyGroupRange(3, 5),
+                        RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
 
         // this should choose keyedStateHandle2, because keyedStateHandle2's 
key-group range
         // satisfies the overlap fraction demand.
         Assert.assertEquals(
                 keyedStateHandle2,
                 
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        keyedStateHandles, new KeyGroupRange(3, 6)));
+                        keyedStateHandles,
+                        new KeyGroupRange(3, 6),
+                        RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
 
         // both keyedStateHandle2 & keyedStateHandle3's key-group range 
satisfies the overlap
         // fraction, but keyedStateHandle3's key group range is better.
         Assert.assertEquals(
                 keyedStateHandle3,
                 
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        keyedStateHandles, new KeyGroupRange(5, 12)));
+                        keyedStateHandles,
+                        new KeyGroupRange(5, 12),
+                        RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
 
         // The intersect key group number of keyedStateHandle2 & 
keyedStateHandle3's with [4, 11]
         // are 4. But the over fraction of keyedStateHandle2 is better.
         Assert.assertEquals(
                 keyedStateHandle2,
                 
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        keyedStateHandles, new KeyGroupRange(4, 11)));
+                        keyedStateHandles,
+                        new KeyGroupRange(4, 11),
+                        RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
 
         // both keyedStateHandle2 & keyedStateHandle3's key-group range are 
covered by [3, 12],
         // but this should choose the keyedStateHandle3, because 
keyedStateHandle3's key-group is
@@ -127,7 +135,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends 
TestLogger {
         Assert.assertEquals(
                 keyedStateHandle3,
                 
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        keyedStateHandles, new KeyGroupRange(3, 12)));
+                        keyedStateHandles,
+                        new KeyGroupRange(3, 12),
+                        RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
     }
 
     private void testClipDBWithKeyGroupRangeHelper(
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index a1d93df..a3be47c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -499,6 +499,8 @@ public class RocksDBStateBackendConfigTest {
             verifyIllegalArgument(RocksDBConfigurableOptions.COMPACTION_STYLE, 
"LEV");
             verifyIllegalArgument(RocksDBConfigurableOptions.USE_BLOOM_FILTER, 
"NO");
             
verifyIllegalArgument(RocksDBConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE, 
"YES");
+            verifyIllegalArgument(
+                    
RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD, "2");
         }
 
         // verify legal configuration
@@ -521,6 +523,8 @@ public class RocksDBStateBackendConfigTest {
             
configuration.setString(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE.key(), 
"8 kb");
             
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 
mb");
             
configuration.setString(RocksDBConfigurableOptions.USE_BLOOM_FILTER.key(), 
"TRUE");
+            configuration.setString(
+                    
RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.key(), "0.5");
 
             try (RocksDBResourceContainer optionsContainer =
                     new RocksDBResourceContainer(

Reply via email to