Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4963#discussion_r159546670 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java --- @@ -158,4 +158,28 @@ public void mergeNamespaces(N target, Collection<N> sources) throws Exception { throw new Exception("Error while merging state in RocksDB", e); } } + + @Override + public void update(List<V> values) throws Exception { + clear(); + + if (values != null && !values.isEmpty()) { + try { + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); + + List<byte[]> bytes = new ArrayList<>(values.size()); + for (V value : values) { + keySerializationStream.reset(); + valueSerializer.serialize(value, out); + bytes.add(keySerializationStream.toByteArray()); + } + + backend.db.put(columnFamily, writeOptions, key, MergeUtils.merge(bytes)); --- End diff -- Good idea. I will do some benchmarking.
---