tillrohrmann commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334032961
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##########
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
                assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
                stateMap.close();
-               assertEquals(0, stateMap.size());
-               assertEquals(0, stateMap.totalSize());
-               assertTrue(stateMap.isClosed());
        }
 
        /**
-        * Test basic operations.
+        * Test state put operation.
         */
        @Test
-       public void testBasicOperations() throws Exception {
-               TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE;
-               TypeSerializer<Long> namespaceSerializer = 
LongSerializer.INSTANCE;
-               TypeSerializer<String> stateSerializer = 
StringSerializer.INSTANCE;
-               CopyOnWriteSkipListStateMap<Integer, Long, String> stateMap = 
new CopyOnWriteSkipListStateMap<>(
-                       keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+       public void testPutState() {
+               testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+       }
 
-               ThreadLocalRandom random = ThreadLocalRandom.current();
-               // map to store expected states, namespace -> key -> state
-               Map<Long, Map<Integer, String>> referenceStates = new 
HashMap<>();
-               int totalSize = 0;
+       /**
+        * Test remove existing state.
+        */
+       @Test
+       public void testRemoveExistingState() {
+               testRemoveState(false, false);
+       }
 
-               // put some states
-               for (long namespace = 0; namespace < 10; namespace++) {
-                       for (int key = 0; key < 100; key++) {
-                               totalSize++;
-                               String state = String.valueOf(key * namespace);
-                               if (random.nextBoolean()) {
-                                       stateMap.put(key, namespace, state);
-                               } else {
-                                       assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+       /**
+        * Test remove and get existing state.
+        */
+       @Test
+       public void testRemoveAndGetExistingState() {
+               testRemoveState(false, true);
+       }
+
+       /**
+        * Test remove absent state.
+        */
+       @Test
+       public void testRemoveAbsentState() {
+               testRemoveState(true, true);
+       }
+
+       /**
+        * Test remove previously removed state.
+        */
+       @Test
+       public void testPutPreviouslyRemovedState() {
+               testWithFunction(
+                       (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+                               (removedCnt, removedStates) -> {
+                                       int size = totalSize - removedCnt;
+                                       for (Map.Entry<Long, Set<Integer>> 
entry : removedStates.entrySet()) {
+                                               long namespace = entry.getKey();
+                                               for (int key : 
entry.getValue()) {
+                                                       size++;
+                                                       String state = 
String.valueOf(key * namespace);
+                                                       
assertNull(stateMap.putAndGetOld(key, namespace, state));
+                                                       
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+                                               }
+                                       }
+                                       return getDefaultSizes(size);
                                }
-                               referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-                               assertEquals(totalSize, stateMap.size());
-                               assertEquals(totalSize, stateMap.totalSize());
-                       }
-               }
+                       )
+               );
 
 Review comment:
   tbh I'm not sure whether I really understand what's going on here. All the 
different layers of functions which take other functions to do something makes 
it super hard to reason about the test case. Usually for tests I'm a big fan of 
explicitness.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to