[ https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16310416#comment-16310416 ]
ASF GitHub Bot commented on FLINK-7475:
---------------------------------------
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4963#discussion_r159546670
--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
@@ -158,4 +158,28 @@ public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
             throw new Exception("Error while merging state in RocksDB", e);
         }
     }
+
+    @Override
+    public void update(List<V> values) throws Exception {
+        clear();
+
+        if (values != null && !values.isEmpty()) {
+            try {
+                writeCurrentKeyWithGroupAndNamespace();
+                byte[] key = keySerializationStream.toByteArray();
+                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+                List<byte[]> bytes = new ArrayList<>(values.size());
+                for (V value : values) {
+                    keySerializationStream.reset();
+                    valueSerializer.serialize(value, out);
+                    bytes.add(keySerializationStream.toByteArray());
+                }
+
+                backend.db.put(columnFamily, writeOptions, key, MergeUtils.merge(bytes));
--- End diff --
Good idea. I will do some benchmarking.
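
For readers following the diff, here is a rough sketch of the serialize-then-join step that update() performs before the single put(). The diff does not show the body of MergeUtils.merge, so the MergeSketch class, its merge method, and the ',' delimiter below are all assumptions made for illustration, not the actual Flink implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for MergeUtils; the real class is not shown in the diff.
class MergeSketch {
    // Assumed delimiter byte; the diff does not say which byte is used.
    static final byte DELIMITER = (byte) ',';

    // Joins the serialized elements into one byte array, with one
    // delimiter byte between consecutive elements.
    static byte[] merge(List<byte[]> parts) {
        if (parts == null || parts.isEmpty()) {
            return null;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < parts.size(); i++) {
            if (i > 0) {
                out.write(DELIMITER);
            }
            byte[] part = parts.get(i);
            out.write(part, 0, part.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Strings stand in for values already serialized by a value serializer.
        List<byte[]> bytes = new ArrayList<>();
        for (String value : new String[] {"a", "bc", "d"}) {
            bytes.add(value.getBytes(StandardCharsets.UTF_8));
        }
        // Prints: a,bc,d
        System.out.println(new String(merge(bytes), StandardCharsets.UTF_8));
    }
}
```

The point of the pattern is that the whole list is written with one put() instead of one write per element. The sketch does not cover reading the joined value back into separate elements.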
> support update() in ListState
> -----------------------------
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
> Issue Type: Improvement
> Components: Core, DataStream API, State Backends, Checkpointing
> Affects Versions: 1.4.0
> Reporter: yf
> Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> If I want to update the list, I have to do two steps:
> listState.clear();
> for (Element e : myList) {
>     listState.add(e);
> }
> Why can't I update the state in one call:
> listState.update(myList)?
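
The two approaches from the issue description can be compared in a small self-contained sketch. InMemoryListState is a hypothetical in-memory stand-in written for this example, not Flink's ListState interface; its update() simply mirrors the clear-then-write behaviour and the null/empty check visible in the diff above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListStateUpdateSketch {
    public static void main(String[] args) {
        InMemoryListState<String> listState = new InMemoryListState<>();
        listState.add("stale");

        List<String> myList = Arrays.asList("a", "b", "c");

        // Two-step replacement, as described in the issue.
        listState.clear();
        for (String e : myList) {
            listState.add(e);
        }
        System.out.println(listState.get()); // prints [a, b, c]

        // One-call replacement, as proposed in the issue.
        listState.update(Arrays.asList("x", "y"));
        System.out.println(listState.get()); // prints [x, y]
    }
}

// Hypothetical in-memory list state (an assumption for illustration only).
class InMemoryListState<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T value) {
        elements.add(value);
    }

    void clear() {
        elements.clear();
    }

    // Returns a copy so callers cannot mutate the stored state.
    List<T> get() {
        return new ArrayList<>(elements);
    }

    // Replaces the whole list; a null or empty argument leaves the state empty.
    void update(List<T> values) {
        clear();
        if (values != null && !values.isEmpty()) {
            elements.addAll(values);
        }
    }
}
```

With update(), the caller no longer has to remember to call clear() first, and a state backend is free to write the new list in one operation.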
--
This message was sent by Atlassian JIRA (v6.4.14#64029)