This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33caa00e8df88565f022d4258148d09c90d9452b
Author: Yun Tang <myas...@live.com>
AuthorDate: Mon Jun 22 16:27:02 2020 +0800

    [hot-fix][rocksdb] Ensure RocksDBKeyedStateBackend disposed at RocksDBStateMisuseOptionTest
    
    This closes #12736.
---
 .../state/RocksDBStateMisuseOptionTest.java        | 86 ++++++++++++----------
 1 file changed, 48 insertions(+), 38 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
index 59a4822..20e2906 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateMisuseOptionTest.java
@@ -71,26 +71,31 @@ public class RocksDBStateMisuseOptionTest {
        @Test
        public void testMisuseOptimizePointLookupWithMapState() throws 
Exception {
                RocksDBStateBackend rocksDBStateBackend = 
createStateBackendWithOptimizePointLookup();
-               RocksDBKeyedStateBackend<Integer> keyedStateBackend = 
createKeyedStateBackend(rocksDBStateBackend, new 
MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-               MapStateDescriptor<Integer, Long> stateDescriptor = new 
MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
-               MapState<Integer, Long> mapState = 
keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, stateDescriptor);
-
-               keyedStateBackend.setCurrentKey(1);
-               Map<Integer, Long> expectedResult = new HashMap<>();
-               for (int i = 0; i < 100; i++) {
-                       long uv = ThreadLocalRandom.current().nextLong();
-                       mapState.put(i, uv);
-                       expectedResult.put(i, uv);
-               }
+               RocksDBKeyedStateBackend<Integer> keyedStateBackend =
+                       createKeyedStateBackend(rocksDBStateBackend, new 
MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+               try {
+                       MapStateDescriptor<Integer, Long> stateDescriptor = new 
MapStateDescriptor<>("map", IntSerializer.INSTANCE, LongSerializer.INSTANCE);
+                       MapState<Integer, Long> mapState = 
keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, stateDescriptor);
+
+                       keyedStateBackend.setCurrentKey(1);
+                       Map<Integer, Long> expectedResult = new HashMap<>();
+                       for (int i = 0; i < 100; i++) {
+                               long uv = 
ThreadLocalRandom.current().nextLong();
+                               mapState.put(i, uv);
+                               expectedResult.put(i, uv);
+                       }
 
-               Iterator<Map.Entry<Integer, Long>> iterator = 
mapState.entries().iterator();
-               while (iterator.hasNext()) {
-                       Map.Entry<Integer, Long> entry = iterator.next();
-                       assertEquals(entry.getValue(), 
expectedResult.remove(entry.getKey()));
-                       iterator.remove();
+                       Iterator<Map.Entry<Integer, Long>> iterator = 
mapState.entries().iterator();
+                       while (iterator.hasNext()) {
+                               Map.Entry<Integer, Long> entry = 
iterator.next();
+                               assertEquals(entry.getValue(), 
expectedResult.remove(entry.getKey()));
+                               iterator.remove();
+                       }
+                       assertTrue(expectedResult.isEmpty());
+                       assertTrue(mapState.isEmpty());
+               } finally {
+                       keyedStateBackend.dispose();
                }
-               assertTrue(expectedResult.isEmpty());
-               assertTrue(mapState.isEmpty());
        }
 
        /**
@@ -101,27 +106,32 @@ public class RocksDBStateMisuseOptionTest {
        @Test
        public void testMisuseOptimizePointLookupWithPriorityQueue() throws 
IOException {
                RocksDBStateBackend rocksDBStateBackend = 
createStateBackendWithOptimizePointLookup();
-               RocksDBKeyedStateBackend<Integer> keyedStateBackend = 
createKeyedStateBackend(rocksDBStateBackend, new 
MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
-               KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, 
VoidNamespace>> priorityQueue =
-                       keyedStateBackend.create("timer", new 
TimerSerializer<>(keyedStateBackend.getKeySerializer(), 
VoidNamespaceSerializer.INSTANCE));
-
-               PriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> 
expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) 
(o1.getTimestamp() - o2.getTimestamp()));
-               // ensure we insert timers more than cache capacity.
-               int queueSize = 
RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
-               List<Integer> timeStamps = IntStream.range(0, 
queueSize).boxed().collect(Collectors.toList());
-               Collections.shuffle(timeStamps);
-               for (Integer timeStamp : timeStamps) {
-                       TimerHeapInternalTimer<Integer, VoidNamespace> timer = 
new TimerHeapInternalTimer<>(timeStamp, timeStamp, VoidNamespace.INSTANCE);
-                       priorityQueue.add(timer);
-                       expectedPriorityQueue.add(timer);
-               }
-               assertEquals(queueSize, priorityQueue.size());
-               TimerHeapInternalTimer<Integer, VoidNamespace> timer;
-               while ((timer = priorityQueue.poll()) != null) {
-                       assertEquals(expectedPriorityQueue.poll(), timer);
+               RocksDBKeyedStateBackend<Integer> keyedStateBackend =
+                       createKeyedStateBackend(rocksDBStateBackend, new 
MockEnvironmentBuilder().build(), IntSerializer.INSTANCE);
+               try {
+                       
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<Integer, VoidNamespace>> 
priorityQueue =
+                               keyedStateBackend.create("timer", new 
TimerSerializer<>(keyedStateBackend.getKeySerializer(), 
VoidNamespaceSerializer.INSTANCE));
+
+                       PriorityQueue<TimerHeapInternalTimer<Integer, 
VoidNamespace>> expectedPriorityQueue = new PriorityQueue<>((o1, o2) -> (int) 
(o1.getTimestamp() - o2.getTimestamp()));
+                       // ensure we insert timers more than cache capacity.
+                       int queueSize = 
RocksDBPriorityQueueSetFactory.DEFAULT_CACHES_SIZE + 42;
+                       List<Integer> timeStamps = IntStream.range(0, 
queueSize).boxed().collect(Collectors.toList());
+                       Collections.shuffle(timeStamps);
+                       for (Integer timeStamp : timeStamps) {
+                               TimerHeapInternalTimer<Integer, VoidNamespace> 
timer = new TimerHeapInternalTimer<>(timeStamp, timeStamp, 
VoidNamespace.INSTANCE);
+                               priorityQueue.add(timer);
+                               expectedPriorityQueue.add(timer);
+                       }
+                       assertEquals(queueSize, priorityQueue.size());
+                       TimerHeapInternalTimer<Integer, VoidNamespace> timer;
+                       while ((timer = priorityQueue.poll()) != null) {
+                               assertEquals(expectedPriorityQueue.poll(), 
timer);
+                       }
+                       assertTrue(expectedPriorityQueue.isEmpty());
+                       assertTrue(priorityQueue.isEmpty());
+               } finally {
+                       keyedStateBackend.dispose();
                }
-               assertTrue(expectedPriorityQueue.isEmpty());
-               assertTrue(priorityQueue.isEmpty());
 
        }
 

Reply via email to