StefanRRichter commented on a change in pull request #8611: 
[FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r293556751
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
 ##########
 @@ -198,16 +377,118 @@ public void 
setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) {
 
        // Snapshot / Restore 
-------------------------------------------------------------------------
 
-       public abstract void put(K key, int keyGroup, N namespace, S state);
+       public void put(K key, int keyGroup, N namespace, S state) {
+               checkKeyNamespacePreconditions(key, namespace);
+
+               StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup);
+
+               if (stateMap == null) {
+                       stateMap = createStateMap();
+                       setMapForKeyGroup(keyGroup, stateMap);
+               }
+
+               stateMap.put(key, namespace, state);
+       }
+
+       @Override
+       public Iterator<StateEntry<K, N, S>> iterator() {
+               return Arrays.stream(state)
+                       .filter(Objects::nonNull)
+                       .map(StateMap::iterator)
+                       .flatMap(iter -> 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, 0), false))
+                       .iterator();
+       }
 
        // For testing 
--------------------------------------------------------------------------------
 
        @VisibleForTesting
-       public abstract int sizeOfNamespace(Object namespace);
+       public int sizeOfNamespace(Object namespace) {
+               int count = 0;
+               for (StateMap<K, N, S> stateMap : state) {
+                       if (stateMap != null) {
+                               count += stateMap.sizeOfNamespace(namespace);
+                       }
+               }
+
+               return count;
+       }
 
        @Nonnull
        @Override
        public StateSnapshotKeyGroupReader keyGroupReader(int readVersion) {
                return StateTableByKeyGroupReaders.readerForVersion(this, 
readVersion);
        }
+
+       // StateEntryIterator  
---------------------------------------------------------------------------------------------
+
+       class StateEntryIterator implements StateIncrementalVisitor<K, N, S> {
+
+               final int recommendedMaxNumberOfReturnedRecords;
+
+               int keyGroupIndex;
+
+               StateIncrementalVisitor<K, N, S> stateIncrementalVisitor;
+
+               StateEntryIterator(int recommendedMaxNumberOfReturnedRecords) {
+                       this.recommendedMaxNumberOfReturnedRecords = 
recommendedMaxNumberOfReturnedRecords;
+                       this.keyGroupIndex = 0;
+                       next();
+               }
+
+               private void next() {
+                       while (keyGroupIndex < state.length) {
+                               StateMap<K, N, S> stateMap = 
state[keyGroupIndex++];
+                               if (stateMap != null) {
+                                       StateIncrementalVisitor<K, N, S> 
visitor =
+                                               
stateMap.getStateIncrementalVisitor(recommendedMaxNumberOfReturnedRecords);
+                                       if (visitor.hasNext()) {
+                                               stateIncrementalVisitor = 
visitor;
+                                               return;
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public boolean hasNext() {
+                       while (stateIncrementalVisitor == null || 
!stateIncrementalVisitor.hasNext()) {
+                               while (keyGroupIndex < state.length && 
state[keyGroupIndex] == null) {
+                                       keyGroupIndex++;
+                               }
+                               if (keyGroupIndex == state.length) {
+                                       return false;
+                               }
+                               StateIncrementalVisitor<K, N, S> visitor =
+                                       
state[keyGroupIndex].getStateIncrementalVisitor(recommendedMaxNumberOfReturnedRecords);
+                               if (visitor.hasNext()) {
+                                       stateIncrementalVisitor = visitor;
+                                       keyGroupIndex++;
+                                       break;
+                               }
+                       }
+                       return true;
+               }
+
+               @Override
+               public Collection<StateEntry<K, N, S>> nextEntries() {
+                       if (!hasNext()) {
+                               return null;
+                       }
+
+                       Collection<StateEntry<K, N, S>> collection =
+                               stateIncrementalVisitor.nextEntries();
+
+                       return collection;
 
 Review comment:
   Nit: the local `collection` variable is unnecessary; this could just be `return stateIncrementalVisitor.nextEntries();`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to