tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334035510
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##########
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
                assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
                stateMap.close();
-               assertEquals(0, stateMap.size());
-               assertEquals(0, stateMap.totalSize());
-               assertTrue(stateMap.isClosed());
        }
 
        /**
-        * Test basic operations.
+        * Test state put operation.
         */
        @Test
-       public void testBasicOperations() throws Exception {
-               TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-               TypeSerializer<Long> namespaceSerializer = 
LongSerializer.INSTANCE;
-               TypeSerializer<String> stateSerializer = 
StringSerializer.INSTANCE;
-               CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = 
new CopyOnWriteSkipListStateMap<>(
-                       keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+       public void testPutState() {
+               testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+       }
 
-               ThreadLocalRandom random = ThreadLocalRandom.current();
-               // map to store expected states, namespace -> key -> state
-               Map<Long, Map<Integer, String>> referenceStates = new 
HashMap<>();
-               int totalSize = 0;
+       /**
+        * Test remove existing state.
+        */
+       @Test
+       public void testRemoveExistingState() {
+               testRemoveState(false, false);
+       }
 
-               // put some states
-               for (long namespace = 0; namespace < 10; namespace++) {
-                       for (int key = 0; key < 100; key++) {
-                               totalSize++;
-                               String state = String.valueOf(key * namespace);
-                               if (random.nextBoolean()) {
-                                       stateMap.put(key, namespace, state);
-                               } else {
-                                       assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+       /**
+        * Test remove and get existing state.
+        */
+       @Test
+       public void testRemoveAndGetExistingState() {
+               testRemoveState(false, true);
+       }
+
+       /**
+        * Test remove absent state.
+        */
+       @Test
+       public void testRemoveAbsentState() {
+               testRemoveState(true, true);
+       }
+
+       /**
+        * Test remove previously removed state.
+        */
+       @Test
+       public void testPutPreviouslyRemovedState() {
+               testWithFunction(
+                       (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+                               (removedCnt, removedStates) -> {
+                                       int size = totalSize - removedCnt;
+                                       for (Map.Entry<Long, Set<Integer>> 
entry : removedStates.entrySet()) {
+                                               long namespace = entry.getKey();
+                                               for (int key : 
entry.getValue()) {
+                                                       size++;
+                                                       String state = 
String.valueOf(key * namespace);
+                                                       
assertNull(stateMap.putAndGetOld(key, namespace, state));
+                                                       
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+                                               }
+                                       }
+                                       return getDefaultSizes(size);
                                }
-                               referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-                               assertEquals(totalSize, stateMap.size());
-                               assertEquals(totalSize, stateMap.totalSize());
-                       }
-               }
+                       )
+               );
+       }
 
-               // validates space allocation. Each pair need 2 spaces
-               assertEquals(totalSize * 2, 
spaceAllocator.getTotalSpaceNumber());
-               verifyState(referenceStates, stateMap);
+       private void testRemoveState(boolean removeAbsent, boolean getOld) {
+               testWithFunction(
+                       (totalSize, stateMap, referenceStates) -> {
+                               if (removeAbsent) {
+                                       totalSize -= 
removeAbsentState(stateMap, referenceStates);
+                               } else {
+                                       totalSize -= 
removeExistingState(stateMap, referenceStates, getOld);
+                               }
+                               return getDefaultSizes(totalSize);
+                       });
+       }
 
-               // remove some states
-               Map<Long, Set<Integer>> removedStates = new HashMap<>();
+       private int removeExistingState(
+               CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap,
+               @Nonnull Map<Long, Map<Integer, String>> referenceStates,
+               boolean getOld) {
+               int removedCnt = 0;
                for (Map.Entry<Long, Map<Integer, String>> namespaceEntry : 
referenceStates.entrySet()) {
                        long namespace = namespaceEntry.getKey();
-                       for (Map.Entry<Integer, String> keyEntry : 
namespaceEntry.getValue().entrySet()) {
+                       Map<Integer, String> kvMap = namespaceEntry.getValue();
+                       Iterator<Map.Entry<Integer, String>> kvIterator = 
kvMap.entrySet().iterator();
+                       while (kvIterator.hasNext()) {
+                               Map.Entry<Integer, String> keyEntry = 
kvIterator.next();
                                if (random.nextBoolean()) {
                                        int key = keyEntry.getKey();
                                        String state = keyEntry.getValue();
-                                       
removedStates.computeIfAbsent(namespace, (none) -> new HashSet<>()).add(key);
-                                       totalSize--;
-                                       if (random.nextBoolean()) {
-                                               stateMap.remove(key, namespace);
-                                       } else {
+                                       removedCnt++;
+                                       // remove from state map
+                                       if (getOld) {
                                                assertEquals(state, 
stateMap.removeAndGetOld(key, namespace));
+                                       } else {
+                                               stateMap.remove(key, namespace);
                                        }
-                                       assertEquals(totalSize, 
stateMap.size());
-                                       assertEquals(totalSize, 
stateMap.totalSize());
+                                       // remove from reference to keep in 
accordance
+                                       kvIterator.remove();
                                }
                        }
                }
+               return removedCnt;
+       }
 
-               for (Map.Entry<Long, Set<Integer>> entry : 
removedStates.entrySet()) {
-                       long namespace = entry.getKey();
-                       Map<Integer, String> keyMap = 
referenceStates.get(namespace);
-                       if (keyMap != null) {
-                               entry.getValue().forEach(keyMap::remove);
-                               if (keyMap.isEmpty()) {
-                                       referenceStates.remove(namespace);
+       private int removeAbsentState(
+                       CopyOnWriteSkipListStateMap<Integer, Long, String> 
stateMap,
+                       Map<Long, Map<Integer, String>> referenceStates) {
+               return applyFunctionAfterRemove(
+                       stateMap,
+                       referenceStates,
+                       (removedCnt, removedStates) -> {
+                               // remove the same keys again, which would be 
absent already
+                               for (Map.Entry<Long, Set<Integer>> entry : 
removedStates.entrySet()) {
+                                       long namespace = entry.getKey();
+                                       for (int key : entry.getValue()) {
+                                               
assertNull(stateMap.removeAndGetOld(key, namespace));
+                                       }
                                }
+                               return removedCnt;
                        }
-                       for (int key : entry.getValue()) {
-                               assertNull(stateMap.get(key, namespace));
-                               assertFalse(stateMap.containsKey(key, 
namespace));
-                       }
-               }
-
-               assertEquals(totalSize * 2, 
spaceAllocator.getTotalSpaceNumber());
-               verifyState(referenceStates, stateMap);
+               );
+       }
 
-               // update some states
+       /**
+        * Apply the given function after removing some states.
+        *
+        * @param stateMap the state map to test against.
+        * @param referenceStates the reference of states for correctness 
verfication.
+        * @param function a {@link BiFunction} which takes [removedCnt, 
removedStates] as input parameters.
+        * @param <R> The type of the result returned by the function.
+        * @return The result of applying the given function.
+        */
+       private <R> R applyFunctionAfterRemove(
+               CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap,
+               @Nonnull Map<Long, Map<Integer, String>> referenceStates,
+               BiFunction<Integer, Map<Long, Set<Integer>>, R> function) {
+               int removedCnt = 0;
+               Map<Long, Set<Integer>> removedStates = new HashMap<>();
+               // remove some state
                for (Map.Entry<Long, Map<Integer, String>> namespaceEntry : 
referenceStates.entrySet()) {
                        long namespace = namespaceEntry.getKey();
-                       for (Map.Entry<Integer, String> keyEntry : 
namespaceEntry.getValue().entrySet()) {
+                       Map<Integer, String> kvMap = namespaceEntry.getValue();
+                       Iterator<Map.Entry<Integer, String>> kvIterator = 
kvMap.entrySet().iterator();
+                       while (kvIterator.hasNext()) {
+                               Map.Entry<Integer, String> keyEntry = 
kvIterator.next();
                                if (random.nextBoolean()) {
 
 Review comment:
   If something fails for whatever reason, this `random` behaviour will make it 
super hard to reproduce it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to