Myasuka commented on code in PR #20405:
URL: https://github.com/apache/flink/pull/20405#discussion_r946555027
##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java:
##########
@@ -637,11 +642,75 @@ public void testMapStateClear() throws Exception {
throw new RocksDBException("Artificial failure");
})
.when(keyedStateBackend.db)
- .newIterator(any(ColumnFamilyHandle.class),
any(ReadOptions.class));
+ .deleteRange(any(ColumnFamilyHandle.class), any(byte[].class),
any(byte[].class));
state.clear();
}
+ @Test
+ public void testMapStateClearWithOneBytePrefix() throws Exception {
+ verifyMapStateClear(Byte.MAX_VALUE + 1);
+ }
+
+ @Test
+ public void testMapStateClearWithTwoBytesPrefix() throws Exception {
+
verifyMapStateClear(KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM);
+ }
+
+ public void verifyMapStateClear(int maxParallelism) throws Exception {
+ try {
+ prepareRocksDB();
+
+ RocksDBKeyedStateBackendBuilder keyedStateBackendBuilder =
+ RocksDBTestUtils.builderForTestDB(
+ TEMP_FOLDER.newFolder(),
+ IntSerializer.INSTANCE,
+ db,
+ defaultCFHandle,
+ optionsContainer.getColumnOptions(),
+ maxParallelism);
+ keyedStateBackend = keyedStateBackendBuilder.build();
+
+ MapStateDescriptor<Integer, String> kvId =
+ new MapStateDescriptor<>("id", Integer.class,
String.class);
+ MapState<Integer, String> state =
+ keyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE, kvId);
+
+ int[] keys = new int[maxParallelism];
+ BitSet bitSet = new BitSet(maxParallelism);
+ // Make sure each keyGroup has a key
+ for (int i = 0; bitSet.cardinality() != maxParallelism; i++) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i,
maxParallelism);
+ if (!bitSet.get(keyGroup)) {
+ bitSet.set(keyGroup);
+ keys[keyGroup] = i;
+ }
+ }
+
+ for (int key : keys) {
+ keyedStateBackend.setCurrentKey(key);
+ state.put(key, "retain " + key);
+ }
+
+ int clearKeyGroup = RandomUtils.nextInt(0, maxParallelism);
+ keyedStateBackend.setCurrentKey(keys[clearKeyGroup]);
+ state.clear();
Review Comment:
First of all, `state.isEmpty` would create an iterator and scan the rocksDB,
which is obviously slower than point look up, could you try to use
`assertNull(state.get(keys[j])` to verify the empty iterator?
Moreover, I am not sure whether this is the regression after introducing too
much `deleteRange`, you can compare the performance with previous
implementation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]