[
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372790#comment-16372790
]
ASF GitHub Bot commented on FLINK-8679:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5518#discussion_r169950751
--- Diff:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
---
@@ -161,107 +160,131 @@ protected void writeKeyWithGroupAndNamespace(
         Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
         keySerializationStream.reset();
-        writeKeyGroup(keyGroup, keySerializationDataOutputView);
-        writeKey(key, keySerializationStream, keySerializationDataOutputView);
-        writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView);
+        AbstractRocksDBUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
+        AbstractRocksDBUtils.writeKey(key, backend.getKeySerializer(), keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
+        AbstractRocksDBUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
     }
-    private void writeKeyGroup(
-        int keyGroup,
-        DataOutputView keySerializationDateDataOutputView) throws IOException {
-        for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
-            keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3));
-        }
+    protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException {
+        int keyGroup = AbstractRocksDBUtils.readKeyGroup(backend.getKeyGroupPrefixBytes(), inputView);
+        K key = AbstractRocksDBUtils.readKey(backend.getKeySerializer(), inputStream, inputView, ambiguousKeyPossible);
+        N namespace = AbstractRocksDBUtils.readNamespace(namespaceSerializer, inputStream, inputView, ambiguousKeyPossible);
+
+        return new Tuple3<>(keyGroup, key, namespace);
     }
-    private void writeKey(
-        K key,
-        ByteArrayOutputStreamWithPos keySerializationStream,
-        DataOutputView keySerializationDataOutputView) throws IOException {
-        //write key
-        int beforeWrite = keySerializationStream.getPosition();
-        backend.getKeySerializer().serialize(key, keySerializationDataOutputView);
-
-        if (ambiguousKeyPossible) {
-            //write size of key
-            writeLengthFrom(beforeWrite, keySerializationStream,
-                keySerializationDataOutputView);
+    /**
+     * Utils for RocksDB state serialization and deserialization.
+     */
+    static class AbstractRocksDBUtils {
--- End diff --
The name of this class already suggests that it would be better off in its own
file, maybe as `RocksDBKeySerializationUtils`. Now that the methods are public
and used in different places, I also suggest adding a test to guard their
behavior against accidental code changes.
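
A minimal sketch of what such a guard test could look like for the key-group
prefix, under stated assumptions: plain `java.io` streams stand in for Flink's
`DataOutputView`/`DataInputView`, the class name `KeyGroupPrefixRoundTrip` and
the `encode`/`decode` helpers are hypothetical, and the body of `readKeyGroup`
is assumed to be the inverse of the write loop shown in the diff (the diff does
not include it).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-alone class; not part of the pull request.
final class KeyGroupPrefixRoundTrip {

    // Same loop as the removed writeKeyGroup: most significant byte first.
    static void writeKeyGroup(int keyGroup, int keyGroupPrefixBytes, DataOutputStream out)
            throws IOException {
        for (int i = keyGroupPrefixBytes; --i >= 0;) {
            out.writeByte(keyGroup >>> (i << 3));
        }
    }

    // Assumed inverse of writeKeyGroup: rebuild the int from big-endian bytes.
    static int readKeyGroup(int keyGroupPrefixBytes, DataInputStream in) throws IOException {
        int keyGroup = 0;
        for (int i = 0; i < keyGroupPrefixBytes; ++i) {
            keyGroup <<= 8;
            keyGroup |= (in.readByte() & 0xFF);
        }
        return keyGroup;
    }

    // Serializes a key group into exactly keyGroupPrefixBytes bytes.
    static byte[] encode(int keyGroup, int keyGroupPrefixBytes) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeKeyGroup(keyGroup, keyGroupPrefixBytes, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserializes a key group from the first keyGroupPrefixBytes bytes.
    static int decode(byte[] data, int keyGroupPrefixBytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return readKeyGroup(keyGroupPrefixBytes, in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Round-trip a few key groups with 1-byte and 2-byte prefixes.
        int[][] cases = {{0, 1}, {127, 1}, {255, 1}, {256, 2}, {32767, 2}};
        for (int[] c : cases) {
            int decoded = decode(encode(c[0], c[1]), c[1]);
            if (decoded != c[0]) {
                throw new AssertionError("round trip failed for key group " + c[0]);
            }
        }
        System.out.println("all round trips ok");
    }
}
```

A real test would call the extracted utility class directly and would cover
`writeKey`/`readKey` and `writeNameSpace`/`readNamespace` with
`ambiguousKeyPossible` both enabled and disabled.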
> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with
> namespace
> ------------------------------------------------------------------------------------
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` behaves oddly: it
> does not use the namespace to filter data.
> `HeapKeyedBackend.getKeys(stateName, namespace)` already does so; I think the two
> should at least be consistent.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)