[ 
https://issues.apache.org/jira/browse/FLINK-8365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334144#comment-16334144
 ] 

ASF GitHub Bot commented on FLINK-8365:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5326#discussion_r162903235
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
 ---
    @@ -128,27 +128,33 @@ public void update(List<V> values) throws Exception {
     
                if (values != null && !values.isEmpty()) {
                        final N namespace = currentNamespace;
    -                   final StateTable<K, N, ArrayList<V>> map = stateTable;
    +                   final StateTable<K, N, List<V>> map = stateTable;
    +                   List<V> list = map.get(namespace);
     
    -                   map.put(namespace, new ArrayList<>(values));
    +                   if (list == null) {
    +                           list = new ArrayList<>(values);
    +                           map.put(namespace, list);
    +                   } else {
    +                           list.clear();
    +                           list.addAll(values);
    +                   }
                }
        }
     
        @Override
        public void addAll(List<V> values) throws Exception {
                if (values != null && !values.isEmpty()) {
                        final N namespace = currentNamespace;
    -                   final StateTable<K, N, ArrayList<V>> map = stateTable;
    +                   final StateTable<K, N, List<V>> map = stateTable;
     
    -                   ArrayList<V> list = map.get(currentNamespace);
    +                   List<V> list = map.get(currentNamespace);
     
                        if (list == null) {
                                list = new ArrayList<>();
    --- End diff --
    
    We could already create a `new ArrayList<>(values.size())`


> Relax List type in HeapListState and HeapKeyedStateBackend
> ----------------------------------------------------------
>
>                 Key: FLINK-8365
>                 URL: https://issues.apache.org/jira/browse/FLINK-8365
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
>
>
> {{stateTable}} in HeapListState and 
> {{HeapKeyedStateBackend#createListState()}} are both strongly typed to 
> {{ArrayList}} right now.
> As discussed with [~StephanEwen] and [[email protected]] in 
> https://github.com/apache/flink/pull/4963, we may want to relax the type to 
> {{List}}.
> Problems discovered now:
> 1. That may require changing serializer from {{ArrayListSerializer}} to 
> {{ListSerializer}} in the following code, and we need to discuss the pros and 
> cons
> {code:java}
> @Override
>       public <N, T> InternalListState<N, T> createListState(
>                       TypeSerializer<N> namespaceSerializer,
>                       ListStateDescriptor<T> stateDesc) throws Exception {
>               // the list state does some manual mapping, because the state 
> is typed to the generic
>               // 'List' interface, but we want to use an implementation typed 
> to ArrayList
>               // using a more specialized implementation opens up runtime 
> optimizations
>               StateTable<K, N, ArrayList<T>> stateTable = 
> tryRegisterStateTable(
>                               stateDesc.getName(),
>                               stateDesc.getType(),
>                               namespaceSerializer,
>                               new 
> ArrayListSerializer<T>(stateDesc.getElementSerializer()));
>               return new HeapListState<>(stateDesc, stateTable, 
> keySerializer, namespaceSerializer);
>       }
> {code}
> 2. for non-RocksDBStateBackend (AsyncFileStateBackendTest, 
> AsyncMemoryStateBackendTest, FileStateBackendTest, and 
> MemoryStateBackendTest), unit tests testListState and 
> testListStateAddUpdateAndGet fail



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to