[
https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828107#comment-15828107
]
ASF GitHub Bot commented on FLINK-5530:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/3143#discussion_r96638710
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
---
@@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
namespaceSerializer);
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0,
backend.getNumberOfKeyGroups());
- writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
- return backend.db.get(columnFamily,
keySerializationStream.toByteArray());
+
+ // we cannot reuse the keySerializationStream member since this
method
+ // is called concurrently to the other ones and it may thus
contain garbage
+ ByteArrayOutputStreamWithPos tmpKeySerializationStream =
+ new ByteArrayOutputStreamWithPos(128);
+ DataOutputViewStreamWrapper
tmpKeySerializationDateDataOutputView =
+ new
DataOutputViewStreamWrapper(tmpKeySerializationStream);
+
+ writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
+ tmpKeySerializationStream,
tmpKeySerializationDateDataOutputView);
+
+ return backend.db.get(columnFamily,
tmpKeySerializationStream.toByteArray());
}
protected void writeCurrentKeyWithGroupAndNamespace() throws
IOException {
-
writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(),
backend.getCurrentKey(), currentNamespace);
+ writeKeyWithGroupAndNamespace(
+ backend.getCurrentKeyGroupIndex(),
+ backend.getCurrentKey(),
+ currentNamespace,
+ this.keySerializationStream,
+ this.keySerializationDateDataOutputView);
}
- protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N
namespace) throws IOException {
+ protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
+ N namespace,
+ final ByteArrayOutputStreamWithPos keySerializationStream,
+ final DataOutputView keySerializationDateDataOutputView) throws
+ IOException {
+
keySerializationStream.reset();
- writeKeyGroup(keyGroup);
- writeKey(key);
- writeNameSpace(namespace);
+ writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
+ writeKey(key, keySerializationStream,
keySerializationDateDataOutputView);
+ writeNameSpace(namespace, keySerializationStream,
keySerializationDateDataOutputView);
}
- private void writeKeyGroup(int keyGroup) throws IOException {
+ private void writeKeyGroup(int keyGroup,
+ final DataOutputView keySerializationDateDataOutputView) throws
+ IOException {
+
for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
keySerializationDateDataOutputView.writeByte(keyGroup
>>> (i << 3));
}
}
- private void writeKey(K key) throws IOException {
+ private void writeKey(K key,
+ final ByteArrayOutputStreamWithPos keySerializationStream,
+ final DataOutputView keySerializationDateDataOutputView) throws
+ IOException {
+
//write key
int beforeWrite = keySerializationStream.getPosition();
backend.getKeySerializer().serialize(key,
keySerializationDateDataOutputView);
if (ambiguousKeyPossible) {
//write size of key
- writeLengthFrom(beforeWrite);
+ writeLengthFrom(beforeWrite, keySerializationStream,
+ keySerializationDateDataOutputView);
}
}
- private void writeNameSpace(N namespace) throws IOException {
+ private void writeNameSpace(N namespace,
+ final ByteArrayOutputStreamWithPos keySerializationStream,
+ final DataOutputView keySerializationDateDataOutputView) throws
+ IOException {
+
int beforeWrite = keySerializationStream.getPosition();
namespaceSerializer.serialize(namespace,
keySerializationDateDataOutputView);
if (ambiguousKeyPossible) {
//write length of namespace
- writeLengthFrom(beforeWrite);
+ writeLengthFrom(beforeWrite, keySerializationStream,
+ keySerializationDateDataOutputView);
}
}
- private void writeLengthFrom(int fromPosition) throws IOException {
+ private static void writeLengthFrom(int fromPosition,
+ final ByteArrayOutputStreamWithPos keySerializationStream,
+ final DataOutputView keySerializationDateDataOutputView) throws
+ IOException {
--- End diff --
I'll try to do that manually, too, as I didn't see a way to teach my
IntelliJ style that behaviour
> race condition in AbstractRocksDBState#getSerializedValue
> ---------------------------------------------------------
>
> Key: FLINK-5530
> URL: https://issues.apache.org/jira/browse/FLINK-5530
> Project: Flink
> Issue Type: Bug
> Components: Queryable State
> Affects Versions: 1.2.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
>
> AbstractRocksDBState#getSerializedValue() uses the same key serialisation
> stream as the ordinary state access methods but is called in parallel during
> state queries thus violating the assumption of only one thread accessing it.
> This may lead to either wrong results in queries or corrupt data while
> queries are executed.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)