azagrebin commented on a change in pull request #6438: [FLINK-9981] Tune 
performance of RocksDB implementation
URL: https://github.com/apache/flink/pull/6438#discussion_r206506427
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/MinMaxPriorityQueueOrderedSetCache.java
 ##########
 @@ -41,95 +38,86 @@
  *
  * @param <E> type of the contained elements.
  */
-public class TreeOrderedSetCache<E> implements 
CachingInternalPriorityQueueSet.OrderedSetCache<E> {
+public class MinMaxPriorityQueueOrderedSetCache<E extends 
HeapPriorityQueueElement>
+       implements CachingInternalPriorityQueueSet.OrderedSetCache<E> {
 
        /** The tree is used to store cached elements. */
        @Nonnull
-       private final ObjectAVLTreeSet<E> avlTree;
+       private final HeapMinMaxPriorityQueueSet<E> minMaxHeapSet;
 
        /** The element comparator. */
        @Nonnull
-       private final Comparator<E> elementComparator;
+       private final PriorityComparator<E> priorityComparator;
 
        /** The maximum capacity of the cache. */
        @Nonnegative
        private final int capacity;
 
        /**
-        * Creates a new {@link TreeOrderedSetCache} with the given capacity 
and element comparator. Capacity must be > 0.
-        * @param elementComparator comparator for the cached elements.
+        * Creates a new {@link MinMaxPriorityQueueOrderedSetCache} with the 
given capacity and element comparator. Capacity must be > 0.
+        * @param priorityComparator comparator for the cached elements.
         * @param capacity the capacity of the cache. Must be > 0.
         */
-       public TreeOrderedSetCache(@Nonnull Comparator<E> elementComparator, 
@Nonnegative int capacity) {
+       public MinMaxPriorityQueueOrderedSetCache(@Nonnull 
PriorityComparator<E> priorityComparator, @Nonnegative int capacity) {
                Preconditions.checkArgument(capacity > 0, "Cache capacity must 
be greater than 0.");
-               this.avlTree = new ObjectAVLTreeSet<>(elementComparator);
-               this.elementComparator = elementComparator;
+               this.minMaxHeapSet = new 
HeapMinMaxPriorityQueueSet<>(priorityComparator, capacity);
+               this.priorityComparator = priorityComparator;
                this.capacity = capacity;
        }
 
        @Override
        public void add(@Nonnull E element) {
                assert !isFull();
-               avlTree.add(element);
+               minMaxHeapSet.add(element);
        }
 
        @Override
        public void remove(@Nonnull E element) {
-               avlTree.remove(element);
+               minMaxHeapSet.remove(element);
        }
 
        @Override
        public boolean isFull() {
-               return avlTree.size() == capacity;
+               return minMaxHeapSet.size() == capacity;
        }
 
        @Override
        public boolean isEmpty() {
-               return avlTree.isEmpty();
+               return minMaxHeapSet.isEmpty();
        }
 
        @Override
        public boolean isInLowerBound(@Nonnull E toCheck) {
-               return avlTree.isEmpty() || 
elementComparator.compare(peekLast(), toCheck) > 0;
+               return minMaxHeapSet.isEmpty() || 
priorityComparator.comparePriority(peekLast(), toCheck) >= 0;
        }
 
        @Nullable
        @Override
        public E removeFirst() {
-               if (avlTree.isEmpty()) {
-                       return null;
-               }
-               final E first = avlTree.first();
-               avlTree.remove(first);
-               return first;
+               return minMaxHeapSet.pollFirst();
        }
 
        @Nullable
        @Override
        public E removeLast() {
-               if (avlTree.isEmpty()) {
-                       return null;
-               }
-               final E last = avlTree.last();
-               avlTree.remove(last);
-               return last;
+               return minMaxHeapSet.pollLast();
        }
 
        @Nullable
        @Override
        public E peekFirst() {
-               return !avlTree.isEmpty() ? avlTree.first() : null;
+               return minMaxHeapSet.peekFirst();
        }
 
        @Nullable
        @Override
        public E peekLast() {
-               return !avlTree.isEmpty() ? avlTree.last() : null;
+               return minMaxHeapSet.peekLast();
        }
 
        @Nonnull
        @Override
        public CloseableIterator<E> orderedIterator() {
-               return CloseableIterator.adapterForIterator(avlTree.iterator());
+               return minMaxHeapSet.iterator();
 
 Review comment:
   I think the underlying iterator is not ordered any more and we do not rely 
on orderness of this interface method anywhere. It can be renamed in this case.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to