azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune
performance of RocksDB implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206982020
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -2683,39 +2686,29 @@ public static RocksIteratorWrapper getRocksIterator(
final ColumnFamilyHandle columnFamilyHandle = entry.f0;
- @Nonnull
- TieBreakingPriorityComparator<T> tieBreakingComparator =
- new TieBreakingPriorityComparator<>(
- priorityComparator,
- byteOrderedElementSerializer,
- elementSerializationOutStream,
- elementSerializationOutView);
-
return new KeyGroupPartitionedPriorityQueue<>(
KeyExtractorFunction.forKeyedObjects(),
priorityComparator,
- new
KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T,
CachingInternalPriorityQueueSet<T>>() {
+ new
KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T,
RocksDBCachingPriorityQueueSet<T>>() {
@Nonnull
@Override
- public
CachingInternalPriorityQueueSet<T> create(
+ public
RocksDBCachingPriorityQueueSet<T> create(
int keyGroupId,
int numKeyGroups,
+ @Nonnull
KeyExtractorFunction<T> keyExtractor,
@Nonnull PriorityComparator<T>
elementPriorityComparator) {
-
-
CachingInternalPriorityQueueSet.OrderedSetCache<T> cache =
- new
TreeOrderedSetCache<>(tieBreakingComparator, DEFAULT_CACHES_SIZE);
-
CachingInternalPriorityQueueSet.OrderedSetStore<T> store =
- new
RocksDBOrderedSetStore<>(
- keyGroupId,
-
keyGroupPrefixBytes,
- db,
-
columnFamilyHandle,
-
byteOrderedElementSerializer,
-
elementSerializationOutStream,
-
elementSerializationOutView,
-
writeBatchWrapper);
-
- return new
CachingInternalPriorityQueueSet<>(cache, store);
+ TreeOrderedSetCache
orderedSetCache = new TreeOrderedSetCache(DEFAULT_CACHES_SIZE);
Review comment:
This `create` method is a bit hard to read, I think we could extract
`tryToRegisterNewEntry` and `getPartitionQueueSetFactory`, like in test.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services