[
https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330515#comment-16330515
]
ASF GitHub Bot commented on FLINK-7938:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5281#discussion_r162343086
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = keySerializationStream.toByteArray();
- DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
- List<byte[]> bytes = new ArrayList<>(values.size());
- for (V value : values) {
- keySerializationStream.reset();
- valueSerializer.serialize(value, out);
- bytes.add(keySerializationStream.toByteArray());
+ byte[] premerge = getPreMergedValue(values);
+ if (premerge != null) {
+ backend.db.put(columnFamily, writeOptions, key, premerge);
+ } else {
+ throw new IOException("Failed pre-merge values in update()");
}
+ } catch (IOException | RocksDBException e) {
+ throw new RuntimeException("Error while updating data to RocksDB", e);
+ }
+ }
+ }
+
+ @Override
+ public void addAll(List<V> values) throws Exception {
+ if (values != null && !values.isEmpty()) {
+ try {
+ writeCurrentKeyWithGroupAndNamespace();
+ byte[] key = keySerializationStream.toByteArray();
- byte[] premerge = MergeUtils.merge(bytes);
+ byte[] premerge = getPreMergedValue(values);
if (premerge != null) {
- backend.db.put(columnFamily, writeOptions, key, premerge);
+ backend.db.merge(columnFamily, writeOptions, key, premerge);
} else {
- throw new IOException("Failed pre-merge values");
+ throw new IOException("Failed pre-merge values in addAll()");
}
} catch (IOException | RocksDBException e) {
throw new RuntimeException("Error while updating data to RocksDB", e);
}
}
}
+
+ private byte[] getPreMergedValue(List<V> values) throws IOException {
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+ List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --
Interesting, do you have an idea why it did not work? I think it should be
possible. In general, I am not a big fan of changing this code twice when we
already expect to overhaul that part, but we can do it this time if it makes
your life easier.

Sorry that we could not get it done for the meetup, but I was blocked by
another important matter :-(
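
For readers following the diff: the quoted hunk is cut off at the start of getPreMergedValue(). Judging only from the lines removed from update(), the helper presumably serializes each value on its own and then joins the resulting byte arrays into one buffer. The following is a minimal, self-contained sketch of that idea, not the code from the pull request. The class name PreMergeSketch, the use of String values in place of valueSerializer, and the single ',' delimiter byte are all assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PreMergeSketch {

    // Assumed delimiter byte placed between serialized elements (hypothetical choice).
    static final byte DELIMITER = ',';

    /**
     * Serializes every value separately, then concatenates the results with
     * DELIMITER between them. Returns null for a null or empty list, mirroring
     * the null check on the pre-merged value in the diff.
     */
    public static byte[] preMerge(List<String> values) {
        if (values == null || values.isEmpty()) {
            return null;
        }

        // Step 1: serialize each element on its own (UTF-8 stands in for valueSerializer).
        List<byte[]> serialized = new ArrayList<>(values.size());
        for (String value : values) {
            serialized.add(value.getBytes(StandardCharsets.UTF_8));
        }

        // Step 2: join the serialized elements into a single byte array.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < serialized.size(); i++) {
            if (i > 0) {
                out.write(DELIMITER);
            }
            byte[] element = serialized.get(i);
            out.write(element, 0, element.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] merged = preMerge(Arrays.asList("a", "b", "c"));
        System.out.println(new String(merged, StandardCharsets.UTF_8)); // prints "a,b,c"
    }
}
```

Sharing one such helper between update() and addAll() leaves the write call (put versus merge) as the only difference between the two methods, which is what the diff appears to aim for.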
> support addAll() in ListState
> -----------------------------
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}}, so Flink can add elements to
> {{ListState}} more efficiently in batches. This should give much better
> performance, especially for {{ListState}} backed by RocksDB.
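
To illustrate the distinction the patch relies on: update() writes the pre-merged value with put, which replaces whatever is stored under the key, while addAll() writes it with merge, which appends to the existing value in a single call instead of one call per element. The sketch below imitates this with an in-memory map. It is only a stand-in under stated assumptions: the class PutVsMergeSketch is hypothetical, and the ',' concatenation models a string-append style merge operator rather than any actual RocksDB configuration.

```java
import java.util.HashMap;
import java.util.Map;

public class PutVsMergeSketch {

    // In-memory stand-in for a key-value store (assumption, not RocksDB itself).
    private final Map<String, String> store = new HashMap<>();

    /** Overwrites any existing value, as update() does via put. */
    public void put(String key, String value) {
        store.put(key, value);
    }

    /** Appends to the existing value with a ',' separator, as addAll() does via merge. */
    public void merge(String key, String value) {
        store.merge(key, value, (oldValue, newValue) -> oldValue + "," + newValue);
    }

    public String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        PutVsMergeSketch db = new PutVsMergeSketch();
        db.put("k", "a,b");      // update([a, b]): the list is now a,b
        db.merge("k", "c,d");    // addAll([c, d]): one call appends both elements
        System.out.println(db.get("k")); // prints "a,b,c,d"
        db.put("k", "x");        // update([x]): previous contents are discarded
        System.out.println(db.get("k")); // prints "x"
    }
}
```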