[
https://issues.apache.org/jira/browse/FLINK-8365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334144#comment-16334144
]
ASF GitHub Bot commented on FLINK-8365:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5326#discussion_r162903235
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
---
@@ -128,27 +128,33 @@ public void update(List<V> values) throws Exception {
if (values != null && !values.isEmpty()) {
final N namespace = currentNamespace;
- final StateTable<K, N, ArrayList<V>> map = stateTable;
+ final StateTable<K, N, List<V>> map = stateTable;
+ List<V> list = map.get(namespace);
- map.put(namespace, new ArrayList<>(values));
+ if (list == null) {
+ list = new ArrayList<>(values);
+ map.put(namespace, list);
+ } else {
+ list.clear();
+ list.addAll(values);
+ }
}
}
@Override
public void addAll(List<V> values) throws Exception {
if (values != null && !values.isEmpty()) {
final N namespace = currentNamespace;
- final StateTable<K, N, ArrayList<V>> map = stateTable;
+ final StateTable<K, N, List<V>> map = stateTable;
- ArrayList<V> list = map.get(currentNamespace);
+ List<V> list = map.get(currentNamespace);
if (list == null) {
list = new ArrayList<>();
--- End diff --
We could already create a `new ArrayList<>(values.size())`
> Relax List type in HeapListState and HeapKeyedStateBackend
> ----------------------------------------------------------
>
> Key: FLINK-8365
> URL: https://issues.apache.org/jira/browse/FLINK-8365
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Fix For: 1.5.0
>
>
> {{stateTable}} in HeapListState and
> {{HeapKeyedStateBackend#createListState()}} are both strongly typed to
> {{ArrayList}} right now.
> As discussed with [~StephanEwen] and [[email protected]] in
> https://github.com/apache/flink/pull/4963, we may want to relax the type to
> {{List}}.
> Problems discovered now:
> 1. That may require changing serializer from {{ArrayListSerializer}} to
> {{ListSerializer}} in the following code, and we need to discuss the pros and
> cons
> {code:java}
> @Override
> public <N, T> InternalListState<N, T> createListState(
> TypeSerializer<N> namespaceSerializer,
> ListStateDescriptor<T> stateDesc) throws Exception {
> // the list state does some manual mapping, because the state
> is typed to the generic
> // 'List' interface, but we want to use an implementation typed
> to ArrayList
> // using a more specialized implementation opens up runtime
> optimizations
> StateTable<K, N, ArrayList<T>> stateTable =
> tryRegisterStateTable(
> stateDesc.getName(),
> stateDesc.getType(),
> namespaceSerializer,
> new
> ArrayListSerializer<T>(stateDesc.getElementSerializer()));
> return new HeapListState<>(stateDesc, stateTable,
> keySerializer, namespaceSerializer);
> }
> {code}
> 2. for non-RocksDBStateBackend (AsyncFileStateBackendTest,
> AsyncMemoryStateBackendTest, FileStateBackendTest, and
> MemoryStateBackendTest), unit tests testListState and
> testListStateAddUpdateAndGet fail
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)