asfgit closed pull request #7288: [FLINK-9702] Improvement in (de)serialization
of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub no longer displays its
original diff once it has been merged, so the diff is reproduced below for
the sake of provenance:
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 6668c572930..55ba8ab477f 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.lang.reflect.Array;
+import java.util.Arrays;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -86,16 +87,21 @@ public boolean isImmutableType() {
@Override
public C[] copy(C[] from) {
- C[] copy = create(from.length);
- for (int i = 0; i < copy.length; i++) {
- C val = from[i];
- if (val != null) {
- copy[i] = this.componentSerializer.copy(val);
+ final TypeSerializer<C> serializer = this.componentSerializer;
+
+ if (serializer.isImmutableType()) {
+ return Arrays.copyOf(from, from.length);
+ } else {
+ C[] copy = create(from.length);
+ for (int i = 0; i < copy.length; i++) {
+ C val = from[i];
+ if (val != null) {
+ copy[i] = serializer.copy(val);
+ }
}
+ return copy;
}
-
- return copy;
}
@Override
diff --git
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index 22330c5f360..2f4fdfe5b1e 100644
---
a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -22,7 +22,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.Preconditions;
-import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
@@ -112,7 +111,7 @@ public void setPosition(int position) {
}
@Override
- public void close() throws IOException {
+ public void close() {
}
public byte[] getBuf() {
diff --git
a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 01feae03380..b749c84711c 100644
---
a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -18,6 +18,8 @@
package org.apache.flink.core.memory;
+import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -350,6 +352,11 @@ public void write(DataInputView source, int numBytes)
throws IOException {
this.position += numBytes;
}
+ public void setPosition(int position) {
+ Preconditions.checkArgument(position >= 0 && position <=
this.position, "Position out of bounds.");
+ this.position = position;
+ }
+
//
------------------------------------------------------------------------
// Utilities
//
------------------------------------------------------------------------
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 8c0f4d7da7b..04e665a6e87 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -18,7 +18,6 @@
package org.apache.flink.contrib.streaming.state;
-import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalAppendingState;
import org.apache.flink.util.FlinkRuntimeException;
@@ -28,8 +27,8 @@
import java.io.IOException;
-abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT, S extends
State>
- extends AbstractRocksDBState<K, N, SV, S>
+abstract class AbstractRocksDBAppendingState <K, N, IN, SV, OUT>
+ extends AbstractRocksDBState<K, N, SV>
implements InternalAppendingState<K, N, IN, SV, OUT> {
/**
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 2218bc0d3e0..fb19a8a2816 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -34,6 +34,7 @@
import org.rocksdb.WriteOptions;
import java.io.IOException;
+import java.util.List;
/**
* Base class for {@link State} implementations that store state in a RocksDB
database.
@@ -44,9 +45,8 @@
* @param <K> The type of the key.
* @param <N> The type of the namespace.
* @param <V> The type of values kept internally in state.
- * @param <S> The type of {@link State}.
*/
-public abstract class AbstractRocksDBState<K, N, V, S extends State>
implements InternalKvState<K, N, V>, State {
+public abstract class AbstractRocksDBState<K, N, V> implements
InternalKvState<K, N, V>, State {
/** Serializer for the namespace. */
final TypeSerializer<N> namespaceSerializer;
@@ -71,7 +71,7 @@
protected final DataInputDeserializer dataInputView;
- private final boolean ambiguousKeyPossible;
+ private final RocksDBSerializedCompositeKeyBuilder<K>
sharedKeyNamespaceSerializer;
/**
* Creates a new RocksDB backed state.
@@ -100,8 +100,7 @@ protected AbstractRocksDBState(
this.dataOutputView = new DataOutputSerializer(128);
this.dataInputView = new DataInputDeserializer();
- this.ambiguousKeyPossible =
-
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(),
namespaceSerializer);
+ this.sharedKeyNamespaceSerializer =
backend.getSharedRocksKeyBuilder();
}
//
------------------------------------------------------------------------
@@ -109,17 +108,15 @@ protected AbstractRocksDBState(
@Override
public void clear() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- backend.db.delete(columnFamily, writeOptions, key);
- } catch (IOException | RocksDBException e) {
+ backend.db.delete(columnFamily, writeOptions,
serializeCurrentKeyWithGroupAndNamespace());
+ } catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while removing
entry from RocksDB", e);
}
}
@Override
public void setCurrentNamespace(N namespace) {
- this.currentNamespace = Preconditions.checkNotNull(namespace,
"Namespace");
+ this.currentNamespace = namespace;
}
@Override
@@ -129,30 +126,78 @@ public void setCurrentNamespace(N namespace) {
final TypeSerializer<N> safeNamespaceSerializer,
final TypeSerializer<V> safeValueSerializer) throws
Exception {
- Preconditions.checkNotNull(serializedKeyAndNamespace);
- Preconditions.checkNotNull(safeKeySerializer);
- Preconditions.checkNotNull(safeNamespaceSerializer);
- Preconditions.checkNotNull(safeValueSerializer);
-
//TODO make KvStateSerializer key-group aware to save this
round trip and key-group computation
Tuple2<K, N> keyAndNamespace =
KvStateSerializer.deserializeKeyAndNamespace(
serializedKeyAndNamespace, safeKeySerializer,
safeNamespaceSerializer);
int keyGroup =
KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0,
backend.getNumberOfKeyGroups());
- // we cannot reuse the keySerializationStream member since this
method
- // is called concurrently to the other ones and it may thus
contain garbage
- DataOutputSerializer tmpKeySerializationView = new
DataOutputSerializer(128);
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ new
RocksDBSerializedCompositeKeyBuilder<>(
+ safeKeySerializer,
+
backend.getKeyGroupPrefixBytes(),
+ 32
+ );
+ keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
+ byte[] key =
keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
+ return backend.db.get(columnFamily, key);
+ }
+
+ <UK> byte[] serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
+ UK userKey,
+ TypeSerializer<UK> userKeySerializer) throws IOException {
+ return
sharedKeyNamespaceSerializer.buildCompositeKeyNamesSpaceUserKey(
+ currentNamespace,
+ namespaceSerializer,
+ userKey,
+ userKeySerializer
+ );
+ }
+
+ private <T> byte[] serializeValueInternal(T value, TypeSerializer<T>
serializer) throws IOException {
+ serializer.serialize(value, dataOutputView);
+ return dataOutputView.getCopyOfBuffer();
+ }
+
+ byte[] serializeCurrentKeyWithGroupAndNamespace() {
+ return
sharedKeyNamespaceSerializer.buildCompositeKeyNamespace(currentNamespace,
namespaceSerializer);
+ }
+
+ byte[] serializeValue(V value) throws IOException {
+ return serializeValue(value, valueSerializer);
+ }
+
+ <T> byte[] serializeValueNullSensitive(T value, TypeSerializer<T>
serializer) throws IOException {
+ dataOutputView.clear();
+ dataOutputView.writeBoolean(value == null);
+ return serializeValueInternal(value, serializer);
+ }
+
+ <T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws
IOException {
+ dataOutputView.clear();
+ return serializeValueInternal(value, serializer);
+ }
+
+ <T> byte[] serializeValueList(
+ List<T> valueList,
+ TypeSerializer<T> elementSerializer,
+ byte delimiter) throws IOException {
+
+ dataOutputView.clear();
+ boolean first = true;
- writeKeyWithGroupAndNamespace(
- keyGroup,
- keyAndNamespace.f0,
- safeKeySerializer,
- keyAndNamespace.f1,
- safeNamespaceSerializer,
- tmpKeySerializationView);
+ for (T value : valueList) {
+ Preconditions.checkNotNull(value, "You cannot add null
to a value list.");
+
+ if (first) {
+ first = false;
+ } else {
+ dataOutputView.write(delimiter);
+ }
+ elementSerializer.serialize(value, dataOutputView);
+ }
- return backend.db.get(columnFamily,
tmpKeySerializationView.getCopyOfBuffer());
+ return dataOutputView.getCopyOfBuffer();
}
public void migrateSerializedValue(
@@ -170,12 +215,7 @@ public void migrateSerializedValue(
}
byte[] getKeyBytes() {
- try {
- writeCurrentKeyWithGroupAndNamespace();
- return dataOutputView.getCopyOfBuffer();
- } catch (IOException e) {
- throw new FlinkRuntimeException("Error while
serializing key", e);
- }
+ return serializeCurrentKeyWithGroupAndNamespace();
}
byte[] getValueBytes(V value) {
@@ -188,45 +228,6 @@ public void migrateSerializedValue(
}
}
- protected void writeCurrentKeyWithGroupAndNamespace() throws
IOException {
- writeKeyWithGroupAndNamespace(
- backend.getCurrentKeyGroupIndex(),
- backend.getCurrentKey(),
- currentNamespace,
- dataOutputView);
- }
-
- protected void writeKeyWithGroupAndNamespace(
- int keyGroup, K key, N namespace,
- DataOutputSerializer keySerializationDataOutputView)
throws IOException {
-
- writeKeyWithGroupAndNamespace(
- keyGroup,
- key,
- backend.getKeySerializer(),
- namespace,
- namespaceSerializer,
- keySerializationDataOutputView);
- }
-
- protected void writeKeyWithGroupAndNamespace(
- final int keyGroup,
- final K key,
- final TypeSerializer<K> keySerializer,
- final N namespace,
- final TypeSerializer<N> namespaceSerializer,
- final DataOutputSerializer
keySerializationDataOutputView) throws IOException {
-
- Preconditions.checkNotNull(key, "No key set. This method should
not be called outside of a keyed context.");
- Preconditions.checkNotNull(keySerializer);
- Preconditions.checkNotNull(namespaceSerializer);
-
- keySerializationDataOutputView.clear();
- RocksDBKeySerializationUtils.writeKeyGroup(keyGroup,
backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
- RocksDBKeySerializationUtils.writeKey(key, keySerializer,
keySerializationDataOutputView, ambiguousKeyPossible);
- RocksDBKeySerializationUtils.writeNameSpace(namespace,
namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
- }
-
protected V getDefaultValue() {
if (defaultValue != null) {
return valueSerializer.copy(defaultValue);
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 2085fb86256..fbd2979feb1 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -43,7 +43,7 @@
* @param <R> The type of the value returned from the state
*/
class RocksDBAggregatingState<K, N, T, ACC, R>
- extends AbstractRocksDBAppendingState<K, N, T, ACC, R,
AggregatingState<T, R>>
+ extends AbstractRocksDBAppendingState<K, N, T, ACC, R>
implements InternalAggregatingState<K, N, T, ACC, R> {
/** User-specified aggregation function. */
@@ -109,19 +109,14 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
ACC current = null;
// merge the sources to the target
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(keyGroup,
key, source, dataOutputView);
-
- final byte[] sourceKey =
dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(source);
+ final byte[] sourceKey =
serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes =
backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily,
writeOptions, sourceKey);
@@ -141,10 +136,9 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
// if something came out of merging the sources, merge
it or write it to the target
if (current != null) {
+ setCurrentNamespace(target);
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key,
target, dataOutputView);
-
- final byte[] targetKey =
dataOutputView.getCopyOfBuffer();
+ final byte[] targetKey =
serializeCurrentKeyWithGroupAndNamespace();
final byte[] targetValueBytes =
backend.db.get(columnFamily, targetKey);
if (targetValueBytes != null) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 4d6635704b9..c5e830fcedf 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -42,7 +42,7 @@
*/
@Deprecated
class RocksDBFoldingState<K, N, T, ACC>
- extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC,
FoldingState<T, ACC>>
+ extends AbstractRocksDBAppendingState<K, N, T, ACC, ACC>
implements InternalFoldingState<K, N, T, ACC> {
/** User-specified fold function. */
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index d8844bfece1..02b4ffcecfd 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -23,6 +23,8 @@
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
+import javax.annotation.Nonnull;
+
import java.io.IOException;
/**
@@ -80,8 +82,12 @@ static int readKeyGroup(int keyGroupPrefixBytes,
DataInputView inputView) throws
}
}
+ public static boolean isSerializerTypeVariableSized(@Nonnull
TypeSerializer<?> serializer) {
+ return serializer.getLength() < 0;
+ }
+
public static boolean isAmbiguousKeyPossible(TypeSerializer
keySerializer, TypeSerializer namespaceSerializer) {
- return (keySerializer.getLength() < 0) &&
(namespaceSerializer.getLength() < 0);
+ return (isSerializerTypeVariableSized(keySerializer) &&
isSerializerTypeVariableSized(namespaceSerializer));
}
public static void writeKeyGroup(
@@ -108,7 +114,7 @@ public static void writeKeyGroup(
}
}
- private static void readVariableIntBytes(DataInputView inputView, int
value) throws IOException {
+ public static void readVariableIntBytes(DataInputView inputView, int
value) throws IOException {
do {
inputView.readByte();
value >>>= 8;
@@ -122,7 +128,7 @@ private static void writeLengthFrom(
writeVariableIntBytes(length,
keySerializationDateDataOutputView);
}
- private static void writeVariableIntBytes(
+ public static void writeVariableIntBytes(
int value,
DataOutputView keySerializationDateDataOutputView)
throws IOException {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 50caa0d912a..f5532e74dd4 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -249,6 +249,9 @@
/** The native metrics monitor. */
private RocksDBNativeMetricMonitor nativeMetricMonitor;
+ /** Helper to build the byte arrays of composite keys to address data
in RocksDB. Shared across all states.*/
+ private final RocksDBSerializedCompositeKeyBuilder<K>
sharedRocksKeyBuilder;
+
public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -300,6 +303,7 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.writeOptions = new WriteOptions().setDisableWAL(true);
+ this.sharedRocksKeyBuilder = new
RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
this.metricOptions = metricOptions;
this.metricGroup = metricGroup;
@@ -379,6 +383,12 @@ private void registerKvStateInformation(String
columnFamilyName, Tuple2<ColumnFa
}
}
+ @Override
+ public void setCurrentKey(K newKey) {
+ super.setCurrentKey(newKey);
+ sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(),
getCurrentKeyGroupIndex());
+ }
+
/**
* Should only be called by one thread, and only after all accesses to
the DB happened.
*/
@@ -463,6 +473,10 @@ public WriteOptions getWriteOptions() {
return writeOptions;
}
+ RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
+ return sharedRocksKeyBuilder;
+ }
+
/**
* Triggers an asynchronous snapshot of the keyed state backend from
RocksDB. This snapshot can be canceled and
* is also stopped when the backend is closed through {@link
#dispose()}. For each backend, this method must always
@@ -1479,7 +1493,7 @@ private void copyStateDataHandleData(
}
@SuppressWarnings("unchecked")
- AbstractRocksDBState<?, ?, SV, S> rocksDBState =
(AbstractRocksDBState<?, ?, SV, S>) state;
+ AbstractRocksDBState<?, ?, SV> rocksDBState =
(AbstractRocksDBState<?, ?, SV>) state;
Snapshot rocksDBSnapshot = db.getSnapshot();
try (
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 6904c853fe3..13f5559405e 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -59,7 +59,7 @@
* @param <V> The type of the values in the list state.
*/
class RocksDBListState<K, N, V>
- extends AbstractRocksDBState<K, N, List<V>, ListState<V>>
+ extends AbstractRocksDBState<K, N, List<V>>
implements InternalListState<K, N, V> {
/** Serializer for the values. */
@@ -115,11 +115,10 @@ private RocksDBListState(
@Override
public List<V> getInternal() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
+ byte[] key = serializeCurrentKeyWithGroupAndNamespace();
byte[] valueBytes = backend.db.get(columnFamily, key);
return deserializeList(valueBytes);
- } catch (IOException | RocksDBException e) {
+ } catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving
data from RocksDB", e);
}
}
@@ -160,11 +159,12 @@ public void add(V value) {
Preconditions.checkNotNull(value, "You cannot add null to a
ListState.");
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- dataOutputView.clear();
- elementSerializer.serialize(value, dataOutputView);
- backend.db.merge(columnFamily, writeOptions, key,
dataOutputView.getCopyOfBuffer());
+ backend.db.merge(
+ columnFamily,
+ writeOptions,
+ serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValue(value, elementSerializer)
+ );
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding
data to RocksDB", e);
}
@@ -176,21 +176,17 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key, target,
dataOutputView);
- final byte[] targetKey =
dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(target);
+ final byte[] targetKey =
serializeCurrentKeyWithGroupAndNamespace();
// merge the sources to the target
for (N source : sources) {
if (source != null) {
- writeKeyWithGroupAndNamespace(keyGroup,
key, source, dataOutputView);
+ setCurrentNamespace(source);
+ final byte[] sourceKey =
serializeCurrentKeyWithGroupAndNamespace();
- byte[] sourceKey =
dataOutputView.getCopyOfBuffer();
byte[] valueBytes =
backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily,
writeOptions, sourceKey);
@@ -218,10 +214,11 @@ public void updateInternal(List<V> values) {
if (!values.isEmpty()) {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] premerge = getPreMergedValue(values,
elementSerializer, dataOutputView);
- backend.db.put(columnFamily, writeOptions, key,
premerge);
+ backend.db.put(
+ columnFamily,
+ writeOptions,
+
serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValueList(values,
elementSerializer, DELIMITER));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while
updating data to RocksDB", e);
}
@@ -234,10 +231,11 @@ public void addAll(List<V> values) {
if (!values.isEmpty()) {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] premerge = getPreMergedValue(values,
elementSerializer, dataOutputView);
- backend.db.merge(columnFamily, writeOptions,
key, premerge);
+ backend.db.merge(
+ columnFamily,
+ writeOptions,
+
serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValueList(values,
elementSerializer, DELIMITER));
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while
updating data to RocksDB", e);
}
@@ -273,26 +271,6 @@ public void migrateSerializedValue(
}
}
- private static <V> byte[] getPreMergedValue(
- List<V> values,
- TypeSerializer<V> elementSerializer,
- DataOutputSerializer keySerializationStream) throws IOException
{
-
- keySerializationStream.clear();
- boolean first = true;
- for (V value : values) {
- Preconditions.checkNotNull(value, "You cannot add null
to a ListState.");
- if (first) {
- first = false;
- } else {
- keySerializationStream.write(DELIMITER);
- }
- elementSerializer.serialize(value,
keySerializationStream);
- }
-
- return keySerializationStream.getCopyOfBuffer();
- }
-
@SuppressWarnings("unchecked")
static <E, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
@@ -343,10 +321,32 @@ public void migrateSerializedValue(
prevPosition = in.getPosition();
}
try {
- return result.isEmpty() ? null :
getPreMergedValue(result, elementSerializer, out);
+ return result.isEmpty() ? null :
serializeValueList(result, elementSerializer, DELIMITER);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to
serialize transformed list", e);
}
}
+
+ byte[] serializeValueList(
+ List<T> valueList,
+ TypeSerializer<T> elementSerializer,
+ byte delimiter) throws IOException {
+
+ out.clear();
+ boolean first = true;
+
+ for (T value : valueList) {
+ Preconditions.checkNotNull(value, "You cannot
add null to a value list.");
+
+ if (first) {
+ first = false;
+ } else {
+ out.write(delimiter);
+ }
+ elementSerializer.serialize(value, out);
+ }
+
+ return out.getCopyOfBuffer();
+ }
}
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index cb656b53b1b..13cbdedb1d9 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -64,7 +64,7 @@
* @param <UV> The type of the values in the map state.
*/
class RocksDBMapState<K, N, UK, UV>
- extends AbstractRocksDBState<K, N, Map<UK, UV>, MapState<UK, UV>>
+ extends AbstractRocksDBState<K, N, Map<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
private static final Logger LOG =
LoggerFactory.getLogger(RocksDBMapState.class);
@@ -119,7 +119,7 @@ private RocksDBMapState(
@Override
public UV get(UK userKey) throws IOException, RocksDBException {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily,
rawKeyBytes);
return (rawValueBytes == null ? null :
deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
@@ -128,8 +128,8 @@ public UV get(UK userKey) throws IOException,
RocksDBException {
@Override
public void put(UK userKey, UV userValue) throws IOException,
RocksDBException {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
- byte[] rawValueBytes = serializeUserValue(userValue,
userValueSerializer, dataOutputView);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
+ byte[] rawValueBytes = serializeValueNullSensitive(userValue,
userValueSerializer);
backend.db.put(columnFamily, writeOptions, rawKeyBytes,
rawValueBytes);
}
@@ -142,8 +142,8 @@ public void putAll(Map<UK, UV> map) throws IOException,
RocksDBException {
try (RocksDBWriteBatchWrapper writeBatchWrapper = new
RocksDBWriteBatchWrapper(backend.db, writeOptions)) {
for (Map.Entry<UK, UV> entry : map.entrySet()) {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(entry.getKey());
- byte[] rawValueBytes =
serializeUserValue(entry.getValue(), userValueSerializer, dataOutputView);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(entry.getKey(),
userKeySerializer);
+ byte[] rawValueBytes =
serializeValueNullSensitive(entry.getValue(), userValueSerializer);
writeBatchWrapper.put(columnFamily,
rawKeyBytes, rawValueBytes);
}
}
@@ -151,21 +151,21 @@ public void putAll(Map<UK, UV> map) throws IOException,
RocksDBException {
@Override
public void remove(UK userKey) throws IOException, RocksDBException {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
}
@Override
public boolean contains(UK userKey) throws IOException,
RocksDBException {
- byte[] rawKeyBytes =
serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+ byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily,
rawKeyBytes);
return (rawValueBytes != null);
}
@Override
- public Iterable<Map.Entry<UK, UV>> entries() throws IOException {
+ public Iterable<Map.Entry<UK, UV>> entries() {
final Iterator<Map.Entry<UK, UV>> iterator = iterator();
// Return null to make the behavior consistent with other
states.
@@ -177,10 +177,11 @@ public boolean contains(UK userKey) throws IOException,
RocksDBException {
}
@Override
- public Iterable<UK> keys() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterable<UK> keys() {
+ final byte[] prefixBytes =
serializeCurrentKeyWithGroupAndNamespace();
return () -> new RocksDBMapIterator<UK>(backend.db,
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
+ @Nullable
@Override
public UK next() {
RocksDBMapEntry entry = nextEntry();
@@ -190,8 +191,8 @@ public UK next() {
}
@Override
- public Iterable<UV> values() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterable<UV> values() {
+ final byte[] prefixBytes =
serializeCurrentKeyWithGroupAndNamespace();
return () -> new RocksDBMapIterator<UV>(backend.db,
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
@@ -203,8 +204,8 @@ public UV next() {
}
@Override
- public Iterator<Map.Entry<UK, UV>> iterator() throws IOException {
- final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+ public Iterator<Map.Entry<UK, UV>> iterator() {
+ final byte[] prefixBytes =
serializeCurrentKeyWithGroupAndNamespace();
return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db,
prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
@@ -220,7 +221,7 @@ public void clear() {
try (RocksIteratorWrapper iterator =
RocksDBKeyedStateBackend.getRocksIterator(backend.db, columnFamily);
WriteBatch writeBatch = new WriteBatch(128)) {
- final byte[] keyPrefixBytes =
serializeCurrentKeyAndNamespace();
+ final byte[] keyPrefixBytes =
serializeCurrentKeyWithGroupAndNamespace();
iterator.seek(keyPrefixBytes);
while (iterator.isValid()) {
@@ -259,18 +260,15 @@ public void clear() {
int keyGroup =
KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0,
backend.getNumberOfKeyGroups());
- DataOutputSerializer outputView = new DataOutputSerializer(128);
- DataInputDeserializer inputView = new DataInputDeserializer();
-
- writeKeyWithGroupAndNamespace(
- keyGroup,
- keyAndNamespace.f0,
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ new RocksDBSerializedCompositeKeyBuilder<>(
safeKeySerializer,
- keyAndNamespace.f1,
- safeNamespaceSerializer,
- outputView);
+ backend.getKeyGroupPrefixBytes(),
+ 32);
+
+ keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
- final byte[] keyPrefixBytes = outputView.getCopyOfBuffer();
+ final byte[] keyPrefixBytes =
keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
final MapSerializer<UK, UV> serializer = (MapSerializer<UK,
UV>) safeValueSerializer;
@@ -282,7 +280,8 @@ public void clear() {
keyPrefixBytes,
dupUserKeySerializer,
dupUserValueSerializer,
- inputView) {
+ dataInputView
+ ) {
@Override
public Map.Entry<UK, UV> next() {
@@ -302,36 +301,6 @@ public void clear() {
// Serialization Methods
//
------------------------------------------------------------------------
- private byte[] serializeCurrentKeyAndNamespace() throws IOException {
- writeCurrentKeyWithGroupAndNamespace();
-
- return dataOutputView.getCopyOfBuffer();
- }
-
- private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey)
throws IOException {
- serializeCurrentKeyAndNamespace();
- userKeySerializer.serialize(userKey, dataOutputView);
-
- return dataOutputView.getCopyOfBuffer();
- }
-
- private static <UV> byte[] serializeUserValue(
- UV userValue,
- TypeSerializer<UV> valueSerializer,
- DataOutputSerializer dataOutputView) throws IOException {
-
- dataOutputView.clear();
-
- if (userValue == null) {
- dataOutputView.writeBoolean(true);
- } else {
- dataOutputView.writeBoolean(false);
- valueSerializer.serialize(userValue, dataOutputView);
- }
-
- return dataOutputView.getCopyOfBuffer();
- }
-
private static <UK> UK deserializeUserKey(
DataInputDeserializer dataInputView,
int userKeyOffset,
@@ -474,7 +443,7 @@ public UV setValue(UV value) {
try {
userValue = value;
- rawValueBytes = serializeUserValue(value,
valueSerializer, dataOutputView);
+ rawValueBytes =
serializeValueNullSensitive(value, valueSerializer);
db.put(columnFamily, writeOptions, rawKeyBytes,
rawValueBytes);
} catch (IOException | RocksDBException e) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 138357b0d77..a5eb46eace6 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -41,7 +41,7 @@
* @param <V> The type of value that the state state stores.
*/
class RocksDBReducingState<K, N, V>
- extends AbstractRocksDBAppendingState<K, N, V, V, V, ReducingState<V>>
+ extends AbstractRocksDBAppendingState<K, N, V, V, V>
implements InternalReducingState<K, N, V> {
/** User-specified reduce function. */
@@ -102,20 +102,14 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
return;
}
- // cache key and namespace
- final K key = backend.getCurrentKey();
- final int keyGroup = backend.getCurrentKeyGroupIndex();
-
try {
V current = null;
// merge the sources to the target
for (N source : sources) {
if (source != null) {
-
- writeKeyWithGroupAndNamespace(keyGroup,
key, source, dataOutputView);
-
- final byte[] sourceKey =
dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(source);
+ final byte[] sourceKey =
serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes =
backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily,
writeOptions, sourceKey);
@@ -136,9 +130,8 @@ public void mergeNamespaces(N target, Collection<N>
sources) {
// if something came out of merging the sources, merge
it or write it to the target
if (current != null) {
// create the target full-binary-key
- writeKeyWithGroupAndNamespace(keyGroup, key,
target, dataOutputView);
-
- final byte[] targetKey =
dataOutputView.getCopyOfBuffer();
+ setCurrentNamespace(target);
+ final byte[] targetKey =
serializeCurrentKeyWithGroupAndNamespace();
final byte[] targetValueBytes =
backend.db.get(columnFamily, targetKey);
if (targetValueBytes != null) {
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
new file mode 100644
index 00000000000..8e83e292820
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previously serialized current key if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+ /** The serializer for the key. */
+ @Nonnull
+ private final TypeSerializer<K> keySerializer;
+
+ /** The output to write the key into. */
+ @Nonnull
+ private final DataOutputSerializer keyOutView;
+
+ /** The number of Key-group-prefix bytes for the key. */
+ @Nonnegative
+ private final int keyGroupPrefixBytes;
+
+ /** This flag indicates whether the key type has a variable byte size
in serialization. */
+ private final boolean keySerializerTypeVariableSized;
+
+ /** Mark for the position after the serialized key. */
+ @Nonnegative
+ private int afterKeyMark;
+
+ public RocksDBSerializedCompositeKeyBuilder(
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnegative int keyGroupPrefixBytes,
+ @Nonnegative int initialSize) {
+ this(
+ keySerializer,
+ new DataOutputSerializer(initialSize),
+ keyGroupPrefixBytes,
+
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+ 0);
+ }
+
+ @VisibleForTesting
+ RocksDBSerializedCompositeKeyBuilder(
+ @Nonnull TypeSerializer<K> keySerializer,
+ @Nonnull DataOutputSerializer keyOutView,
+ @Nonnegative int keyGroupPrefixBytes,
+ boolean keySerializerTypeVariableSized,
+ @Nonnegative int afterKeyMark) {
+ this.keySerializer = keySerializer;
+ this.keyOutView = keyOutView;
+ this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+ this.keySerializerTypeVariableSized =
keySerializerTypeVariableSized;
+ this.afterKeyMark = afterKeyMark;
+ }
+
+ /**
+ * Sets the key and key-group as prefix. This will serialize them into
the buffer and they will be used to create
+ * composite keys with provided namespaces.
+ *
+ * @param key the key.
+ * @param keyGroupId the key-group id for the key.
+ */
+ public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int
keyGroupId) {
+ try {
+ serializeKeyGroupAndKey(key, keyGroupId);
+ } catch (IOException shouldNeverHappen) {
+ throw new FlinkRuntimeException(shouldNeverHappen);
+ }
+ }
+
+ /**
+ * Returns a serialized composite key, from the key and key-group
provided in a previous call to
+ * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+ *
+ * @param namespace the namespace to concatenate for the
serialized composite key bytes.
+ * @param namespaceSerializer the serializer to obtain the serialized
form of the namespace.
+ * @param <N> the type of the namespace.
+ * @return the bytes for the serialized composite key of key-group,
key, namespace.
+ */
+ @Nonnull
+ public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace,
@Nonnull TypeSerializer<N> namespaceSerializer) {
+ try {
+ serializeNamespace(namespace, namespaceSerializer);
+ final byte[] result = keyOutView.getCopyOfBuffer();
+ resetToKey();
+ return result;
+ } catch (IOException shouldNeverHappen) {
+ throw new FlinkRuntimeException(shouldNeverHappen);
+ }
+ }
+
+ /**
+ * Returns a serialized composite key, from the key and key-group
provided in a previous call to
+ * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace,
followed by the given user-key.
+ *
+ * @param namespace the namespace to concatenate for the
serialized composite key bytes.
+ * @param namespaceSerializer the serializer to obtain the serialized
form of the namespace.
+ * @param userKey the user-key to concatenate for the
serialized composite key, after the namespace.
+ * @param userKeySerializer the serializer to obtain the serialized
form of the user-key.
+ * @param <N> the type of the namespace.
+ * @param <UK> the type of the user-key.
+ * @return the bytes for the serialized composite key of key-group,
key, namespace, user-key.
+ */
+ @Nonnull
+ public <N, UK> byte[] buildCompositeKeyNamesSpaceUserKey(
+ @Nonnull N namespace,
+ @Nonnull TypeSerializer<N> namespaceSerializer,
+ @Nonnull UK userKey,
+ @Nonnull TypeSerializer<UK> userKeySerializer) throws
IOException {
+ serializeNamespace(namespace, namespaceSerializer);
+ userKeySerializer.serialize(userKey, keyOutView);
+ byte[] result = keyOutView.getCopyOfBuffer();
+ resetToKey();
+ return result;
+ }
+
+ private void serializeKeyGroupAndKey(K key, int keyGroupId) throws
IOException {
+
+ // clear buffer and mark
+ resetFully();
+
+ // write key-group
+ RocksDBKeySerializationUtils.writeKeyGroup(
+ keyGroupId,
+ keyGroupPrefixBytes,
+ keyOutView);
+ // write key
+ keySerializer.serialize(key, keyOutView);
+ afterKeyMark = keyOutView.length();
+ }
+
+ private <N> void serializeNamespace(
+ @Nonnull N namespace,
+ @Nonnull TypeSerializer<N> namespaceSerializer) throws
IOException {
+
+ // this should only be called when there is already a key
written so that we build the composite.
+ assert isKeyWritten();
+
+ final boolean ambiguousCompositeKeyPossible =
isAmbiguousCompositeKeyPossible(namespaceSerializer);
+ if (ambiguousCompositeKeyPossible) {
+ RocksDBKeySerializationUtils.writeVariableIntBytes(
+ afterKeyMark - keyGroupPrefixBytes,
+ keyOutView);
+ }
+ RocksDBKeySerializationUtils.writeNameSpace(
+ namespace,
+ namespaceSerializer,
+ keyOutView,
+ ambiguousCompositeKeyPossible);
+ }
+
+ private void resetFully() {
+ afterKeyMark = 0;
+ keyOutView.clear();
+ }
+
+ private void resetToKey() {
+ keyOutView.setPosition(afterKeyMark);
+ }
+
+ private boolean isKeyWritten() {
+ return afterKeyMark > 0;
+ }
+
+ @VisibleForTesting
+ boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?>
namespaceSerializer) {
+ return keySerializerTypeVariableSized &
+
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
+ }
+
+ @VisibleForTesting
+ boolean isKeySerializerTypeVariableSized() {
+ return keySerializerTypeVariableSized;
+ }
+}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 0ca90d4a521..a8f5163a7fd 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -40,7 +40,7 @@
* @param <V> The type of value that the state state stores.
*/
class RocksDBValueState<K, N, V>
- extends AbstractRocksDBState<K, N, V, ValueState<V>>
+ extends AbstractRocksDBState<K, N, V>
implements InternalValueState<K, N, V> {
/**
@@ -80,9 +80,9 @@ private RocksDBValueState(
@Override
public V value() {
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- byte[] valueBytes = backend.db.get(columnFamily, key);
+ byte[] valueBytes = backend.db.get(columnFamily,
+ serializeCurrentKeyWithGroupAndNamespace());
+
if (valueBytes == null) {
return getDefaultValue();
}
@@ -101,11 +101,9 @@ public void update(V value) {
}
try {
- writeCurrentKeyWithGroupAndNamespace();
- byte[] key = dataOutputView.getCopyOfBuffer();
- dataOutputView.clear();
- valueSerializer.serialize(value, dataOutputView);
- backend.db.put(columnFamily, writeOptions, key,
dataOutputView.getCopyOfBuffer());
+ backend.db.put(columnFamily, writeOptions,
+ serializeCurrentKeyWithGroupAndNamespace(),
+ serializeValue(value));
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding
data to RocksDB", e);
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
new file mode 100644
index 00000000000..70a4c3cfef7
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test for {@link RocksDBSerializedCompositeKeyBuilder}.
+ */
+public class RocksDBSerializedCompositeKeyBuilderTest {
+
+ private final DataOutputSerializer dataOutputSerializer = new
DataOutputSerializer(128);
+
+ private static final int[] TEST_PARALLELISMS = new int[]{64, 4096};
+ private static final Collection<Integer> TEST_INTS = Arrays.asList(42,
4711);
+ private static final Collection<String> TEST_STRINGS =
Arrays.asList("test123", "abc");
+
+ @Before
+ public void before() {
+ dataOutputSerializer.clear();
+ }
+
+ @Test
+ public void testSetKey() throws IOException {
+ for (int parallelism : TEST_PARALLELISMS) {
+ testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS,
parallelism);
+ testSetKeyInternal(StringSerializer.INSTANCE,
TEST_STRINGS, parallelism);
+ }
+ }
+
+ @Test
+ public void testSetKeyNamespace() throws IOException {
+ for (int parallelism : TEST_PARALLELISMS) {
+ testSetKeyNamespaceInternal(IntSerializer.INSTANCE,
IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism);
+ testSetKeyNamespaceInternal(IntSerializer.INSTANCE,
StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, parallelism);
+ testSetKeyNamespaceInternal(StringSerializer.INSTANCE,
IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, parallelism);
+ testSetKeyNamespaceInternal(StringSerializer.INSTANCE,
StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, parallelism);
+ }
+ }
+
+ @Test
+ public void testSetKeyNamespaceUserKey() throws IOException {
+ for (int parallelism : TEST_PARALLELISMS) {
+
testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE,
IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS,
TEST_INTS, parallelism);
+
testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE,
StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_STRINGS,
TEST_INTS, parallelism);
+
testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE,
IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS,
TEST_INTS, parallelism);
+
testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE,
StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS,
TEST_INTS, parallelism);
+
testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE,
IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_INTS,
TEST_STRINGS, parallelism);
+
testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS,
TEST_STRINGS, parallelism);
+
testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE,
IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_INTS,
TEST_STRINGS, parallelism);
+
testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS,
TEST_STRINGS, TEST_STRINGS, parallelism);
+ }
+ }
+
+ private <K> void testSetKeyInternal(TypeSerializer<K> serializer,
Collection<K> testKeys, int maxParallelism) throws IOException {
+ final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+ createRocksDBSerializedCompositeKeyBuilder(serializer,
prefixBytes);
+
+ final DataInputDeserializer deserializer = new
DataInputDeserializer();
+ for (K testKey : testKeys) {
+ int keyGroup = setKeyAndReturnKeyGroup(keyBuilder,
testKey, maxParallelism);
+ byte[] result = dataOutputSerializer.getCopyOfBuffer();
+ deserializer.setBuffer(result);
+ assertKeyKeyGroupBytes(testKey, keyGroup, prefixBytes,
serializer, deserializer, false);
+ Assert.assertEquals(0, deserializer.available());
+ }
+ }
+
+ private <K, N> void testSetKeyNamespaceInternal(
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ Collection<K> testKeys,
+ Collection<N> testNamespaces,
+ int maxParallelism) throws IOException {
+ final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+
createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+ final DataInputDeserializer deserializer = new
DataInputDeserializer();
+
+ final boolean ambiguousPossible =
keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+ for (K testKey : testKeys) {
+ int keyGroup = setKeyAndReturnKeyGroup(keyBuilder,
testKey, maxParallelism);
+ for (N testNamespace : testNamespaces) {
+ byte[] compositeBytes =
keyBuilder.buildCompositeKeyNamespace(testNamespace, namespaceSerializer);
+ deserializer.setBuffer(compositeBytes);
+ assertKeyGroupKeyNamespaceBytes(
+ testKey,
+ keyGroup,
+ prefixBytes,
+ keySerializer,
+ testNamespace,
+ namespaceSerializer,
+ deserializer,
+ ambiguousPossible);
+ Assert.assertEquals(0,
deserializer.available());
+ }
+ }
+ }
+
+ private <K, N, U> void testSetKeyNamespaceUserKeyInternal(
+ TypeSerializer<K> keySerializer,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<U> userKeySerializer,
+ Collection<K> testKeys,
+ Collection<N> testNamespaces,
+ Collection<U> testUserKeys,
+ int maxParallelism) throws IOException {
+ final int prefixBytes = maxParallelism > Byte.MAX_VALUE ? 2 : 1;
+
+ RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
+
createRocksDBSerializedCompositeKeyBuilder(keySerializer, prefixBytes);
+
+ final DataInputDeserializer deserializer = new
DataInputDeserializer();
+
+ final boolean ambiguousPossible =
keyBuilder.isAmbiguousCompositeKeyPossible(namespaceSerializer);
+
+ for (K testKey : testKeys) {
+ int keyGroup = setKeyAndReturnKeyGroup(keyBuilder,
testKey, maxParallelism);
+ for (N testNamespace : testNamespaces) {
+ for (U testUserKey : testUserKeys) {
+ byte[] compositeBytes =
keyBuilder.buildCompositeKeyNamesSpaceUserKey(
+ testNamespace,
+ namespaceSerializer,
+ testUserKey,
+ userKeySerializer);
+
+ deserializer.setBuffer(compositeBytes);
+ assertKeyGroupKeyNamespaceUserKeyBytes(
+ testKey,
+ keyGroup,
+ prefixBytes,
+ keySerializer,
+ testNamespace,
+ namespaceSerializer,
+ testUserKey,
+ userKeySerializer,
+ deserializer,
+ ambiguousPossible);
+
+ Assert.assertEquals(0,
deserializer.available());
+ }
+ }
+ }
+ }
+
+ private <K> RocksDBSerializedCompositeKeyBuilder<K>
createRocksDBSerializedCompositeKeyBuilder(
+ TypeSerializer<K> serializer,
+ int prefixBytes) {
+ final boolean variableSize =
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(serializer);
+ return new RocksDBSerializedCompositeKeyBuilder<>(
+ serializer,
+ dataOutputSerializer,
+ prefixBytes,
+ variableSize,
+ 0);
+ }
+
+ private <K> int setKeyAndReturnKeyGroup(
+ RocksDBSerializedCompositeKeyBuilder<K> compositeKeyBuilder,
+ K key,
+ int maxParallelism) {
+
+ int keyGroup =
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism,
maxParallelism);
+ compositeKeyBuilder.setKeyAndKeyGroup(key, keyGroup);
+ return keyGroup;
+ }
+
+ private <K> void assertKeyKeyGroupBytes(
+ K key,
+ int keyGroup,
+ int prefixBytes,
+ TypeSerializer<K> typeSerializer,
+ DataInputDeserializer deserializer,
+ boolean ambiguousCompositeKeyPossible) throws IOException {
+
+ Assert.assertEquals(keyGroup,
RocksDBKeySerializationUtils.readKeyGroup(prefixBytes, deserializer));
+ Assert.assertEquals(key,
RocksDBKeySerializationUtils.readKey(typeSerializer, deserializer,
ambiguousCompositeKeyPossible));
+ }
+
+ private <K, N> void assertKeyGroupKeyNamespaceBytes(
+ K key,
+ int keyGroup,
+ int prefixBytes,
+ TypeSerializer<K> keySerializer,
+ N namespace,
+ TypeSerializer<N> namespaceSerializer,
+ DataInputDeserializer deserializer,
+ boolean ambiguousCompositeKeyPossible) throws IOException {
+ assertKeyKeyGroupBytes(key, keyGroup, prefixBytes,
keySerializer, deserializer, ambiguousCompositeKeyPossible);
+ N readNamespace =
+
RocksDBKeySerializationUtils.readNamespace(namespaceSerializer, deserializer,
ambiguousCompositeKeyPossible);
+ Assert.assertEquals(namespace, readNamespace);
+ }
+
+ private <K, N, U> void assertKeyGroupKeyNamespaceUserKeyBytes(
+ K key,
+ int keyGroup,
+ int prefixBytes,
+ TypeSerializer<K> keySerializer,
+ N namespace,
+ TypeSerializer<N> namespaceSerializer,
+ U userKey,
+ TypeSerializer<U> userKeySerializer,
+ DataInputDeserializer deserializer,
+ boolean ambiguousCompositeKeyPossible) throws IOException {
+ assertKeyGroupKeyNamespaceBytes(
+ key,
+ keyGroup,
+ prefixBytes,
+ keySerializer,
+ namespace,
+ namespaceSerializer,
+ deserializer,
+ ambiguousCompositeKeyPossible);
+ Assert.assertEquals(userKey,
userKeySerializer.deserialize(deserializer));
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services