klion26 closed pull request #7210: [FLINK-9702][State Backends, Checkpointing]
Improvement in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7210
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index 22330c5f360..2f4fdfe5b1e 100644
---
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -22,7 +22,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
@@ -112,7 +111,7 @@ public void setPosition(int position) {
}
@Override
- public void close() throws IOException {
+ public void close() {
}
public byte[] getBuf() {
diff --git
a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 01feae03380..b749c84711c 100644
---
a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.core.memory;
+import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -350,6 +352,11 @@ public void write(DataInputView source, int numBytes)
throws IOException {
this.position += numBytes;
}
+ public void setPosition(int position) {
+ Preconditions.checkArgument(position >= 0 && position <=
this.position, "Position out of bounds.");
+ this.position = position;
+ }
+
//
------------------------------------------------------------------------
// Utilities
//
------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 19de2102a35..9abe3ea475f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -56,10 +56,10 @@
protected final TypeSerializer<K> keySerializer;
/** The currently active key. */
- private K currentKey;
+ protected K currentKey;
/** The key group of the currently active key. */
- private int currentKeyGroup;
+ protected int currentKeyGroup;
/** So that we can give out state when the user uses the same key. */
private final HashMap<String, InternalKvState<K, ?, ?>>
keyValueStatesByName;
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 2218bc0d3e0..18b589980be 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,8 +20,11 @@
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -34,6 +37,7 @@
import org.rocksdb.WriteOptions;
import java.io.IOException;
+import java.util.List;
/**
* Base class for {@link State} implementations that store state in a RocksDB
database.
@@ -55,7 +59,7 @@
final TypeSerializer<V> valueSerializer;
/** The current namespace, which the next value methods will refer to.
*/
- private N currentNamespace;
+ protected N currentNamespace;
/** Backend that holds the actual RocksDB instance where we store
state. */
protected RocksDBKeyedStateBackend<K> backend;
@@ -71,7 +75,9 @@
protected final DataInputDeserializer dataInputView;
- private final boolean ambiguousKeyPossible;
+ private final RocksDBSerializedCompositeKeyBuilder<K>
sharedKeyNamespaceSerializer;
+ private final ByteArrayOutputStreamWithPos valueSerializationStream;
+ private final DataOutputView valueSerializationDataOutputView;
/**
* Creates a new RocksDB backed state.
@@ -100,8 +106,9 @@ protected AbstractRocksDBState(
this.dataOutputView = new DataOutputSerializer(128);
this.dataInputView = new DataInputDeserializer();
- this.ambiguousKeyPossible =
-
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(),
namespaceSerializer);
+ this.sharedKeyNamespaceSerializer =
backend.getSharedRocksKeyBuilder();
+ this.valueSerializationStream = new
ByteArrayOutputStreamWithPos(32);
+ this.valueSerializationDataOutputView = new
DataOutputViewStreamWrapper(valueSerializationStream);
}
//
------------------------------------------------------------------------
@@ -109,17 +116,15 @@ protected AbstractRocksDBState(
@Override
public void clear() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- backend.db.delete(columnFamily, writeOptions, key);
- } catch (IOException | RocksDBException e) {
+ backend.db.delete(columnFamily, writeOptions,
serializeCurrentKeyWithGroupAndNamespace());
+ } catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while removing
entry from RocksDB", e);
}
}
@Override
public void setCurrentNamespace(N namespace) {
- this.currentNamespace = Preconditions.checkNotNull(namespace,
"Namespace");
+ this.currentNamespace = namespace;
}
@Override
@@ -129,30 +134,78 @@ public void setCurrentNamespace(N namespace) {
final TypeSerializer<N> safeNamespaceSerializer,
final TypeSerializer<V> safeValueSerializer) throws
Exception {
- Preconditions.checkNotNull(serializedKeyAndNamespace);
- Preconditions.checkNotNull(safeKeySerializer);
- Preconditions.checkNotNull(safeNamespaceSerializer);
- Preconditions.checkNotNull(safeValueSerializer);
-
//TODO make KvStateSerializer key-group aware to save this
round trip and key-group computation
Tuple2<K, N> keyAndNamespace =
KvStateSerializer.deserializeKeyAndNamespace(
serializedKeyAndNamespace, safeKeySerializer,
safeNamespaceSerializer);
int keyGroup =
KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0,
backend.getNumberOfKeyGroups());
- // we cannot reuse the keySerializationStream member since this
method
- // is called concurrently to the other ones and it may thus
contain garbage
- DataOutputSerializer tmpKeySerializationView = new
DataOutputSerializer(128);
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ new
RocksDBSerializedCompositeKeyBuilder<>(
+ safeKeySerializer,
+
backend.getKeyGroupPrefixBytes(),
+ 32
+ );
+ keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+ byte[] key =
keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
+ return backend.db.get(columnFamily, key);
+ }
+
+ protected <UK> byte[]
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
+ UK userKey,
+ TypeSerializer<UK> userKeySerializer) throws IOException {
+ return
sharedKeyNamespaceSerializer.buildCompositeKeyNamesSpaceUserKey(
+ currentNamespace,
+ namespaceSerializer,
+ userKey,
+ userKeySerializer
+ );
+ }
+
+ protected byte[] serializeCurrentKeyWithGroupAndNamespace() {
+ return
sharedKeyNamespaceSerializer.buildCompositeKeyNamespace(currentNamespace,
namespaceSerializer);
+ }
+
+ protected byte[] serializeValue(V value) throws IOException {
+ return serializeValue(value, valueSerializer);
+ }
+
+ protected <T> byte[] serializeValueNullSensitive(T value,
TypeSerializer<T> serializer) throws IOException {
+ valueSerializationStream.reset();
+ valueSerializationDataOutputView.writeBoolean(value == null);
+ return serializeValueInternal(value, serializer);
+ }
+
+ protected <T> byte[] serializeValue(T value, TypeSerializer<T>
serializer) throws IOException {
+ valueSerializationStream.reset();
+ return serializeValueInternal(value, serializer);
+ }
+
+ protected <T> byte[] serializeValueInternal(T value, TypeSerializer<T>
serializer) throws IOException {
+ serializer.serialize(value, valueSerializationDataOutputView);
+ return valueSerializationStream.toByteArray();
+ }
+
+ protected <T> byte[] serializeValueList(
+ List<T> valueList,
+ TypeSerializer<T> elementSerializer,
+ byte delimiter) throws IOException {
+
+ valueSerializationStream.reset();
+ boolean first = true;
- writeKeyWithGroupAndNamespace(
- keyGroup,
- keyAndNamespace.f0,
- safeKeySerializer,
- keyAndNamespace.f1,
- safeNamespaceSerializer,
- tmpKeySerializationView);
+ for (T value : valueList) {
+ Preconditions.checkNotNull(value, "You cannot add null
to a value list.");
+
+ if (first) {
+ first = false;
+ } else {
+ valueSerializationStream.write(delimiter);
+ }
+ elementSerializer.serialize(value,
valueSerializationDataOutputView);
+ }
- return backend.db.get(columnFamily,
tmpKeySerializationView.getCopyOfBuffer());
+ return valueSerializationStream.toByteArray();
}
public void migrateSerializedValue(
@@ -170,12 +223,7 @@ public void migrateSerializedValue(
}
byte[] getKeyBytes() {
- try {
- writeCurrentKeyWithGroupAndNamespace();
- return dataOutputView.getCopyOfBuffer();
- } catch (IOException e) {
- throw new FlinkRuntimeException("Error while
serializing key", e);
- }
+ return serializeCurrentKeyWithGroupAndNamespace();
}
byte[] getValueBytes(V value) {
@@ -188,45 +236,6 @@ public void migrateSerializedValue(
}
}
- protected void writeCurrentKeyWithGroupAndNamespace() throws
IOException {
- writeKeyWithGroupAndNamespace(
- backend.getCurrentKeyGroupIndex(),
- backend.getCurrentKey(),
- currentNamespace,
- dataOutputView);
- }
-
- protected void writeKeyWithGroupAndNamespace(
- int keyGroup, K key, N namespace,
- DataOutputSerializer keySerializationDataOutputView)
throws IOException {
-
- writeKeyWithGroupAndNamespace(
- keyGroup,
- key,
- backend.getKeySerializer(),
- namespace,
- namespaceSerializer,
- keySerializationDataOutputView);
- }
-
- protected void writeKeyWithGroupAndNamespace(
- final int keyGroup,
- final K key,
- final TypeSerializer<K> keySerializer,
- final N namespace,
- final TypeSerializer<N> namespaceSerializer,
- final DataOutputSerializer
keySerializationDataOutputView) throws IOException {
-
- Preconditions.checkNotNull(key, "No key set. This method should
not be called outside of a keyed context.");
- Preconditions.checkNotNull(keySerializer);
- Preconditions.checkNotNull(namespaceSerializer);
-
- keySerializationDataOutputView.clear();
- RocksDBKeySerializationUtils.writeKeyGroup(keyGroup,
backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
- RocksDBKeySerializationUtils.writeKey(key, keySerializer,
keySerializationDataOutputView, ambiguousKeyPossible);
- RocksDBKeySerializationUtils.writeNameSpace(namespace,
namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
- }
-
protected V getDefaultValue() {
if (defaultValue != null) {
return valueSerializer.copy(defaultValue);
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 2085fb86256..7a4f6a21818 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -109,19 +109,14 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
ACC current = null;
// merge the sources to the target
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(keyGroup,
key, source, dataOutputView);
-
- final byte[] sourceKey =
dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(source);
+ final byte[] sourceKey =
serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes =
backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily,
writeOptions, sourceKey);
@@ -141,10 +136,9 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
// if something came out of merging the sources, merge
it or write it to the target
if (current != null) {
+ setCurrentNamespace(target);
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key,
target, dataOutputView);
-
- final byte[] targetKey =
dataOutputView.getCopyOfBuffer();
+ final byte[] targetKey =
serializeCurrentKeyWithGroupAndNamespace();
final byte[] targetValueBytes =
backend.db.get(columnFamily, targetKey);
if (targetValueBytes != null) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index d8844bfece1..02b4ffcecfd 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -23,6 +23,8 @@
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
/**
@@ -80,8 +82,12 @@ static int readKeyGroup(int keyGroupPrefixBytes,
DataInputView inputView) throws
}
}
+ public static boolean isSerializerTypeVariableSized(@Nonnull
TypeSerializer<?> serializer) {
+ return serializer.getLength() < 0;
+ }
+
public static boolean isAmbiguousKeyPossible(TypeSerializer
keySerializer, TypeSerializer namespaceSerializer) {
- return (keySerializer.getLength() < 0) &&
(namespaceSerializer.getLength() < 0);
+ return (isSerializerTypeVariableSized(keySerializer) &&
isSerializerTypeVariableSized(namespaceSerializer));
}
public static void writeKeyGroup(
@@ -108,7 +114,7 @@ public static void writeKeyGroup(
}
}
- private static void readVariableIntBytes(DataInputView inputView, int
value) throws IOException {
+ public static void readVariableIntBytes(DataInputView inputView, int
value) throws IOException {
do {
inputView.readByte();
value >>>= 8;
@@ -122,7 +128,7 @@ private static void writeLengthFrom(
writeVariableIntBytes(length,
keySerializationDateDataOutputView);
}
- private static void writeVariableIntBytes(
+ public static void writeVariableIntBytes(
int value,
DataOutputView keySerializationDateDataOutputView)
throws IOException {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 50caa0d912a..5881390ff9b 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -249,6 +249,9 @@
/** The native metrics monitor. */
private RocksDBNativeMetricMonitor nativeMetricMonitor;
+ /** Helper to build the byte arrays of composite keys to address data
in RocksDB. Shared across all states.*/
+ private final RocksDBSerializedCompositeKeyBuilder<K>
sharedRocksKeyBuilder;
+
public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -300,6 +303,7 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.writeOptions = new WriteOptions().setDisableWAL(true);
+ this.sharedRocksKeyBuilder = new
RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
this.metricOptions = metricOptions;
this.metricGroup = metricGroup;
@@ -379,6 +383,12 @@ private void registerKvStateInformation(String
columnFamilyName, Tuple2<ColumnFa
}
}
+ @Override
+ public void setCurrentKey(K newKey) {
+ super.setCurrentKey(newKey);
+ sharedRocksKeyBuilder.setKeyAndKeyGroup(currentKey,
currentKeyGroup);
+ }
+
/**
* Should only be called by one thread, and only after all accesses to
the DB happened.
*/
@@ -463,6 +473,10 @@ public WriteOptions getWriteOptions() {
return writeOptions;
}
+ RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
+ return sharedRocksKeyBuilder;
+ }
+
/**
* Triggers an asynchronous snapshot of the keyed state backend from
RocksDB. This snapshot can be canceled and
* is also stopped when the backend is closed through {@link
#dispose()}. For each backend, this method must always
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 6904c853fe3..96cf182e6c3 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -115,11 +115,10 @@ private RocksDBListState(
@Override
public List<V> getInternal() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
+ byte[] key = serializeCurrentKeyWithGroupAndNamespace();
byte[] valueBytes = backend.db.get(columnFamily, key);
return deserializeList(valueBytes);
- } catch (IOException | RocksDBException e) {
+ } catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving
data from RocksDB", e);
}
}
@@ -160,11 +159,12 @@ public void add(V value) {
Preconditions.checkNotNull(value, "You cannot add null to a
ListState.");
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- dataOutputView.clear();
- elementSerializer.serialize(value, dataOutputView);
- backend.db.merge(columnFamily, writeOptions, key,
dataOutputView.getCopyOfBuffer());
+ backend.db.merge(
+ columnFamily,
+ writeOptions,
+ serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValue(value, elementSerializer)
+ );
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding
data to RocksDB", e);
}
@@ -176,21 +176,17 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key, target,
dataOutputView);
- final byte[] targetKey =
dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(target);
+ final byte[] targetKey =
serializeCurrentKeyWithGroupAndNamespace();
// merge the sources to the target
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(keyGroup,
key, source, dataOutputView);
+ setCurrentNamespace(source);
+ final byte[] sourceKey =
serializeCurrentKeyWithGroupAndNamespace();
- byte[] sourceKey =
dataOutputView.getCopyOfBuffer();
byte[] valueBytes =
backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily,
writeOptions, sourceKey);
@@ -218,10 +214,11 @@ public void updateInternal(List<V> values) {
if (!values.isEmpty()) {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] premerge = getPreMergedValue(values,
elementSerializer, dataOutputView);
- backend.db.put(columnFamily, writeOptions, key,
premerge);
+ backend.db.put(
+ columnFamily,
+ writeOptions,
+
serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValueList(values,
elementSerializer, DELIMITER));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while
updating data to RocksDB", e);
}
@@ -234,10 +231,11 @@ public void addAll(List<V> values) {
if (!values.isEmpty()) {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] premerge = getPreMergedValue(values,
elementSerializer, dataOutputView);
- backend.db.merge(columnFamily, writeOptions,
key, premerge);
+ backend.db.merge(
+ columnFamily,
+ writeOptions,
+
serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValueList(values,
elementSerializer, DELIMITER));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while
updating data to RocksDB", e);
}
@@ -273,26 +271,6 @@ public void migrateSerializedValue(
}
}
- private static <V> byte[] getPreMergedValue(
- List<V> values,
- TypeSerializer<V> elementSerializer,
- DataOutputSerializer keySerializationStream) throws IOException
{
-
- keySerializationStream.clear();
- boolean first = true;
- for (V value : values) {
- Preconditions.checkNotNull(value, "You cannot add null
to a ListState.");
- if (first) {
- first = false;
- } else {
- keySerializationStream.write(DELIMITER);
- }
- elementSerializer.serialize(value,
keySerializationStream);
- }
-
- return keySerializationStream.getCopyOfBuffer();
- }
-
@SuppressWarnings("unchecked")
static <E, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
@@ -343,10 +321,32 @@ public void migrateSerializedValue(
prevPosition = in.getPosition();
}
try {
- return result.isEmpty() ? null :
getPreMergedValue(result, elementSerializer, out);
+ return result.isEmpty() ? null :
serializeValueList(result, elementSerializer, DELIMITER);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to
serialize transformed list", e);
}
}
+
+ byte[] serializeValueList(
+ List<T> valueList,
+ TypeSerializer<T> elementSerializer,
+ byte delimiter) throws IOException {
+
+ out.clear();
+ boolean first = true;
+
+ for (T value : valueList) {
+ Preconditions.checkNotNull(value, "You cannot
add null to a value list.");
+
+ if (first) {
+ first = false;
+ } else {
+ out.write(delimiter);
+ }
+ elementSerializer.serialize(value, out);
+ }
+
+ return out.getCopyOfBuffer();
+ }
}
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index cb656b53b1b..5a8f4b18bbf 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -119,7 +119,7 @@ private RocksDBMapState(
@Override
public UV get(UK userKey) throws IOException, RocksDBException {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily,
rawKeyBytes);
return (rawValueBytes == null ? null :
deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
@@ -128,8 +128,8 @@ public UV get(UK userKey) throws IOException,
RocksDBException {
@Override
public void put(UK userKey, UV userValue) throws IOException,
RocksDBException {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
- byte[] rawValueBytes = serializeUserValue(userValue,
userValueSerializer, dataOutputView);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
+ byte[] rawValueBytes = serializeValueNullSensitive(userValue,
userValueSerializer);
backend.db.put(columnFamily, writeOptions, rawKeyBytes,
rawValueBytes);
}
@@ -142,8 +142,8 @@ public void putAll(Map<UK, UV> map) throws IOException,
RocksDBException {
try (RocksDBWriteBatchWrapper writeBatchWrapper = new
RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
for (Map.Entry<UK, UV> entry : map.entrySet()) {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
- byte[] rawValueBytes =
serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(entry.getKey(),
userKeySerializer);
+ byte[] rawValueBytes =
serializeValueNullSensitive(entry.getValue(), userValueSerializer);
writeBatchWrapper.put(columnFamily,
rawKeyBytes, rawValueBytes);
}
}
@@ -151,21 +151,21 @@ public void putAll(Map<UK, UV> map) throws IOException,
RocksDBException {
@Override
public void remove(UK userKey) throws IOException, RocksDBException {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
}
@Override
public boolean contains(UK userKey) throws IOException,
RocksDBException {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily,
rawKeyBytes);
return (rawValueBytes != null);
}
@Override
- public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
+ public Iterable<Map.Entry<UK, UV>> entries() {
final Iterator<Map.Entry<UK, UV>> iterator = iterator();
// Return null to make the behavior consistent with other
states.
@@ -177,10 +177,11 @@ public boolean contains(UK userKey) throws IOException,
RocksDBException {
}
@Override
- public Iterable<UK> keys() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterable<UK> keys() {
+ final byte[] prefixBytes =
serializeCurrentKeyWithGroupAndNamespace();
return () -> new RocksDBMapIterator<UK>(backend.db,
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
+ @Nullable
@Override
public UK next() {
RocksDBMapEntry entry = nextEntry();
@@ -190,8 +191,8 @@ public UK next() {
}
@Override
- public Iterable<UV> values() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterable<UV> values() {
+ final byte[] prefixBytes =
serializeCurrentKeyWithGroupAndNamespace();
return () -> new RocksDBMapIterator<UV>(backend.db,
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
@@ -203,8 +204,8 @@ public UV next() {
}
@Override
- public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterator<Map.Entry<UK, UV>> iterator() {
+ final byte[] prefixBytes =
serializeCurrentKeyWithGroupAndNamespace();
return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db,
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
@@ -220,7 +221,7 @@ public void clear() {
try (RocksIteratorWrapper iterator =
RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
WriteBatch writeBatch = new WriteBatch(128)) {
- final byte[] keyPrefixBytes =
serializeCurrentKeyAndNamespace();
+ final byte[] keyPrefixBytes =
serializeCurrentKeyWithGroupAndNamespace();
iterator.seek(keyPrefixBytes);
while (iterator.isValid()) {
@@ -259,18 +260,15 @@ public void clear() {
int keyGroup =
KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0,
backend.getNumberOfKeyGroups());
- DataOutputSerializer outputView = new DataOutputSerializer(128);
- DataInputDeserializer inputView = new DataInputDeserializer();
-
- writeKeyWithGroupAndNamespace(
- keyGroup,
- keyAndNamespace.f0,
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ new RocksDBSerializedCompositeKeyBuilder<>(
safeKeySerializer,
- keyAndNamespace.f1,
- safeNamespaceSerializer,
- outputView);
+ backend.getKeyGroupPrefixBytes(),
+ 32);
+
+ keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
- final byte[] keyPrefixBytes = outputView.getCopyOfBuffer();
+ final byte[] keyPrefixBytes =
keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
final MapSerializer<UK, UV> serializer = (MapSerializer<UK,
UV>) safeValueSerializer;
@@ -282,7 +280,8 @@ public void clear() {
keyPrefixBytes,
dupUserKeySerializer,
dupUserValueSerializer,
- inputView) {
+ dataInputView
+ ) {
@Override
public Map.Entry<UK, UV> next() {
@@ -302,36 +301,6 @@ public void clear() {
// Serialization Methods
//
------------------------------------------------------------------------
- private byte[] serializeCurrentKeyAndNamespace() throws IOException {
- writeCurrentKeyWithGroupAndNamespace();
-
- return dataOutputView.getCopyOfBuffer();
- }
-
- private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey)
throws IOException {
- serializeCurrentKeyAndNamespace();
- userKeySerializer.serialize(userKey, dataOutputView);
-
- return dataOutputView.getCopyOfBuffer();
- }
-
- private static <UV> byte[] serializeUserValue(
- UV userValue,
- TypeSerializer<UV> valueSerializer,
- DataOutputSerializer dataOutputView) throws IOException {
-
- dataOutputView.clear();
-
- if (userValue == null) {
- dataOutputView.writeBoolean(true);
- } else {
- dataOutputView.writeBoolean(false);
- valueSerializer.serialize(userValue, dataOutputView);
- }
-
- return dataOutputView.getCopyOfBuffer();
- }
-
private static <UK> UK deserializeUserKey(
DataInputDeserializer dataInputView,
int userKeyOffset,
@@ -474,7 +443,7 @@ public UV setValue(UV value) {
try {
userValue = value;
- rawValueBytes = serializeUserValue(value,
valueSerializer, dataOutputView);
+ rawValueBytes =
serializeValueNullSensitive(value, valueSerializer);
db.put(columnFamily, writeOptions, rawKeyBytes,
rawValueBytes);
} catch (IOException | RocksDBException e) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 138357b0d77..7220168d7fa 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -102,20 +102,14 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
V current = null;
// merge the sources to the target
for (N source : sources) {
if (source != null) {
-
- writeKeyWithGroupAndNamespace(keyGroup,
key, source, dataOutputView);
-
- final byte[] sourceKey =
dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(source);
+ final byte[] sourceKey =
serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes =
backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily,
writeOptions, sourceKey);
@@ -136,9 +130,8 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
// if something came out of merging the sources, merge
it or write it to the target
if (current != null) {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key,
target, dataOutputView);
-
- final byte[] targetKey =
dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(target);
+ final byte[] targetKey =
serializeCurrentKeyWithGroupAndNamespace();
final byte[] targetValueBytes =
backend.db.get(columnFamily, targetKey);
if (targetValueBytes != null) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
new file mode 100644
index 00000000000..5faa84c23c4
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previously serialized current key if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+class RocksDBSerializedCompositeKeyBuilder<K> {
+ @Nonnull
+ private final TypeSerializer<K> keySerializer;
+ @Nonnull
+ private final DataOutputSerializer keyOutView;
+ @Nonnegative
+ private final int keyGroupPrefixBytes;
+ private final boolean keySerializerTypeVariableSized;
+ @Nonnegative
+ private int keyMark;
+
+ RocksDBSerializedCompositeKeyBuilder(
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnegative int keyGroupPrefixBytes,
+ @Nonnegative int initialSize) {
+ this.keySerializer = keySerializer;
+ this.keyOutView = new DataOutputSerializer(initialSize);
+ this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+ this.keySerializerTypeVariableSized =
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer);
+ this.keyMark = 0;
+ }
+
+ void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
+ try {
+ serializeKeyGroupAndKey(key, keyGroupId);
+ } catch (IOException shouldNeverHappen) {
+ throw new FlinkRuntimeException(shouldNeverHappen);
+ }
+ }
+
+ <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, @Nonnull
TypeSerializer<N> namespaceSerializer) {
+ try {
+ serializeNamespace(namespace, namespaceSerializer);
+ final byte[] result = keyOutView.getCopyOfBuffer();
+ resetToKey();
+ return result;
+ } catch (IOException shouldNeverHappen) {
+ throw new FlinkRuntimeException(shouldNeverHappen);
+ }
+ }
+
+ <N, UK> byte[] buildCompositeKeyNamesSpaceUserKey(
+ @Nonnull N namespace,
+ @Nonnull TypeSerializer<N> namespaceSerializer,
+ @Nonnull UK userKey,
+ @Nonnull TypeSerializer<UK> userKeySerializer) throws
IOException {
+ serializeNamespace(namespace, namespaceSerializer);
+ userKeySerializer.serialize(userKey, keyOutView);
+ byte[] result = keyOutView.getCopyOfBuffer();
+ resetToKey();
+ return result;
+ }
+
+ private void serializeKeyGroupAndKey(K key, int keyGroupId) throws
IOException {
+ resetFully();
+ // write key-group
+ RocksDBKeySerializationUtils.writeKeyGroup(
+ keyGroupId,
+ keyGroupPrefixBytes,
+ keyOutView);
+ // write key
+ keySerializer.serialize(key, keyOutView);
+ keyMark = keyOutView.length();
+ }
+
+ private <N> void serializeNamespace(
+ @Nonnull N namespace,
+ @Nonnull TypeSerializer<N> namespaceSerializer) throws
IOException {
+ assert isKeyWritten();
+ final boolean ambiguousKeyPossible =
+ keySerializerTypeVariableSized &
+
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
+ if (ambiguousKeyPossible) {
+ RocksDBKeySerializationUtils.writeVariableIntBytes(
+ keyMark - keyGroupPrefixBytes,
+ keyOutView);
+ }
+ RocksDBKeySerializationUtils.writeNameSpace(
+ namespace,
+ namespaceSerializer,
+ keyOutView,
+ ambiguousKeyPossible);
+ }
+
+ private void resetFully() {
+ keyMark = 0;
+ keyOutView.clear();
+ }
+
+ private void resetToKey() {
+ this.keyOutView.setPosition(keyMark);
+ }
+
+ private boolean isKeyWritten() {
+ return keyMark > 0;
+ }
+}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 0ca90d4a521..7ef2e8e38b6 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -80,9 +80,9 @@ private RocksDBValueState(
@Override
public V value() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] valueBytes = backend.db.get(columnFamily, key);
+ byte[] valueBytes = backend.db.get(columnFamily,
+ serializeCurrentKeyWithGroupAndNamespace());
+
if (valueBytes == null) {
return getDefaultValue();
}
@@ -101,11 +101,9 @@ public void update(V value) {
}
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- dataOutputView.clear();
- valueSerializer.serialize(value, dataOutputView);
- backend.db.put(columnFamily, writeOptions, key,
dataOutputView.getCopyOfBuffer());
+ backend.db.put(columnFamily, writeOptions,
+ serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValue(value));
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding
data to RocksDB", e);
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksValueDeserializer.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksValueDeserializer.java
new file mode 100644
index 00000000000..bc564f5695a
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksValueDeserializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+
+/**
+ * Serializes values to byte arrays and deserializes them back, reusing its internal streams.
+ */
+public class RocksValueDeserializer {
+ private final DataInputViewStreamWrapper inView;
+ private final ByteArrayInputStreamWithPos inStream;
+ private final DataOutputViewStreamWrapper outView;
+ private final ByteArrayOutputStreamWithPos outStream;
+
+ public RocksValueDeserializer() {
+ this.inStream = new ByteArrayInputStreamWithPos();
+ this.inView = new DataInputViewStreamWrapper(inStream);
+ this.outStream = new ByteArrayOutputStreamWithPos();
+ this.outView = new DataOutputViewStreamWrapper(outStream);
+ }
+
+ <V> byte[] serializeValue(V value, TypeSerializer<V> valueSerializer)
throws IOException {
+ valueSerializer.serialize(value, outView);
+ final byte[] result = outStream.toByteArray();
+ outStream.reset();
+ return result;
+ }
+
+ <V> V deserializeValue(byte[] data, TypeSerializer<V> valueSerializer)
throws IOException {
+ return deserializeValue(data, 0, data.length, valueSerializer);
+ }
+
+ <V> V deserializeValue(byte[] data, int off, int len, TypeSerializer<V>
valueSerializer) throws IOException {
+ inStream.setBuffer(data, off, len);
+ return valueSerializer.deserialize(inView);
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services