Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4963#discussion_r159546670
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ---
    @@ -158,4 +158,28 @@ public void mergeNamespaces(N target, Collection<N> 
sources) throws Exception {
                        throw new Exception("Error while merging state in 
RocksDB", e);
                }
        }
    +
    +   @Override
    +   public void update(List<V> values) throws Exception {
    +           clear();
    +
    +           if (values != null && !values.isEmpty()) {
    +                   try {
    +                           writeCurrentKeyWithGroupAndNamespace();
    +                           byte[] key = 
keySerializationStream.toByteArray();
    +                           DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
    +
    +                           List<byte[]> bytes = new 
ArrayList<>(values.size());
    +                           for (V value : values) {
    +                                   keySerializationStream.reset();
    +                                   valueSerializer.serialize(value, out);
    +                                   
bytes.add(keySerializationStream.toByteArray());
    +                           }
    +
    +                           backend.db.put(columnFamily, writeOptions, key, 
MergeUtils.merge(bytes));
    --- End diff --
    
    good idea. I will do some benchmarking


---

Reply via email to