Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5281#discussion_r162479898
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key =
keySerializationStream.toByteArray();
- DataOutputViewStreamWrapper out = new
DataOutputViewStreamWrapper(keySerializationStream);
- List<byte[]> bytes = new
ArrayList<>(values.size());
- for (V value : values) {
- keySerializationStream.reset();
- valueSerializer.serialize(value, out);
-
bytes.add(keySerializationStream.toByteArray());
+ byte[] premerge = getPreMergedValue(values);
+ if (premerge != null) {
+ backend.db.put(columnFamily,
writeOptions, key, premerge);
+ } else {
+ throw new IOException("Failed pre-merge
values in update()");
}
+ } catch (IOException | RocksDBException e) {
+ throw new RuntimeException("Error while
updating data to RocksDB", e);
+ }
+ }
+ }
+
+ @Override
+ public void addAll(List<V> values) throws Exception {
+ if (values != null && !values.isEmpty()) {
+ try {
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key =
keySerializationStream.toByteArray();
- byte[] premerge = MergeUtils.merge(bytes);
+ byte[] premerge = getPreMergedValue(values);
if (premerge != null) {
- backend.db.put(columnFamily,
writeOptions, key, premerge);
+ backend.db.merge(columnFamily,
writeOptions, key, premerge);
} else {
- throw new IOException("Failed pre-merge
values");
+ throw new IOException("Failed pre-merge
values in addAll()");
}
} catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while
updating data to RocksDB", e);
}
}
}
+
+ private byte[] getPreMergedValue(List<V> values) throws IOException {
+ DataOutputViewStreamWrapper out = new
DataOutputViewStreamWrapper(keySerializationStream);
+
+ List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --
That'll be great; let's get this in. I will dive into FLINK-8441 in a
couple of days. Thanks!
---