This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 89038cdaa7a89112258902340f62669aac769fcb Author: Piotr Nowojski <[email protected]> AuthorDate: Fri Mar 19 10:10:40 2021 +0100 [FLINK-21381][state] Add test for KeyGroupedInternalPriorityQueue --- .../flink/runtime/state/StateBackendTestBase.java | 58 ++++++++++++++++++++++ .../runtime/testutils/statemigration/TestType.java | 5 ++ 2 files changed, 63 insertions(+) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 4344e58..588969e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -72,6 +72,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.statemigration.TestType; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.types.IntValue; import org.apache.flink.util.IOUtils; @@ -122,6 +123,7 @@ import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.lessThan; @@ -267,6 +269,62 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } @Test + public void testKeyGroupedInternalPriorityQueue() throws Exception { + testKeyGroupedInternalPriorityQueue(false); + } + + @Test + public void testKeyGroupedInternalPriorityQueueAddAll() throws Exception { + testKeyGroupedInternalPriorityQueue(true); + } + + public void testKeyGroupedInternalPriorityQueue(boolean addAll) throws Exception { + String fieldName = "key-grouped-priority-queue"; + CheckpointableKeyedStateBackend<Integer> backend = + createKeyedBackend(IntSerializer.INSTANCE); + KeyGroupedInternalPriorityQueue<TestType> priorityQueue = + backend.create(fieldName, new TestType.V1TestTypeSerializer()); + + TestType elementA42 = new TestType("a", 42); + TestType elementA44 = new TestType("a", 44); + TestType elementB1 = new TestType("b", 1); + TestType elementB3 = new TestType("b", 3); + + TestType[] elements = { + elementA44, elementB1, elementB1, elementB3, elementA42, + }; + + if (addAll) { + priorityQueue.addAll(asList(elements)); + } else { + assertTrue(priorityQueue.add(elements[0])); + assertTrue(priorityQueue.add(elements[1])); + assertFalse(priorityQueue.add(elements[2])); + assertFalse(priorityQueue.add(elements[3])); + assertFalse(priorityQueue.add(elements[4])); + } + assertFalse(priorityQueue.isEmpty()); + assertThat( + priorityQueue.getSubsetForKeyGroup(1), containsInAnyOrder(elementA42, elementA44)); + assertThat(priorityQueue.getSubsetForKeyGroup(8), containsInAnyOrder(elementB1, elementB3)); + + assertThat(priorityQueue.peek(), equalTo(elementB1)); + assertThat(priorityQueue.poll(), equalTo(elementB1)); + assertThat(priorityQueue.peek(), equalTo(elementB3)); + + List<TestType> actualList = new ArrayList<>(); + priorityQueue.iterator().forEachRemaining(actualList::add); + + assertThat(actualList, containsInAnyOrder(elementB3, elementA42, elementA44)); + + assertEquals(3, priorityQueue.size()); + + assertFalse(priorityQueue.remove(elementB1)); + assertTrue(priorityQueue.remove(elementB3)); + assertThat(priorityQueue.peek(), equalTo(elementA42)); + } + + @Test public void testGetKeys() throws Exception { final int namespace1ElementsNum = 1000; final int namespace2ElementsNum = 1000; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java index 4892491..b024f76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java @@ -94,6 +94,11 @@ public class TestType return 31 * key.hashCode() + value; } + @Override + public String toString() { + return String.format("TestType(key='%s', value=%d)", key, value); + } + /** A serializer that read / writes {@link TestType} in schema version 1. */ public static class V1TestTypeSerializer extends TestTypeSerializerBase { private static final long serialVersionUID = 5053346160938769779L;
