This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 38fe9c0 [FLINK-26560][state] Make the threshold of the overlap
fraction of incremental restoring configurable
38fe9c0 is described below
commit 38fe9c0b34ebec376aa4c13d88859dc021346b04
Author: fredia <[email protected]>
AuthorDate: Wed Mar 16 12:43:24 2022 +0800
[FLINK-26560][state] Make the threshold of the overlap fraction of
incremental restoring configurable
This closes #19106.
---
.../rocksdb_configurable_configuration.html | 6 +++
.../state/EmbeddedRocksDBStateBackend.java | 21 +++++++-
.../state/RocksDBConfigurableOptions.java | 12 ++++-
.../state/RocksDBIncrementalCheckpointUtils.java | 58 ++++++++++------------
.../state/RocksDBKeyedStateBackendBuilder.java | 11 +++-
.../RocksDBIncrementalRestoreOperation.java | 7 ++-
.../RocksDBIncrementalCheckpointUtilsTest.java | 22 +++++---
.../state/RocksDBStateBackendConfigTest.java | 4 ++
8 files changed, 99 insertions(+), 42 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
index d6bb28f..180a3bd 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
@@ -93,6 +93,12 @@
<td>The maximum size of RocksDB's file used for information
logging. If the log files becomes larger than this, a new file will be created.
If 0, all logs will be written to one log file. The default maximum file size
is '25MB'. </td>
</tr>
<tr>
+
<td><h5>state.backend.rocksdb.restore-overlap-fraction-threshold</h5></td>
+ <td style="word-wrap: break-word;">0.75</td>
+ <td>Double</td>
+ <td>The threshold of the overlap fraction between the handle's
key-group range and target key-group range. When restore base DB, only the
handle which overlap fraction greater than or equal to *threshold* has a chance
to be an initial handle.</td>
+ </tr>
+ <tr>
<td><h5>state.backend.rocksdb.thread.num</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index bd906e2..6e6a9c9 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -77,6 +77,7 @@ import java.util.Random;
import java.util.UUID;
import static org.apache.flink.configuration.description.TextElement.text;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
import static
org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
import static
org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
@@ -112,6 +113,8 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
private static final long UNDEFINED_WRITE_BATCH_SIZE = -1;
+ private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1;
+
// ------------------------------------------------------------------------
// -- configuration values, set in the application / configuration
@@ -167,6 +170,11 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
*/
private long writeBatchSize;
+ /**
+ * The threshold of the overlap fraction between the handle's key-group
range and target
+ * key-group range.
+ */
+ private double overlapFractionThreshold;
// ------------------------------------------------------------------------
/** Creates a new {@code EmbeddedRocksDBStateBackend} for storing local
state. */
@@ -194,6 +202,7 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
this.memoryConfiguration = new RocksDBMemoryConfiguration();
this.writeBatchSize = UNDEFINED_WRITE_BATCH_SIZE;
+ this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
}
/**
@@ -280,6 +289,15 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
// configure latency tracking
latencyTrackingConfigBuilder =
original.latencyTrackingConfigBuilder.configure(config);
+
+ // configure overlap fraction threshold
+ overlapFractionThreshold =
+ original.overlapFractionThreshold ==
UNDEFINED_OVERLAP_FRACTION_THRESHOLD
+ ? config.get(RESTORE_OVERLAP_FRACTION_THRESHOLD)
+ : original.overlapFractionThreshold;
+ checkArgument(
+ overlapFractionThreshold >= 0 && this.overlapFractionThreshold
<= 1,
+ "Overlap fraction threshold of restoring should be between 0
and 1");
}
// ------------------------------------------------------------------------
@@ -479,7 +497,8 @@ public class EmbeddedRocksDBStateBackend extends
AbstractManagedMemoryStateBacke
.setNumberOfTransferingThreads(getNumberOfTransferThreads())
.setNativeMetricOptions(
resourceContainer.getMemoryWatcherOptions(defaultMetricOptions))
- .setWriteBatchSize(getWriteBatchSize());
+ .setWriteBatchSize(getWriteBatchSize())
+ .setOverlapFractionThreshold(overlapFractionThreshold);
return builder.build();
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 8e6e389..c914736 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -255,6 +255,15 @@ public class RocksDBConfigurableOptions implements
Serializable {
"If true, RocksDB will use block-based filter
instead of full filter, this only take effect when bloom filter is used. "
+ "The default value is 'false'.");
+ public static final ConfigOption<Double>
RESTORE_OVERLAP_FRACTION_THRESHOLD =
+ key("state.backend.rocksdb.restore-overlap-fraction-threshold")
+ .doubleType()
+ .defaultValue(0.75)
+ .withDescription(
+ "The threshold of the overlap fraction between the
handle's key-group range and target key-group range. "
+ + "When restore base DB, only the handle
which overlap fraction greater than or equal to *threshold* "
+ + "has a chance to be an initial handle.");
+
static final ConfigOption<?>[] CANDIDATE_CONFIGS =
new ConfigOption<?>[] {
// configurable DBOptions
@@ -278,7 +287,8 @@ public class RocksDBConfigurableOptions implements
Serializable {
BLOCK_CACHE_SIZE,
USE_BLOOM_FILTER,
BLOOM_FILTER_BITS_PER_KEY,
- BLOOM_FILTER_BLOCK_BASED_MODE
+ BLOOM_FILTER_BLOCK_BASED_MODE,
+ RESTORE_OVERLAP_FRACTION_THRESHOLD
};
private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 579fef2..4f73ff8 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -33,39 +33,32 @@ import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
-import java.util.function.BiFunction;
/** Utils for RocksDB Incremental Checkpoint. */
public class RocksDBIncrementalCheckpointUtils {
-
- /**
- * The threshold of the overlap fraction of the handle's key-group range
with target key-group
- * range to be an initial handle.
- */
- private static final double OVERLAP_FRACTION_THRESHOLD = 0.75;
-
/**
* Evaluates state handle's "score" regarding to the target range when
choosing the best state
- * handle to init the initial db for recovery, if the overlap fraction is
less than {@link
- * #OVERLAP_FRACTION_THRESHOLD}, then just return {@code Score.MIN} to
mean the handle has no
- * chance to be the initial handle.
+ * handle to init the initial db for recovery, if the overlap fraction is
less than
+ * overlapFractionThreshold, then just return {@code Score.MIN} to mean
the handle has no chance
+ * to be the initial handle.
*/
- private static final BiFunction<KeyedStateHandle, KeyGroupRange, Score>
STATE_HANDLE_EVALUATOR =
- (stateHandle, targetKeyGroupRange) -> {
- final KeyGroupRange handleKeyGroupRange =
stateHandle.getKeyGroupRange();
- final KeyGroupRange intersectGroup =
-
handleKeyGroupRange.getIntersection(targetKeyGroupRange);
-
- final double overlapFraction =
- (double) intersectGroup.getNumberOfKeyGroups()
- / handleKeyGroupRange.getNumberOfKeyGroups();
-
- if (overlapFraction < OVERLAP_FRACTION_THRESHOLD) {
- return Score.MIN;
- }
-
- return new Score(intersectGroup.getNumberOfKeyGroups(),
overlapFraction);
- };
+ private static Score stateHandleEvaluator(
+ KeyedStateHandle stateHandle,
+ KeyGroupRange targetKeyGroupRange,
+ double overlapFractionThreshold) {
+ final KeyGroupRange handleKeyGroupRange =
stateHandle.getKeyGroupRange();
+ final KeyGroupRange intersectGroup =
+ handleKeyGroupRange.getIntersection(targetKeyGroupRange);
+
+ final double overlapFraction =
+ (double) intersectGroup.getNumberOfKeyGroups()
+ / handleKeyGroupRange.getNumberOfKeyGroups();
+
+ if (overlapFraction < overlapFractionThreshold) {
+ return Score.MIN;
+ }
+ return new Score(intersectGroup.getNumberOfKeyGroups(),
overlapFraction);
+ }
/**
* Score of the state handle, intersect group range is compared first, and
then compare the
@@ -193,8 +186,8 @@ public class RocksDBIncrementalCheckpointUtils {
}
/**
- * Choose the best state handle according to the {@link
#STATE_HANDLE_EVALUATOR} to init the
- * initial db.
+ * Choose the best state handle according to the {@link
#stateHandleEvaluator(KeyedStateHandle,
+ * KeyGroupRange, double)} to init the initial db.
*
* @param restoreStateHandles The candidate state handles.
* @param targetKeyGroupRange The target key group range.
@@ -203,12 +196,15 @@ public class RocksDBIncrementalCheckpointUtils {
@Nullable
public static KeyedStateHandle chooseTheBestStateHandleForInitial(
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
- @Nonnull KeyGroupRange targetKeyGroupRange) {
+ @Nonnull KeyGroupRange targetKeyGroupRange,
+ double overlapFractionThreshold) {
KeyedStateHandle bestStateHandle = null;
Score bestScore = Score.MIN;
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
- Score handleScore = STATE_HANDLE_EVALUATOR.apply(rawStateHandle,
targetKeyGroupRange);
+ Score handleScore =
+ stateHandleEvaluator(
+ rawStateHandle, targetKeyGroupRange,
overlapFractionThreshold);
if (handleScore.compareTo(bestScore) > 0) {
bestStateHandle = rawStateHandle;
bestScore = handleScore;
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index d8114e6..8706a90 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -76,6 +76,7 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
@@ -119,6 +120,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
private RocksDB injectedTestDB; // for testing
+ private double overlapFractionThreshold =
RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue();
private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for
testing
private RocksDBStateUploader injectRocksDBStateUploader; // for testing
@@ -254,6 +256,12 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
return this;
}
+ RocksDBKeyedStateBackendBuilder<K> setOverlapFractionThreshold(
+ double overlapFractionThreshold) {
+ this.overlapFractionThreshold = overlapFractionThreshold;
+ return this;
+ }
+
private static void checkAndCreateDirectory(File directory) throws
IOException {
if (directory.exists()) {
if (!directory.isDirectory()) {
@@ -464,7 +472,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBacken
restoreStateHandles,
ttlCompactFiltersManager,
writeBatchSize,
- optionsContainer.getWriteBufferManagerCapacity());
+ optionsContainer.getWriteBufferManagerCapacity(),
+ overlapFractionThreshold);
} else if (priorityQueueStateType
== EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) {
return new RocksDBHeapTimersFullRestoreOperation<>(
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 32bb661..657b6f2 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -99,6 +99,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements
RocksDBRestoreOper
private long lastCompletedCheckpointId;
private UUID backendUID;
private final long writeBatchSize;
+ private final double overlapFractionThreshold;
private boolean isKeySerializerCompatibilityChecked;
@@ -120,7 +121,8 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nonnegative long writeBatchSize,
- Long writeBufferManagerCapacity) {
+ Long writeBufferManagerCapacity,
+ double overlapFractionThreshold) {
this.rocksHandle =
new RocksDBHandle(
kvStateInformation,
@@ -136,6 +138,7 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
this.lastCompletedCheckpointId = -1L;
this.backendUID = UUID.randomUUID();
this.writeBatchSize = writeBatchSize;
+ this.overlapFractionThreshold = overlapFractionThreshold;
this.restoreStateHandles = restoreStateHandles;
this.cancelStreamRegistry = cancelStreamRegistry;
this.keyGroupRange = keyGroupRange;
@@ -284,7 +287,7 @@ public class RocksDBIncrementalRestoreOperation<K>
implements RocksDBRestoreOper
// Prepare for restore with rescaling
KeyedStateHandle initialHandle =
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
- restoreStateHandles, keyGroupRange);
+ restoreStateHandles, keyGroupRange,
overlapFractionThreshold);
// Init base DB instance
if (initialHandle != null) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
index 4ed4f75..ff6d854 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtilsTest.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -95,31 +96,38 @@ public class RocksDBIncrementalCheckpointUtilsTest extends
TestLogger {
when(keyedStateHandle3.getKeyGroupRange()).thenReturn(new
KeyGroupRange(8, 12));
keyedStateHandles.add(keyedStateHandle3);
- // this should choose no one handle.
Assert.assertNull(
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
- keyedStateHandles, new KeyGroupRange(3, 5)));
+ keyedStateHandles,
+ new KeyGroupRange(3, 5),
+ RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
// this should choose keyedStateHandle2, because keyedStateHandle2's
key-group range
// satisfies the overlap fraction demand.
Assert.assertEquals(
keyedStateHandle2,
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
- keyedStateHandles, new KeyGroupRange(3, 6)));
+ keyedStateHandles,
+ new KeyGroupRange(3, 6),
+ RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
// both keyedStateHandle2 & keyedStateHandle3's key-group range
satisfies the overlap
// fraction, but keyedStateHandle3's key group range is better.
Assert.assertEquals(
keyedStateHandle3,
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
- keyedStateHandles, new KeyGroupRange(5, 12)));
+ keyedStateHandles,
+ new KeyGroupRange(5, 12),
+ RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
// The intersect key group number of keyedStateHandle2 &
keyedStateHandle3's with [4, 11]
// are 4. But the overlap fraction of keyedStateHandle2 is better.
Assert.assertEquals(
keyedStateHandle2,
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
- keyedStateHandles, new KeyGroupRange(4, 11)));
+ keyedStateHandles,
+ new KeyGroupRange(4, 11),
+ RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
// both keyedStateHandle2 & keyedStateHandle3's key-group range are
covered by [3, 12],
// but this should choose the keyedStateHandle3, because
keyedStateHandle3's key-group is
@@ -127,7 +135,9 @@ public class RocksDBIncrementalCheckpointUtilsTest extends
TestLogger {
Assert.assertEquals(
keyedStateHandle3,
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
- keyedStateHandles, new KeyGroupRange(3, 12)));
+ keyedStateHandles,
+ new KeyGroupRange(3, 12),
+ RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()));
}
private void testClipDBWithKeyGroupRangeHelper(
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index a1d93df..a3be47c 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -499,6 +499,8 @@ public class RocksDBStateBackendConfigTest {
verifyIllegalArgument(RocksDBConfigurableOptions.COMPACTION_STYLE,
"LEV");
verifyIllegalArgument(RocksDBConfigurableOptions.USE_BLOOM_FILTER,
"NO");
verifyIllegalArgument(RocksDBConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE,
"YES");
+ verifyIllegalArgument(
+
RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD, "2");
}
// verify legal configuration
@@ -521,6 +523,8 @@ public class RocksDBStateBackendConfigTest {
configuration.setString(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE.key(),
"8 kb");
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512
mb");
configuration.setString(RocksDBConfigurableOptions.USE_BLOOM_FILTER.key(),
"TRUE");
+ configuration.setString(
+
RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.key(), "0.5");
try (RocksDBResourceContainer optionsContainer =
new RocksDBResourceContainer(