[ https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372934#comment-16372934 ]
ASF GitHub Bot commented on FLINK-8679: --------------------------------------- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169991080 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -161,107 +160,131 @@ protected void writeKeyWithGroupAndNamespace( Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context."); keySerializationStream.reset(); - writeKeyGroup(keyGroup, keySerializationDataOutputView); - writeKey(key, keySerializationStream, keySerializationDataOutputView); - writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView); + AbstractRocksDBUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView); + AbstractRocksDBUtils.writeKey(key, backend.getKeySerializer(), keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); + AbstractRocksDBUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); } - private void writeKeyGroup( - int keyGroup, - DataOutputView keySerializationDateDataOutputView) throws IOException { - for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) { - keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3)); - } + protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { + int keyGroup = AbstractRocksDBUtils.readKeyGroup(backend.getKeyGroupPrefixBytes(), inputView); + K key = AbstractRocksDBUtils.readKey(backend.getKeySerializer(), inputStream, inputView, ambiguousKeyPossible); + N namespace = AbstractRocksDBUtils.readNamespace(namespaceSerializer, inputStream, inputView, ambiguousKeyPossible); + + return new Tuple3<>(keyGroup, key, namespace); } - private void 
writeKey( - K key, - ByteArrayOutputStreamWithPos keySerializationStream, - DataOutputView keySerializationDataOutputView) throws IOException { - //write key - int beforeWrite = keySerializationStream.getPosition(); - backend.getKeySerializer().serialize(key, keySerializationDataOutputView); - - if (ambiguousKeyPossible) { - //write size of key - writeLengthFrom(beforeWrite, keySerializationStream, - keySerializationDataOutputView); + /** + * Utils for RocksDB state serialization and deserialization. + */ + static class AbstractRocksDBUtils { --- End diff -- Addressing > RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with > namespace > ------------------------------------------------------------------------------------ > > Key: FLINK-8679 > URL: https://issues.apache.org/jira/browse/FLINK-8679 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.5.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Blocker > Fix For: 1.5.0 > > > Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it > doesn't use the namespace to filter data, whereas > `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they > should at least be consistent. -- This message was sent by Atlassian JIRA (v7.6.3#76005)