StefanRRichter commented on code in PR #24367:
URL: https://github.com/apache/flink/pull/24367#discussion_r1498971141
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java:
##########
@@ -153,13 +154,14 @@ private static void deleteRange(
byte[] beginKeyBytes,
byte[] endKeyBytes)
throws RocksDBException {
-
+ List<byte[]> deletedRange = Arrays.asList(beginKeyBytes, endKeyBytes);
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
// Using RocksDB's deleteRange will take advantage of delete
// tombstones, which mark the range as deleted.
//
//
https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/include/rocksdb/db.h#L363-L377
db.deleteRange(columnFamilyHandle, beginKeyBytes, endKeyBytes);
+ db.deleteFilesInRanges(columnFamilyHandle, deletedRange, false);
Review Comment:
If we move the `deleteFilesInRanges` call to `clipDBWithKeyGroupRange` we
can do one call instead of two, which at least according to the RocksDB java
doc could be faster than calling twice. I'd also suggest to introduce parameter
protection for the change and call deleteRange after deleteRangeInFiles. e.g.
```
public static void clipDBWithKeyGroupRange(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@Nonnull KeyGroupRange targetKeyGroupRange,
@Nonnull KeyGroupRange currentKeyGroupRange,
@Nonnegative int keyGroupPrefixBytes,
boolean useDeleteFilesInRange)
throws RocksDBException {
List<byte[]> deleteFilesRanges = new ArrayList<>(4);
List<ThrowingRunnable<RocksDBException>> deleteRangeActions = new
ArrayList<>(2);
if (currentKeyGroupRange.getStartKeyGroup() <
targetKeyGroupRange.getStartKeyGroup()) {
byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
CompositeKeySerializationUtils.serializeKeyGroup(
currentKeyGroupRange.getStartKeyGroup(),
beginKeyGroupBytes);
CompositeKeySerializationUtils.serializeKeyGroup(
targetKeyGroupRange.getStartKeyGroup(),
endKeyGroupBytes);
deleteFilesRanges.add(beginKeyGroupBytes);
deleteFilesRanges.add(endKeyGroupBytes);
deleteRangeActions.add(
() ->
deleteRange(
db, columnFamilyHandles,
beginKeyGroupBytes, endKeyGroupBytes));
}
if (currentKeyGroupRange.getEndKeyGroup() >
targetKeyGroupRange.getEndKeyGroup()) {
byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
CompositeKeySerializationUtils.serializeKeyGroup(
targetKeyGroupRange.getEndKeyGroup() + 1,
beginKeyGroupBytes);
CompositeKeySerializationUtils.serializeKeyGroup(
currentKeyGroupRange.getEndKeyGroup() + 1,
endKeyGroupBytes);
deleteFilesRanges.add(beginKeyGroupBytes);
deleteFilesRanges.add(endKeyGroupBytes);
deleteRangeActions.add(
() ->
deleteRange(
db, columnFamilyHandles,
beginKeyGroupBytes, endKeyGroupBytes));
}
// First delete the files.
if (useDeleteFilesInRange) {
deleteFilesInRanges(db, columnFamilyHandles, deleteFilesRanges);
}
// Then put range limiting tombstones in place.
for (ThrowingRunnable<RocksDBException> deleteRangeAction :
deleteRangeActions) {
deleteRangeAction.run();
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]