Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199816750
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +   protected static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 2);
    +   protected static final KeyExtractorFunction<TestElement> 
KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +   protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +           
Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +   protected static void insertRandomTimers(
    +           @Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +           @Nonnull Set<TestElement> checkSet,
    +           int count) {
    +
    +           ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +           final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +           long duplicatePriority = Long.MIN_VALUE;
    +
    +           for (int i = 0; i < count; ++i) {
    +                   TestElement element;
    +                   do {
    +                           long elementPriority;
    +                           if (duplicatePriority == Long.MIN_VALUE) {
    +                                   elementPriority = 
localRandom.nextLong();
    +                           } else {
    +                                   elementPriority = duplicatePriority;
    +                                   duplicatePriority = Long.MIN_VALUE;
    +                           }
    +                           element = new 
TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +                   } while (!checkSet.add(element));
    +
    +                   if (localRandom.nextInt(10) == 0) {
    +                           duplicatePriority = element.getPriority();
    +                   }
    +
    +                   final boolean headChangedIndicated = 
priorityQueue.add(element);
    +                   if (element.equals(priorityQueue.peek())) {
    +                           Assert.assertTrue(headChangedIndicated);
    +                   }
    +           }
    +           Assert.assertEquals(count, priorityQueue.size());
    +   }
    +
    +   @Test
    +   public void testPeekPollOrder() {
    +           final int initialCapacity = 4;
    +           final int testSize = 1000;
    +           InternalPriorityQueue<TestElement> priorityQueue =
    +                   newPriorityQueue(initialCapacity);
    +           HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +           insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +           long lastPriorityValue = Long.MIN_VALUE;
    +           int lastSize = priorityQueue.size();
    +           Assert.assertEquals(testSize, lastSize);
    +           TestElement testElement;
    +           while ((testElement = priorityQueue.peek()) != null) {
    +                   Assert.assertFalse(priorityQueue.isEmpty());
    +                   Assert.assertEquals(lastSize, priorityQueue.size());
    +                   Assert.assertEquals(testElement, priorityQueue.poll());
    +                   Assert.assertTrue(checkSet.remove(testElement));
    +                   Assert.assertTrue(testElement.getPriority() >= 
lastPriorityValue);
    +                   lastPriorityValue = testElement.getPriority();
    +                   --lastSize;
    +           }
    +
    +           Assert.assertTrue(priorityQueue.isEmpty());
    +           Assert.assertEquals(0, priorityQueue.size());
    +           Assert.assertEquals(0, checkSet.size());
    +   }
    +
    +   @Test
    +   public void testStopInsertMixKeepsOrder() {
    --- End diff --
    
    👍 


---

Reply via email to