Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6228#discussion_r199806903
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+ protected static final KeyGroupRange KEY_GROUP_RANGE = new
KeyGroupRange(0, 2);
+ protected static final KeyExtractorFunction<TestElement>
KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+ protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
+
Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+ protected static void insertRandomTimers(
+ @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
+ @Nonnull Set<TestElement> checkSet,
+ int count) {
+
+ ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+ final int numUniqueKeys = Math.max(count / 4, 64);
+
+ long duplicatePriority = Long.MIN_VALUE;
+
+ for (int i = 0; i < count; ++i) {
+ TestElement element;
+ do {
+ long elementPriority;
+ if (duplicatePriority == Long.MIN_VALUE) {
+ elementPriority =
localRandom.nextLong();
+ } else {
+ elementPriority = duplicatePriority;
+ duplicatePriority = Long.MIN_VALUE;
+ }
+ element = new
TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+ } while (!checkSet.add(element));
+
+ if (localRandom.nextInt(10) == 0) {
+ duplicatePriority = element.getPriority();
+ }
+
+ final boolean headChangedIndicated =
priorityQueue.add(element);
+ if (element.equals(priorityQueue.peek())) {
+ Assert.assertTrue(headChangedIndicated);
+ }
+ }
+ Assert.assertEquals(count, priorityQueue.size());
+ }
+
+ @Test
+ public void testPeekPollOrder() {
+ final int initialCapacity = 4;
+ final int testSize = 1000;
+ InternalPriorityQueue<TestElement> priorityQueue =
+ newPriorityQueue(initialCapacity);
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ long lastPriorityValue = Long.MIN_VALUE;
+ int lastSize = priorityQueue.size();
+ Assert.assertEquals(testSize, lastSize);
+ TestElement testElement;
+ while ((testElement = priorityQueue.peek()) != null) {
+ Assert.assertFalse(priorityQueue.isEmpty());
+ Assert.assertEquals(lastSize, priorityQueue.size());
+ Assert.assertEquals(testElement, priorityQueue.poll());
+ Assert.assertTrue(checkSet.remove(testElement));
+ Assert.assertTrue(testElement.getPriority() >=
lastPriorityValue);
+ lastPriorityValue = testElement.getPriority();
+ --lastSize;
+ }
+
+ Assert.assertTrue(priorityQueue.isEmpty());
+ Assert.assertEquals(0, priorityQueue.size());
+ Assert.assertEquals(0, checkSet.size());
+ }
+
+ @Test
+ public void testStopInsertMixKeepsOrder() {
+
+ InternalPriorityQueue<TestElement> priorityQueue =
newPriorityQueue(3);
+
+ final int testSize = 128;
+ HashSet<TestElement> checkSet = new HashSet<>(testSize);
+
+ insertRandomTimers(priorityQueue, checkSet, testSize);
+
+ // check that the whole set is still in order
+ while (!checkSet.isEmpty()) {
+
+ Iterator<TestElement> iterator = checkSet.iterator();
+ TestElement element = iterator.next();
+ iterator.remove();
+
+ boolean removesHead =
element.equals(priorityQueue.peek());
+ if (removesHead) {
+
Assert.assertTrue(priorityQueue.remove(element));
+ } else {
+ priorityQueue.remove(element);
+ }
+ Assert.assertEquals(checkSet.size(),
priorityQueue.size());
+
+ long lastPriorityValue = Long.MIN_VALUE;
--- End diff --
Could be
`long lastPriorityValue = removesHead ? element.getPriority() :
Long.MIN_VALUE;`
---