[ 
https://issues.apache.org/jira/browse/FLINK-9571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514786#comment-16514786
 ] 

ASF GitHub Bot commented on FLINK-9571:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903433
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
    @@ -203,91 +216,16 @@ private boolean hasRegisteredState() {
        }
     
        @Override
    -   public <N, V> InternalValueState<K, N, V> createValueState(
    -                   TypeSerializer<N> namespaceSerializer,
    -                   ValueStateDescriptor<V> stateDesc) throws Exception {
    -
    -           StateTable<K, N, V> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
    -           return new HeapValueState<>(
    -                           stateTable,
    -                           keySerializer,
    -                           stateTable.getStateSerializer(),
    -                           stateTable.getNamespaceSerializer(),
    -                           stateDesc.getDefaultValue());
    -   }
    -
    -   @Override
    -   public <N, T> InternalListState<K, N, T> createListState(
    -                   TypeSerializer<N> namespaceSerializer,
    -                   ListStateDescriptor<T> stateDesc) throws Exception {
    -
    -           StateTable<K, N, List<T>> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
    -           return new HeapListState<>(
    -                           stateTable,
    -                           keySerializer,
    -                           stateTable.getStateSerializer(),
    -                           stateTable.getNamespaceSerializer(),
    -                           stateDesc.getDefaultValue());
    -   }
    -
    -   @Override
    -   public <N, T> InternalReducingState<K, N, T> createReducingState(
    -                   TypeSerializer<N> namespaceSerializer,
    -                   ReducingStateDescriptor<T> stateDesc) throws Exception {
    -
    -           StateTable<K, N, T> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
    -           return new HeapReducingState<>(
    -                           stateTable,
    -                           keySerializer,
    -                           stateTable.getStateSerializer(),
    -                           stateTable.getNamespaceSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           stateDesc.getReduceFunction());
    -   }
    -
    -   @Override
    -   public <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> 
createAggregatingState(
    -                   TypeSerializer<N> namespaceSerializer,
    -                   AggregatingStateDescriptor<T, ACC, R> stateDesc) throws 
Exception {
    -
    -           StateTable<K, N, ACC> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
    -           return new HeapAggregatingState<>(
    -                           stateTable,
    -                           keySerializer,
    -                           stateTable.getStateSerializer(),
    -                           stateTable.getNamespaceSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           stateDesc.getAggregateFunction());
    -   }
    -
    -   @Override
    -   public <N, T, ACC> InternalFoldingState<K, N, T, ACC> 
createFoldingState(
    -                   TypeSerializer<N> namespaceSerializer,
    -                   FoldingStateDescriptor<T, ACC> stateDesc) throws 
Exception {
    -
    -           StateTable<K, N, ACC> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
    -           return new HeapFoldingState<>(
    -                           stateTable,
    -                           keySerializer,
    -                           stateTable.getStateSerializer(),
    -                           stateTable.getNamespaceSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           stateDesc.getFoldFunction());
    -   }
    -
    -   @Override
    -   protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
    -                   TypeSerializer<N> namespaceSerializer,
    -                   MapStateDescriptor<UK, UV> stateDesc) throws Exception {
    -
    -           StateTable<K, N, Map<UK, UV>> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
    -
    -           return new HeapMapState<>(
    -                           stateTable,
    -                           keySerializer,
    -                           stateTable.getStateSerializer(),
    -                           stateTable.getNamespaceSerializer(),
    -                           stateDesc.getDefaultValue());
    +   public <N, SV, S extends State, IS extends S> IS createState(
    +           TypeSerializer<N> namespaceSerializer,
    +           StateDescriptor<S, SV> stateDesc) throws Exception {
    +           if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
    +                   String message = String.format("State %s is not 
supported by %s",
    +                           stateDesc.getClass(), this.getClass());
    +                   throw new FlinkRuntimeException(message);
    +           }
    +           StateTable<K, N, SV> stateTable = 
tryRegisterStateTable(namespaceSerializer, stateDesc);
    +           return 
STATE_FACTORIES.get(stateDesc.getClass()).createState(stateDesc, stateTable, 
keySerializer);
    --- End diff --
    
    The same like above, maybe the `get()` and `containsKey()` could be merged 
into one `get()`.


> Switch to internal states in StateBinder
> ----------------------------------------
>
>                 Key: FLINK-9571
>                 URL: https://issues.apache.org/jira/browse/FLINK-9571
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>             Fix For: 1.6.0
>
>
> The StateBinder factory for state objects is not a part of public API and it 
> produces in fact only internal states.
> It can be changed it to produce internal state interfaces instead of public 
> API.
> This can help to expose internal state API for internal components which use 
> the factory, e.g. for state TTL wrappers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to