Repository: flink Updated Branches: refs/heads/master a2e6fb06c -> f37507d94
[FLINK-5917] [statebackends] Remove size() method from MapState This closes #3462 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f37507d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f37507d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f37507d9 Branch: refs/heads/master Commit: f37507d9434d5ae16f7d164686af2aa5438995bf Parents: a2e6fb0 Author: xiaogang.sxg <[email protected]> Authored: Fri Mar 3 10:27:11 2017 +0800 Committer: Stefan Richter <[email protected]> Committed: Fri Mar 3 14:14:29 2017 +0100 ---------------------------------------------------------------------- .../streaming/state/RocksDBMapState.java | 13 ------------- .../apache/flink/api/common/state/MapState.java | 7 ------- .../flink/runtime/state/UserFacingMapState.java | 5 ----- .../flink/runtime/state/heap/HeapMapState.java | 20 -------------------- .../runtime/state/StateBackendTestBase.java | 9 ++------- 5 files changed, 2 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f37507d9/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index e9e9d9b..5125240 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -137,19 +137,6 @@ public class RocksDBMapState<K, N, UK, UV> return (rawValueBytes != null); } - - @Override - public int size() throws IOException, RocksDBException { - Iterator<Map.Entry<UK, UV>> iterator = iterator(); - - int count = 0; - while (iterator.hasNext()) { - count++; - iterator.next(); - } - - return count; - } @Override public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException { http://git-wip-us.apache.org/repos/asf/flink/blob/f37507d9/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java index fa657ef..0660f68 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java @@ -90,13 +90,6 @@ public interface MapState<UK, UV> extends State { boolean contains(UK key) throws Exception; /** - * @return The number of mappings in the state. - * - * @throws Exception Thrown if the system cannot access the state. - */ - int size() throws Exception; - - /** * Returns all the mappings in the state * * @return An iterable view of all the key-value pairs in the state. http://git-wip-us.apache.org/repos/asf/flink/blob/f37507d9/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java index 6cddf6d..ce4d032 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java @@ -73,11 +73,6 @@ class UserFacingMapState<K, V> implements MapState<K, V> { } @Override - public int size() throws Exception { - return originalState.size(); - } - - @Override public Iterable<Map.Entry<K, V>> entries() throws Exception { Iterable<Map.Entry<K, V>> original = originalState.entries(); return original != null ? original : emptyState.entrySet(); http://git-wip-us.apache.org/repos/asf/flink/blob/f37507d9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java index b28d661..0360161 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java @@ -183,26 +183,6 @@ public class HeapMapState<K, N, UK, UV> } @Override - public int size() { - Preconditions.checkState(currentNamespace != null, "No namespace set."); - Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); - - Map<N, Map<K, HashMap<UK, UV>>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex()); - if (namespaceMap == null) { - return 0; - } - - Map<K, HashMap<UK, UV>> keyedMap = namespaceMap.get(currentNamespace); - if (keyedMap == null) { - return 0; - } - - HashMap<UK, UV> userMap = keyedMap.get(backend.<K>getCurrentKey()); - - return userMap == null ? 0 : userMap.size(); - } - - @Override public Iterable<Map.Entry<UK, UV>> entries() { Preconditions.checkState(currentNamespace != null, "No namespace set."); Preconditions.checkState(backend.getCurrentKey() != null, "No key set."); http://git-wip-us.apache.org/repos/asf/flink/blob/f37507d9/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index f2416b9..40ac72c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -813,17 +813,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten // some modifications to the state backend.setCurrentKey(1); - assertEquals(0, state.size()); assertEquals(null, state.get(1)); assertEquals(null, getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); state.put(1, "1"); backend.setCurrentKey(2); - assertEquals(0, state.size()); assertEquals(null, state.get(2)); assertEquals(null, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); state.put(2, "2"); backend.setCurrentKey(1); - assertEquals(1, state.size()); assertTrue(state.contains(1)); assertEquals("1", state.get(1)); assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }}, @@ -854,7 +851,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }}, getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); backend.setCurrentKey(3); - assertEquals(3, state.size()); assertTrue(state.contains(103)); assertEquals("103", state.get(103)); assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }}, @@ -897,7 +893,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten // validate the state backend.setCurrentKey(1); - assertEquals(0, state.size()); backend.setCurrentKey(2); assertFalse(state.contains(102)); backend.setCurrentKey(3); @@ -1106,8 +1101,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.put("Ciao", "Hello"); state.put("Bello", "Nice"); - - assertEquals(state.size(), 2); + + assertNotNull(state.entries()); assertEquals(state.get("Ciao"), "Hello"); assertEquals(state.get("Bello"), "Nice");
