liming30 commented on code in PR #20405:
URL: https://github.com/apache/flink/pull/20405#discussion_r951119238
##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java:
##########
@@ -637,11 +642,75 @@ public void testMapStateClear() throws Exception {
throw new RocksDBException("Artificial failure");
})
.when(keyedStateBackend.db)
- .newIterator(any(ColumnFamilyHandle.class), any(ReadOptions.class));
+ .deleteRange(any(ColumnFamilyHandle.class), any(byte[].class), any(byte[].class));
state.clear();
}
+ @Test
+ public void testMapStateClearWithOneBytePrefix() throws Exception {
+ verifyMapStateClear(Byte.MAX_VALUE + 1);
+ }
+
+ @Test
+ public void testMapStateClearWithTwoBytesPrefix() throws Exception {
+ verifyMapStateClear(KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM);
+ }
+
+ public void verifyMapStateClear(int maxParallelism) throws Exception {
+ try {
+ prepareRocksDB();
+
+ RocksDBKeyedStateBackendBuilder keyedStateBackendBuilder =
+ RocksDBTestUtils.builderForTestDB(
+ TEMP_FOLDER.newFolder(),
+ IntSerializer.INSTANCE,
+ db,
+ defaultCFHandle,
+ optionsContainer.getColumnOptions(),
+ maxParallelism);
+ keyedStateBackend = keyedStateBackendBuilder.build();
+
+ MapStateDescriptor<Integer, String> kvId =
+ new MapStateDescriptor<>("id", Integer.class, String.class);
+ MapState<Integer, String> state =
+ keyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+ int[] keys = new int[maxParallelism];
+ BitSet bitSet = new BitSet(maxParallelism);
+ // Make sure each keyGroup has a key
+ for (int i = 0; bitSet.cardinality() != maxParallelism; i++) {
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(i, maxParallelism);
+ if (!bitSet.get(keyGroup)) {
+ bitSet.set(keyGroup);
+ keys[keyGroup] = i;
+ }
+ }
+
+ for (int key : keys) {
+ keyedStateBackend.setCurrentKey(key);
+ state.put(key, "retain " + key);
+ }
+
+ int clearKeyGroup = RandomUtils.nextInt(0, maxParallelism);
+ keyedStateBackend.setCurrentKey(keys[clearKeyGroup]);
+ state.clear();
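The two new tests use `Byte.MAX_VALUE + 1` (128) and `KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM` (32768) as max parallelism. As far as we can tell, these are the boundary cases where the RocksDB backend writes the key group as a one-byte or a two-byte big-endian prefix. A `deleteRange(begin, end)`-based clear removes the half-open range [begin, end), so it needs an exclusive end key for the prefix being cleared, and carrying across byte boundaries is where such code tends to break. The sketch below is a minimal, hypothetical illustration: the helper names `requiredPrefixBytes`, `encodeKeyGroup` and `prefixUpperBound` are ours, the PR's actual bound computation is not visible in this hunk, and for `MapState` the real prefix also covers the serialized key and namespace, not just the key group.

```java
import java.util.Arrays;

/** Hypothetical helpers illustrating key-group prefixes and prefix range bounds. */
public final class KeyGroupPrefixSketch {

    private KeyGroupPrefixSketch() {}

    /** Assumed rule: up to 128 key groups fit in one prefix byte, more need two. */
    public static int requiredPrefixBytes(int maxParallelism) {
        return maxParallelism > (Byte.MAX_VALUE + 1) ? 2 : 1;
    }

    /** Big-endian encoding of keyGroup into prefixBytes bytes. */
    public static byte[] encodeKeyGroup(int keyGroup, int prefixBytes) {
        byte[] out = new byte[prefixBytes];
        for (int i = 0; i < prefixBytes; i++) {
            out[i] = (byte) (keyGroup >>> (8 * (prefixBytes - 1 - i)));
        }
        return out;
    }

    /**
     * Exclusive upper bound for all keys starting with prefix under unsigned
     * bytewise ordering: drop trailing 0xFF bytes and increment the last
     * remaining byte. Returns null if every byte is 0xFF (no finite bound).
     */
    public static byte[] prefixUpperBound(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] end = Arrays.copyOf(prefix, i + 1);
                end[i]++; // byte arithmetic wraps, so 0x7F becomes 0x80 as intended
                return end;
            }
        }
        return null;
    }

    /** Formats bytes as unsigned hex, e.g. {0x7F, 0xFF} -> "7FFF". */
    private static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        for (int maxParallelism : new int[] {128, 32768}) {
            int bytes = requiredPrefixBytes(maxParallelism);
            byte[] last = encodeKeyGroup(maxParallelism - 1, bytes);
            System.out.println(maxParallelism + " key groups -> " + bytes + " byte(s); last group "
                    + hex(last) + " ends before " + hex(prefixUpperBound(last)));
        }
    }
}
```

For the two-byte case, key group 255 (`00FF`) gets the bound `01`, which exercises the carry; a test that clears a random key group only hits such carries occasionally.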
Review Comment:
@Myasuka Sorry for taking so long to reply.
I ran the following test to compare performance before and after the change. With the original iterator-based implementation, the total time of a single `loop2` pass is stable at about 25 ms. With the `deleteRange` approach, however, each `loop2` pass takes longer than the previous one (roughly 10 ms more each time), and unless we explicitly call `compactRange`, it keeps growing.
So this may not be a good optimization after all: we have moved the cost of deleting with an iterator onto reads, and that cost persists until a compaction occurs. cc @fredia
```java
for (int i = 0; i < maxParallelism; i++) { // loop1: clear the state of keys[i]
    keyedStateBackend.setCurrentKey(keys[i]);
    state.clear2();
    for (int j = 0; j < maxParallelism; j++) { // loop2: read back every key group
        keyedStateBackend.setCurrentKey(keys[j]);
        if (j <= i) {
            // Cleared in this or an earlier loop1 iteration.
            assertNull(state.get(keys[j]));
        } else {
            // Not cleared yet, so the value must still be present.
            Integer expected = keys[j];
            assertEquals(expected, state.get(keys[j]));
        }
    }
}
```
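To illustrate why the cost moves to reads, here is a deliberately simplified toy model. It is not RocksDB: `RangeTombstoneModel` and all its methods are hypothetical, range tombstones are kept in a plain list and scanned linearly on every point read, and the iterator-based clear is modeled as removing entries outright. Real RocksDB organizes range tombstones more cleverly, so only the trend, not the constants, carries over. The `run` method mirrors `loop1`/`loop2` above, counting tombstone checks instead of measuring milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical toy model, NOT RocksDB: range deletes leave tombstones that reads must consult. */
public class RangeTombstoneModel {
    /** A half-open deleted range [begin, end), written at sequence number seq. */
    private static final class Tombstone {
        final int begin, end;
        final long seq;
        Tombstone(int begin, int end, long seq) { this.begin = begin; this.end = end; this.seq = seq; }
        boolean shadows(int key, long entrySeq) { return begin <= key && key < end && seq > entrySeq; }
    }

    /** A stored value tagged with the sequence number of its write. */
    private static final class Entry {
        final String value;
        final long seq;
        Entry(String value, long seq) { this.value = value; this.seq = seq; }
    }

    private final TreeMap<Integer, Entry> data = new TreeMap<>();
    private final List<Tombstone> tombstones = new ArrayList<>();
    private long nextSeq = 0;
    private long tombstoneChecks = 0; // stand-in for read cost

    public long tombstoneChecks() { return tombstoneChecks; }

    public void put(int key, String value) { data.put(key, new Entry(value, nextSeq++)); }

    /** deleteRange-style clear: one cheap write; the cost is deferred to reads. */
    public void deleteRange(int begin, int end) { tombstones.add(new Tombstone(begin, end, nextSeq++)); }

    /** Iterator-style clear, simplified: pays per-key work now, leaves nothing for reads. */
    public void deleteByIteration(int begin, int end) { data.subMap(begin, end).clear(); }

    /** Point read: every outstanding range tombstone may hide the entry. */
    public String get(int key) {
        Entry e = data.get(key);
        if (e == null) {
            return null;
        }
        for (Tombstone t : tombstones) {
            tombstoneChecks++;
            if (t.shadows(key, e.seq)) {
                return null;
            }
        }
        return e.value;
    }

    /** Compaction: physically drop shadowed entries, then discard all tombstones. */
    public void compact() {
        data.entrySet().removeIf(en -> tombstones.stream().anyMatch(t -> t.shadows(en.getKey(), en.getValue().seq)));
        tombstones.clear();
    }

    /** Mirrors loop1/loop2; returns the tombstone checks spent in each loop2 pass. */
    public static long[] run(boolean useDeleteRange, int n) {
        RangeTombstoneModel db = new RangeTombstoneModel();
        for (int k = 0; k < n; k++) {
            db.put(k, "retain " + k);
        }
        long[] checksPerPass = new long[n];
        for (int i = 0; i < n; i++) { // loop1
            if (useDeleteRange) {
                db.deleteRange(i, i + 1);
            } else {
                db.deleteByIteration(i, i + 1);
            }
            long before = db.tombstoneChecks;
            for (int j = 0; j < n; j++) { // loop2
                boolean cleared = db.get(j) == null;
                if (cleared != (j <= i)) {
                    throw new AssertionError("unexpected state for key " + j);
                }
            }
            checksPerPass[i] = db.tombstoneChecks - before;
        }
        return checksPerPass;
    }

    public static void main(String[] args) {
        System.out.println("deleteRange:     " + Arrays.toString(run(true, 8)));
        System.out.println("iterator delete: " + Arrays.toString(run(false, 8)));
    }
}
```

Running `main` prints `[8, 15, 21, 26, 30, 33, 35, 36]` for the `deleteRange` variant and all zeros for the iterator variant: the per-pass read cost grows as tombstones accumulate, qualitatively matching the growing `loop2` times reported above, and after `compact()` reads no longer pay for the old tombstones.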
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.