This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 89038cdaa7a89112258902340f62669aac769fcb
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Mar 19 10:10:40 2021 +0100

    [FLINK-21381][state] Add test for KeyGroupedInternalPriorityQueue
---
 .../flink/runtime/state/StateBackendTestBase.java  | 58 ++++++++++++++++++++++
 .../runtime/testutils/statemigration/TestType.java |  5 ++
 2 files changed, 63 insertions(+)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 4344e58..588969e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -72,6 +72,7 @@ import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.testutils.statemigration.TestType;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.IOUtils;
@@ -122,6 +123,7 @@ import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.isA;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.lessThan;
@@ -267,6 +269,62 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
     }
 
     @Test
+    public void testKeyGroupedInternalPriorityQueue() throws Exception {
+        testKeyGroupedInternalPriorityQueue(false);
+    }
+
+    @Test
+    public void testKeyGroupedInternalPriorityQueueAddAll() throws Exception {
+        testKeyGroupedInternalPriorityQueue(true);
+    }
+
+    public void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception {
+        String fieldName = "key-grouped-priority-queue";
+        CheckpointableKeyedStateBackend<Integer> backend =
+                createKeyedBackend(IntSerializer.INSTANCE);
+        KeyGroupedInternalPriorityQueue<TestType> priorityQueue =
+                backend.create(fieldName, new TestType.V1TestTypeSerializer());
+
+        TestType elementA42 = new TestType("a", 42);
+        TestType elementA44 = new TestType("a", 44);
+        TestType elementB1 = new TestType("b", 1);
+        TestType elementB3 = new TestType("b", 3);
+
+        TestType[] elements = {
+            elementA44, elementB1, elementB1, elementB3, elementA42,
+        };
+
+        if (addAll) {
+            priorityQueue.addAll(asList(elements));
+        } else {
+            assertTrue(priorityQueue.add(elements[0]));
+            assertTrue(priorityQueue.add(elements[1]));
+            assertFalse(priorityQueue.add(elements[2]));
+            assertFalse(priorityQueue.add(elements[3]));
+            assertFalse(priorityQueue.add(elements[4]));
+        }
+        assertFalse(priorityQueue.isEmpty());
+        assertThat(
+                priorityQueue.getSubsetForKeyGroup(1), containsInAnyOrder(elementA42, elementA44));
+        assertThat(priorityQueue.getSubsetForKeyGroup(8), containsInAnyOrder(elementB1, elementB3));
+
+        assertThat(priorityQueue.peek(), equalTo(elementB1));
+        assertThat(priorityQueue.poll(), equalTo(elementB1));
+        assertThat(priorityQueue.peek(), equalTo(elementB3));
+
+        List<TestType> actualList = new ArrayList<>();
+        priorityQueue.iterator().forEachRemaining(actualList::add);
+
+        assertThat(actualList, containsInAnyOrder(elementB3, elementA42, elementA44));
+
+        assertEquals(3, priorityQueue.size());
+
+        assertFalse(priorityQueue.remove(elementB1));
+        assertTrue(priorityQueue.remove(elementB3));
+        assertThat(priorityQueue.peek(), equalTo(elementA42));
+    }
+
+    @Test
     public void testGetKeys() throws Exception {
         final int namespace1ElementsNum = 1000;
         final int namespace2ElementsNum = 1000;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
index 4892491..b024f76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
@@ -94,6 +94,11 @@ public class TestType
         return 31 * key.hashCode() + value;
     }
 
+    @Override
+    public String toString() {
+        return String.format("TestType(key='%s', value=%d)", key, value);
+    }
+
     /** A serializer that read / writes {@link TestType} in schema version 1. */
     public static class V1TestTypeSerializer extends TestTypeSerializerBase {
         private static final long serialVersionUID = 5053346160938769779L;

Reply via email to