[hotfix] [runtime] Various code cleanups and reductions of warnings in heap state restoring code
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8a784e9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8a784e9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8a784e9 Branch: refs/heads/master Commit: b8a784e93811a71f525070cee8ff32230fee8fee Parents: 3b97128 Author: Stephan Ewen <se...@apache.org> Authored: Fri Jan 20 14:41:35 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Sun Jan 22 21:53:39 2017 +0100 ---------------------------------------------------------------------- .../state/heap/HeapKeyedStateBackend.java | 7 ++- .../flink/runtime/state/heap/StateTable.java | 60 +++++++++++++------- 2 files changed, 44 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b8a784e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 89d4f76..b05b874 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -239,6 +239,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + @SuppressWarnings("deprecation") @Override public void restore(Collection<KeyGroupsStateHandle> restoredState) throws Exception { LOG.info("Initializing heap keyed state backend from snapshot."); @@ -388,6 +389,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return "HeapKeyedStateBackend"; } + @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) @Deprecated private void restoreOldSavepointKeyedState( Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException { @@ -447,13 +449,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateSerializer); StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange); - stateTable.getState().set(0, rawResultMap); + stateTable.getState()[0] = rawResultMap; // add named state to the backend stateTables.put(registeredBackendStateMetaInfo.getName(), stateTable); } } + @SuppressWarnings("deprecation") private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException { return new RestoredState( stateSnapshot.deserialize(), @@ -461,6 +464,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateSnapshot.getStateSerializer()); } + @SuppressWarnings({"rawtypes", "unchecked", "deprecation"}) private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException { FileSystem fs = stateSnapshot.getFilePath().getFileSystem(); //TODO register closeable to support fast cancelation? @@ -492,6 +496,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + @SuppressWarnings("rawtypes") static final class RestoredState { private final Map rawResultMap; http://git-wip-us.apache.org/repos/asf/flink/blob/b8a784e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java index 9d7232e..21265f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java @@ -22,44 +22,64 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; import org.apache.flink.runtime.state.KeyGroupRange; -import java.util.Arrays; -import java.util.List; import java.util.Map; public class StateTable<K, N, ST> { - /** Combined meta information such as name and serializers for this state */ - protected RegisteredBackendStateMetaInfo<N, ST> metaInfo; - /** Map for holding the actual state objects. */ - private final List<Map<N, Map<K, ST>>> state; + private final Map<N, Map<K, ST>>[] state; - protected final KeyGroupRange keyGroupRange; + /** The offset to the contiguous key groups */ + private final int keyGroupOffset; - public StateTable( - RegisteredBackendStateMetaInfo<N, ST> metaInfo, - KeyGroupRange keyGroupRange) { + /** Combined meta information such as name and serializers for this state */ + private RegisteredBackendStateMetaInfo<N, ST> metaInfo; + + // ------------------------------------------------------------------------ + public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, KeyGroupRange keyGroupRange) { this.metaInfo = metaInfo; - this.keyGroupRange = keyGroupRange; + this.keyGroupOffset = keyGroupRange.getStartKeyGroup(); - this.state = Arrays.asList((Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()]); + @SuppressWarnings("unchecked") + Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()]; + this.state = state; } - private int indexToOffset(int index) { - return index - keyGroupRange.getStartKeyGroup(); + // ------------------------------------------------------------------------ + // access to maps + // ------------------------------------------------------------------------ + + public Map<N, Map<K, ST>>[] getState() { + return state; } public Map<N, Map<K, ST>> get(int index) { - return keyGroupRange.contains(index) ? state.get(indexToOffset(index)) : null; + final int pos = indexToOffset(index); + if (pos >= 0 && pos < state.length) { + return state[pos]; + } else { + return null; + } } public void set(int index, Map<N, Map<K, ST>> map) { - if (!keyGroupRange.contains(index)) { - throw new RuntimeException("Unexpected key group index. This indicates a bug."); + try { + state[indexToOffset(index)] = map; + } + catch (ArrayIndexOutOfBoundsException e) { + throw new IllegalArgumentException("Key group index out of range of key group range [" + + keyGroupOffset + ", " + (keyGroupOffset + state.length) + ")."); } - state.set(indexToOffset(index), map); } + private int indexToOffset(int index) { + return index - keyGroupOffset; + } + + // ------------------------------------------------------------------------ + // metadata + // ------------------------------------------------------------------------ + public TypeSerializer<ST> getStateSerializer() { return metaInfo.getStateSerializer(); } @@ -76,10 +96,6 @@ public class StateTable<K, N, ST> { this.metaInfo = metaInfo; } - public List<Map<N, Map<K, ST>>> getState() { - return state; - } - // ------------------------------------------------------------------------ // for testing // ------------------------------------------------------------------------