Repository: flink Updated Branches: refs/heads/master e2cc83176 -> 14518067c
[FLINK-9336] [state] Set the userKeyOffset in every MapEntry. Previously the userKeyOffset in RocksDB was set when serializing the key and namespace. This was prone to failure as different code paths followed by queryable state were not setting it appropriately. This closes #5993. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14518067 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14518067 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14518067 Branch: refs/heads/master Commit: 14518067cc22b928ba8e230413d0a74237a3ceec Parents: e2cc831 Author: sihuazhou <[email protected]> Authored: Fri May 11 17:15:07 2018 +0200 Committer: kkloudas <[email protected]> Committed: Mon May 14 14:30:31 2018 +0200 ---------------------------------------------------------------------- .../runtime/state/StateBackendTestBase.java | 84 +++++++++++--------- .../streaming/state/RocksDBMapState.java | 14 ++-- 2 files changed, 54 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/14518067/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 784b628..b809d84 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -2591,45 +2591,53 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @SuppressWarnings("unchecked,rawtypes") public void testMapState() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); - 
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + AbstractKeyedStateBackend<String> backend = createKeyedBackend(StringSerializer.INSTANCE); MapStateDescriptor<Integer, String> kvId = new MapStateDescriptor<>("id", Integer.class, String.class); - TypeSerializer<Integer> keySerializer = IntSerializer.INSTANCE; + TypeSerializer<String> keySerializer = StringSerializer.INSTANCE; TypeSerializer<VoidNamespace> namespaceSerializer = VoidNamespaceSerializer.INSTANCE; MapState<Integer, String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - InternalKvState<Integer, VoidNamespace, Map<Integer, String>> kvState = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) state; + InternalKvState<String, VoidNamespace, Map<Integer, String>> kvState = (InternalKvState<String, VoidNamespace, Map<Integer, String>>) state; // these are only available after the backend initialized the serializer TypeSerializer<Integer> userKeySerializer = kvId.getKeySerializer(); TypeSerializer<String> userValueSerializer = kvId.getValueSerializer(); // some modifications to the state - backend.setCurrentKey(1); + backend.setCurrentKey("1"); assertNull(state.get(1)); - assertNull(getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + assertNull(getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); state.put(1, "1"); - backend.setCurrentKey(2); + backend.setCurrentKey("2"); assertNull(state.get(2)); - assertNull(getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + assertNull(getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); state.put(2, "2"); - 
backend.setCurrentKey(1); + + // put entry with different userKeyOffset + backend.setCurrentKey("11"); + state.put(11, "11"); + + backend.setCurrentKey("1"); assertTrue(state.contains(1)); assertEquals("1", state.get(1)); assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }}, - getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + + assertEquals(new HashMap<Integer, String>() {{ put (11, "11"); }}, + getSerializedMap(kvState, "11", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); // draw a snapshot KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); // make some more modifications - backend.setCurrentKey(1); + backend.setCurrentKey("1"); state.put(1, "101"); - backend.setCurrentKey(2); + backend.setCurrentKey("2"); state.put(102, "102"); - backend.setCurrentKey(3); + backend.setCurrentKey("3"); state.put(103, "103"); state.putAll(new HashMap<Integer, String>() {{ put(1031, "1031"); put(1032, "1032"); }}); @@ -2637,19 +2645,19 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); // validate the original state - backend.setCurrentKey(1); + backend.setCurrentKey("1"); assertEquals("101", state.get(1)); assertEquals(new HashMap<Integer, String>() {{ put(1, "101"); }}, - getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(2); + getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, 
userKeySerializer, userValueSerializer)); + backend.setCurrentKey("2"); assertEquals("102", state.get(102)); assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put(102, "102"); }}, - getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(3); + getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("3"); assertTrue(state.contains(103)); assertEquals("103", state.get(103)); assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }}, - getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + getSerializedMap(kvState, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); List<Integer> keys = new ArrayList<>(); for (Integer key : state.keys()) { @@ -2670,11 +2678,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertTrue(values.isEmpty()); // make some more modifications - backend.setCurrentKey(1); + backend.setCurrentKey("1"); state.clear(); - backend.setCurrentKey(2); + backend.setCurrentKey("2"); state.remove(102); - backend.setCurrentKey(3); + backend.setCurrentKey("3"); final String updateSuffix = "_updated"; Iterator<Map.Entry<Integer, String>> iterator = state.iterator(); while (iterator.hasNext()) { @@ -2687,10 +2695,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } // validate the state - backend.setCurrentKey(1); - backend.setCurrentKey(2); + backend.setCurrentKey("1"); + backend.setCurrentKey("2"); assertFalse(state.contains(102)); - backend.setCurrentKey(3); + backend.setCurrentKey("3"); for (Map.Entry<Integer, String> entry : state.entries()) { assertEquals(4 + updateSuffix.length(), 
entry.getValue().length()); assertTrue(entry.getValue().endsWith(updateSuffix)); @@ -2698,44 +2706,44 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten backend.dispose(); // restore the first snapshot and validate it - backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); + backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot1); snapshot1.discardState(); MapState<Integer, String> restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - InternalKvState<Integer, VoidNamespace, Map<Integer, String>> restoredKvState1 = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) restored1; + InternalKvState<String, VoidNamespace, Map<Integer, String>> restoredKvState1 = (InternalKvState<String, VoidNamespace, Map<Integer, String>>) restored1; - backend.setCurrentKey(1); + backend.setCurrentKey("1"); assertEquals("1", restored1.get(1)); assertEquals(new HashMap<Integer, String>() {{ put (1, "1"); }}, - getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(2); + getSerializedMap(restoredKvState1, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("2"); assertEquals("2", restored1.get(2)); assertEquals(new HashMap<Integer, String>() {{ put (2, "2"); }}, - getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + getSerializedMap(restoredKvState1, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); backend.dispose(); // restore the second snapshot and validate it - backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2); + backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot2); 
snapshot2.discardState(); @SuppressWarnings("unchecked") MapState<Integer, String> restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - InternalKvState<Integer, VoidNamespace, Map<Integer, String>> restoredKvState2 = (InternalKvState<Integer, VoidNamespace, Map<Integer, String>>) restored2; + InternalKvState<String, VoidNamespace, Map<Integer, String>> restoredKvState2 = (InternalKvState<String, VoidNamespace, Map<Integer, String>>) restored2; - backend.setCurrentKey(1); + backend.setCurrentKey("1"); assertEquals("101", restored2.get(1)); assertEquals(new HashMap<Integer, String>() {{ put (1, "101"); }}, - getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(2); + getSerializedMap(restoredKvState2, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("2"); assertEquals("102", restored2.get(102)); assertEquals(new HashMap<Integer, String>() {{ put(2, "2"); put (102, "102"); }}, - getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(3); + getSerializedMap(restoredKvState2, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("3"); assertEquals("103", restored2.get(103)); assertEquals(new HashMap<Integer, String>() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }}, - getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + getSerializedMap(restoredKvState2, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); backend.dispose(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/14518067/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 56a7cc4..219f3ae 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -67,9 +67,6 @@ public class RocksDBMapState<K, N, UK, UV> private final TypeSerializer<UK> userKeySerializer; private final TypeSerializer<UV> userValueSerializer; - /** The offset of User Key offset in raw key bytes. */ - private int userKeyOffset; - /** * Creates a new {@code RocksDBMapState}. 
* @@ -305,7 +302,6 @@ public class RocksDBMapState<K, N, UK, UV> private byte[] serializeCurrentKeyAndNamespace() throws IOException { writeCurrentKeyWithGroupAndNamespace(); - userKeyOffset = keySerializationStream.getPosition(); return keySerializationStream.toByteArray(); } @@ -338,7 +334,7 @@ public class RocksDBMapState<K, N, UK, UV> return keySerializationStream.toByteArray(); } - private UK deserializeUserKey(byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException { + private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer<UK> keySerializer) throws IOException { ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes); DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); @@ -380,18 +376,23 @@ public class RocksDBMapState<K, N, UK, UV> private UV userValue; + /** The offset at which the user key starts within the raw key bytes. */ + private final int userKeyOffset; + private TypeSerializer<UK> keySerializer; private TypeSerializer<UV> valueSerializer; RocksDBMapEntry( @Nonnull final RocksDB db, + @Nonnull final int userKeyOffset, @Nonnull final byte[] rawKeyBytes, @Nonnull final byte[] rawValueBytes, @Nonnull final TypeSerializer<UK> keySerializer, @Nonnull final TypeSerializer<UV> valueSerializer) { this.db = db; + this.userKeyOffset = userKeyOffset; this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; @@ -415,7 +416,7 @@ public class RocksDBMapState<K, N, UK, UV> public UK getKey() { if (userKey == null) { try { - userKey = deserializeUserKey(rawKeyBytes, keySerializer); + userKey = deserializeUserKey(userKeyOffset, rawKeyBytes, keySerializer); } catch (IOException e) { throw new RuntimeException("Error while deserializing the user key.", e); } @@ -583,6 +584,7 @@ public class RocksDBMapState<K, N, UK, UV> RocksDBMapEntry entry = new RocksDBMapEntry( db, + keyPrefixBytes.length, iterator.key(), iterator.value(), keySerializer,
