Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5326#discussion_r162903235
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
---
@@ -128,27 +128,33 @@ public void update(List<V> values) throws Exception {
if (values != null && !values.isEmpty()) {
final N namespace = currentNamespace;
- final StateTable<K, N, ArrayList<V>> map = stateTable;
+ final StateTable<K, N, List<V>> map = stateTable;
+ List<V> list = map.get(namespace);
- map.put(namespace, new ArrayList<>(values));
+ if (list == null) {
+ list = new ArrayList<>(values);
+ map.put(namespace, list);
+ } else {
+ list.clear();
+ list.addAll(values);
+ }
}
}
@Override
public void addAll(List<V> values) throws Exception {
if (values != null && !values.isEmpty()) {
final N namespace = currentNamespace;
- final StateTable<K, N, ArrayList<V>> map = stateTable;
+ final StateTable<K, N, List<V>> map = stateTable;
- ArrayList<V> list = map.get(currentNamespace);
+ List<V> list = map.get(currentNamespace);
if (list == null) {
list = new ArrayList<>();
--- End diff --
We could already create a `new ArrayList<>(values.size())`
---