azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune
performance of RocksDB implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206506427
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/MinMaxPriorityQueueOrderedSetCache.java
##########
@@ -41,95 +38,86 @@
*
* @param <E> type of the contained elements.
*/
-public class TreeOrderedSetCache<E> implements
CachingInternalPriorityQueueSet.OrderedSetCache<E> {
+public class MinMaxPriorityQueueOrderedSetCache<E extends
HeapPriorityQueueElement>
+ implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
/** The tree is used to store cached elements. */
@Nonnull
- private final ObjectAVLTreeSet<E> avlTree;
+ private final HeapMinMaxPriorityQueueSet<E> minMaxHeapSet;
/** The element comparator. */
@Nonnull
- private final Comparator<E> elementComparator;
+ private final PriorityComparator<E> priorityComparator;
/** The maximum capacity of the cache. */
@Nonnegative
private final int capacity;
/**
- * Creates a new {@link TreeOrderedSetCache} with the given capacity
and element comparator. Capacity must be > 0.
- * @param elementComparator comparator for the cached elements.
+ * Creates a new {@link MinMaxPriorityQueueOrderedSetCache} with the
given capacity and element comparator. Capacity must be > 0.
+ * @param priorityComparator comparator for the cached elements.
* @param capacity the capacity of the cache. Must be > 0.
*/
- public TreeOrderedSetCache(@Nonnull Comparator<E> elementComparator,
@Nonnegative int capacity) {
+ public MinMaxPriorityQueueOrderedSetCache(@Nonnull
PriorityComparator<E> priorityComparator, @Nonnegative int capacity) {
Preconditions.checkArgument(capacity > 0, "Cache capacity must
be greater than 0.");
- this.avlTree = new ObjectAVLTreeSet<>(elementComparator);
- this.elementComparator = elementComparator;
+ this.minMaxHeapSet = new
HeapMinMaxPriorityQueueSet<>(priorityComparator, capacity);
+ this.priorityComparator = priorityComparator;
this.capacity = capacity;
}
@Override
public void add(@Nonnull E element) {
assert !isFull();
- avlTree.add(element);
+ minMaxHeapSet.add(element);
}
@Override
public void remove(@Nonnull E element) {
- avlTree.remove(element);
+ minMaxHeapSet.remove(element);
}
@Override
public boolean isFull() {
- return avlTree.size() == capacity;
+ return minMaxHeapSet.size() == capacity;
}
@Override
public boolean isEmpty() {
- return avlTree.isEmpty();
+ return minMaxHeapSet.isEmpty();
}
@Override
public boolean isInLowerBound(@Nonnull E toCheck) {
- return avlTree.isEmpty() ||
elementComparator.compare(peekLast(), toCheck) > 0;
+ return minMaxHeapSet.isEmpty() ||
priorityComparator.comparePriority(peekLast(), toCheck) >= 0;
}
@Nullable
@Override
public E removeFirst() {
- if (avlTree.isEmpty()) {
- return null;
- }
- final E first = avlTree.first();
- avlTree.remove(first);
- return first;
+ return minMaxHeapSet.pollFirst();
}
@Nullable
@Override
public E removeLast() {
- if (avlTree.isEmpty()) {
- return null;
- }
- final E last = avlTree.last();
- avlTree.remove(last);
- return last;
+ return minMaxHeapSet.pollLast();
}
@Nullable
@Override
public E peekFirst() {
- return !avlTree.isEmpty() ? avlTree.first() : null;
+ return minMaxHeapSet.peekFirst();
}
@Nullable
@Override
public E peekLast() {
- return !avlTree.isEmpty() ? avlTree.last() : null;
+ return minMaxHeapSet.peekLast();
}
@Nonnull
@Override
public CloseableIterator<E> orderedIterator() {
- return CloseableIterator.adapterForIterator(avlTree.iterator());
+ return minMaxHeapSet.iterator();
Review comment:
I think the underlying iterator is no longer ordered, and we do not rely
on the ordering of this interface method anywhere. In that case, the method can be renamed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services