This is an automated email from the ASF dual-hosted git repository. liyu pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33caa00e8df88565f022d4258148d09c90d9452b
Author: Yun Tang <myas...@live.com>
AuthorDate: Mon Jun 22 16:27:02 2020 +0800

    [hot-fix][rocksdb] Ensure RocksDBKeyedStateBackend disposed at RocksDBStateMisuseOptionTest

    This closes #12736.
---
 .../state/RocksDBStateMisuseOptionTest.java | 86 ++++++++++++----------
 1 file changed, 48 insertions(+), 38 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
index 59a4822..20e2906 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
@@ -71,26 +71,31 @@ public class RocksDBStateMisuseOptionTest {
 	@Test
 	public void testMisuseOptimizePointLookupWithMapState() throws Exception {
 		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
-		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-		MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
-		MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
-
-		keyedStateBackend.setCurrentKey(1);
-		Map<Integer, Long> expectedResult = new HashMap<>();
-		for (int i = 0; i < 100; i++) {
-			long uv = ThreadLocalRandom.current().nextLong();
-			mapState.put(i, uv);
-			expectedResult.put(i, uv);
-		}
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend =
+			createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		try {
+			MapStateDescriptor<Integer, Long> stateDescriptor = new MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
+			MapState<Integer, Long> mapState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+			keyedStateBackend.setCurrentKey(1);
+			Map<Integer, Long> expectedResult = new HashMap<>();
+			for (int i = 0; i < 100; i++) {
+				long uv = ThreadLocalRandom.current().nextLong();
+				mapState.put(i, uv);
+				expectedResult.put(i, uv);
+			}
 
-		Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
-		while (iterator.hasNext()) {
-			Map.Entry<Integer, Long> entry = iterator.next();
-			assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
-			iterator.remove();
+			Iterator<Map.Entry<Integer, Long>> iterator = mapState.entries().iterator();
+			while (iterator.hasNext()) {
+				Map.Entry<Integer, Long> entry = iterator.next();
+				assertEquals(entry.getValue(), expectedResult.remove(entry.getKey()));
+				iterator.remove();
+			}
+			assertTrue(expectedResult.isEmpty());
+			assertTrue(mapState.isEmpty());
+		} finally {
+			keyedStateBackend.dispose();
 		}
-		assertTrue(expectedResult.isEmpty());
-		assertTrue(mapState.isEmpty());
 	}
 
 	/**
@@ -101,27 +106,32 @@ public class RocksDBStateMisuseOptionTest {
 	@Test
 	public void testMisuseOptimizePointLookupWithPriorityQueue() throws IOException {
 		RocksDBStateBackend rocksDBStateBackend = createStateBackendWithOptimizePointLookup();
-		RocksDBKeyedStateBackend<Integer> keyedStateBackend = createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-		KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
-			keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
-
-		PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
-		// ensure we insert timers more than cache capacity.
-		int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
-		List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
-		Collections.shuffle(timeStamps);
-		for (Integer timeStamp : timeStamps) {
-			TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
-			priorityQueue.add(timer);
-			expectedPriorityQueue.add(timer);
-		}
-		assertEquals(queueSize, priorityQueue.size());
-		TimerHeapInternalTimer<Integer, VoidNamespace> timer;
-		while ((timer = priorityQueue.poll()) != null) {
-			assertEquals(expectedPriorityQueue.poll(), timer);
+		RocksDBKeyedStateBackend<Integer> keyedStateBackend =
+			createKeyedStateBackend(rocksDBStateBackend, new MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+		try {
+			KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> priorityQueue =
+				keyedStateBackend.create("timer", new TimerSerializer<>(keyedStateBackend.getKeySerializer(), VoidNamespaceSerializer.INSTANCE));
+
+			PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) (o1.getTimestamp() - o2.getTimestamp()));
+			// ensure we insert timers more than cache capacity.
+			int queueSize = RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
+			List<Integer> timeStamps = IntStream.range(0, queueSize).boxed().collect(Collectors.toList());
+			Collections.shuffle(timeStamps);
+			for (Integer timeStamp : timeStamps) {
+				TimerHeapInternalTimer<Integer, VoidNamespace> timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
+				priorityQueue.add(timer);
+				expectedPriorityQueue.add(timer);
+			}
+			assertEquals(queueSize, priorityQueue.size());
+			TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+			while ((timer = priorityQueue.poll()) != null) {
+				assertEquals(expectedPriorityQueue.poll(), timer);
+			}
+			assertTrue(expectedPriorityQueue.isEmpty());
+			assertTrue(priorityQueue.isEmpty());
+		} finally {
+			keyedStateBackend.dispose();
 		}
-		assertTrue(expectedPriorityQueue.isEmpty());
-		assertTrue(priorityQueue.isEmpty());
 	}