[hotfix] [runtime] Various code cleanups and reductions of warnings in heap state restoring code


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8a784e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8a784e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8a784e9

Branch: refs/heads/master
Commit: b8a784e93811a71f525070cee8ff32230fee8fee
Parents: 3b97128
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 20 14:41:35 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 22 21:53:39 2017 +0100

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       |  7 ++-
 .../flink/runtime/state/heap/StateTable.java    | 60 +++++++++++++-------
 2 files changed, 44 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8a784e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 89d4f76..b05b874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -239,6 +239,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
+       @SuppressWarnings("deprecation")
        @Override
        public void restore(Collection<KeyGroupsStateHandle> restoredState) 
throws Exception {
                LOG.info("Initializing heap keyed state backend from 
snapshot.");
@@ -388,6 +389,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                return "HeapKeyedStateBackend";
        }
 
+       @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
        @Deprecated
        private void restoreOldSavepointKeyedState(
                        Collection<KeyGroupsStateHandle> stateHandles) throws 
IOException, ClassNotFoundException {
@@ -447,13 +449,14 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                        stateSerializer);
 
                        StateTable<K, ?, ?> stateTable = new 
StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
-                       stateTable.getState().set(0, rawResultMap);
+                       stateTable.getState()[0] = rawResultMap;
 
                        // add named state to the backend
                        
stateTables.put(registeredBackendStateMetaInfo.getName(), stateTable);
                }
        }
 
+       @SuppressWarnings("deprecation")
        private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, 
?, ?, ?> stateSnapshot) throws IOException {
                return new RestoredState(
                                stateSnapshot.deserialize(),
@@ -461,6 +464,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                stateSnapshot.getStateSerializer());
        }
 
+       @SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
        private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, 
?, ?> stateSnapshot) throws IOException {
                FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
                //TODO register closeable to support fast cancelation?
@@ -492,6 +496,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
+       @SuppressWarnings("rawtypes")
        static final class RestoredState {
 
                private final Map rawResultMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/b8a784e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 9d7232e..21265f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -22,44 +22,64 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.KeyGroupRange;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 
 public class StateTable<K, N, ST> {
 
-       /** Combined meta information such as name and serializers for this 
state */
-       protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;
-
        /** Map for holding the actual state objects. */
-       private final List<Map<N, Map<K, ST>>> state;
+       private final Map<N, Map<K, ST>>[] state;
 
-       protected final KeyGroupRange keyGroupRange;
+       /** The offset to the contiguous key groups */
+       private final int keyGroupOffset;
 
-       public StateTable(
-                       RegisteredBackendStateMetaInfo<N, ST> metaInfo,
-                       KeyGroupRange keyGroupRange) {
+       /** Combined meta information such as name and serializers for this 
state */
+       private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
+
+       // 
------------------------------------------------------------------------
+       public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, 
KeyGroupRange keyGroupRange) {
                this.metaInfo = metaInfo;
-               this.keyGroupRange = keyGroupRange;
+               this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
 
-               this.state = Arrays.asList((Map<N, Map<K, ST>>[]) new 
Map[keyGroupRange.getNumberOfKeyGroups()]);
+               @SuppressWarnings("unchecked")
+               Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new 
Map[keyGroupRange.getNumberOfKeyGroups()];
+               this.state = state;
        }
 
-       private int indexToOffset(int index) {
-               return index - keyGroupRange.getStartKeyGroup();
+       // 
------------------------------------------------------------------------
+       //  access to maps
+       // 
------------------------------------------------------------------------
+
+       public Map<N, Map<K, ST>>[] getState() {
+               return state;
        }
 
        public Map<N, Map<K, ST>> get(int index) {
-               return keyGroupRange.contains(index) ? 
state.get(indexToOffset(index)) : null;
+               final int pos = indexToOffset(index);
+               if (pos >= 0 && pos < state.length) {
+                       return state[pos];
+               } else {
+                       return null;
+               }
        }
 
        public void set(int index, Map<N, Map<K, ST>> map) {
-               if (!keyGroupRange.contains(index)) {
-                       throw new RuntimeException("Unexpected key group index. 
This indicates a bug.");
+               try {
+                       state[indexToOffset(index)] = map;
+               }
+               catch (ArrayIndexOutOfBoundsException e) {
+                       throw new IllegalArgumentException("Key group index out 
of range of key group range [" +
+                                       keyGroupOffset + ", " + (keyGroupOffset 
+ state.length) + ").");
                }
-               state.set(indexToOffset(index), map);
        }
 
+       private int indexToOffset(int index) {
+               return index - keyGroupOffset;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  metadata
+       // 
------------------------------------------------------------------------
+       
        public TypeSerializer<ST> getStateSerializer() {
                return metaInfo.getStateSerializer();
        }
@@ -76,10 +96,6 @@ public class StateTable<K, N, ST> {
                this.metaInfo = metaInfo;
        }
 
-       public List<Map<N, Map<K, ST>>> getState() {
-               return state;
-       }
-
        // 
------------------------------------------------------------------------
        //  for testing
        // 
------------------------------------------------------------------------

Reply via email to