[ 
https://issues.apache.org/jira/browse/FLINK-9571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514788#comment-16514788
 ] 

ASF GitHub Bot commented on FLINK-9571:
---------------------------------------

Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6173#discussion_r195903619
  
    --- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -1303,103 +1316,18 @@ private ColumnFamilyHandle 
createColumnFamily(String stateName) throws IOExcepti
        }
     
        @Override
    -   protected <N, T> InternalValueState<K, N, T> createValueState(
    -           TypeSerializer<N> namespaceSerializer,
    -           ValueStateDescriptor<T> stateDesc) throws Exception {
    -
    -           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
    -                           tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
    -
    -           return new RocksDBValueState<>(
    -                           registerResult.f0,
    -                           registerResult.f1.getNamespaceSerializer(),
    -                           registerResult.f1.getStateSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           this);
    -   }
    -
    -   @Override
    -   protected <N, T> InternalListState<K, N, T> createListState(
    -           TypeSerializer<N> namespaceSerializer,
    -           ListStateDescriptor<T> stateDesc) throws Exception {
    -
    -           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, List<T>>> registerResult =
    -                           tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
    -
    -           return new RocksDBListState<>(
    -                           registerResult.f0,
    -                           registerResult.f1.getNamespaceSerializer(),
    -                           registerResult.f1.getStateSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           stateDesc.getElementSerializer(),
    -                           this);
    -   }
    -
    -   @Override
    -   protected <N, T> InternalReducingState<K, N, T> createReducingState(
    -           TypeSerializer<N> namespaceSerializer,
    -           ReducingStateDescriptor<T> stateDesc) throws Exception {
    -
    -           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, T>> registerResult =
    -                           tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
    -
    -           return new RocksDBReducingState<>(
    -                           registerResult.f0,
    -                           registerResult.f1.getNamespaceSerializer(),
    -                           registerResult.f1.getStateSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           stateDesc.getReduceFunction(),
    -                           this);
    -   }
    -
    -   @Override
    -   protected <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> 
createAggregatingState(
    -           TypeSerializer<N> namespaceSerializer,
    -           AggregatingStateDescriptor<T, ACC, R> stateDesc) throws 
Exception {
    -
    -           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
    -                           tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
    -
    -           return new RocksDBAggregatingState<>(
    -                           registerResult.f0,
    -                           registerResult.f1.getNamespaceSerializer(),
    -                           registerResult.f1.getStateSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           stateDesc.getAggregateFunction(),
    -                           this);
    -   }
    -
    -   @Override
    -   protected <N, T, ACC> InternalFoldingState<K, N, T, ACC> 
createFoldingState(
    +   public <N, SV, S extends State, IS extends S> IS createState(
                TypeSerializer<N> namespaceSerializer,
    -           FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
    -
    -           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, ACC>> registerResult =
    -                           tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
    -
    -           return new RocksDBFoldingState<>(
    -                           registerResult.f0,
    -                           registerResult.f1.getNamespaceSerializer(),
    -                           registerResult.f1.getStateSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           stateDesc.getFoldFunction(),
    -                           this);
    -   }
    -
    -   @Override
    -   protected <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
    -           TypeSerializer<N> namespaceSerializer,
    -           MapStateDescriptor<UK, UV> stateDesc) throws Exception {
    -
    -           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, Map<UK, UV>>> registerResult =
    -                           tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
    -
    -           return new RocksDBMapState<>(
    -                           registerResult.f0,
    -                           registerResult.f1.getNamespaceSerializer(),
    -                           registerResult.f1.getStateSerializer(),
    -                           stateDesc.getDefaultValue(),
    -                           this);
    +           StateDescriptor<S, SV> stateDesc) throws Exception {
    +           if (!STATE_FACTORIES.containsKey(stateDesc.getClass())) {
    +                   String message = String.format("State %s is not 
supported by %s",
    +                           stateDesc.getClass(), this.getClass());
    +                   throw new UnsupportedOperationException(message);
    +           }
    +           Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, SV>> registerResult =
    +                   tryRegisterKvStateInformation(stateDesc, 
namespaceSerializer);
    +           return STATE_FACTORIES.get(stateDesc.getClass()).createState(
    --- End diff --
    
    The same above, the `containsKey()` and `get()` might could be merged into 
a single `get()`.


> Switch to internal states in StateBinder
> ----------------------------------------
>
>                 Key: FLINK-9571
>                 URL: https://issues.apache.org/jira/browse/FLINK-9571
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>             Fix For: 1.6.0
>
>
> The StateBinder factory for state objects is not a part of public API and it 
> produces in fact only internal states.
> It can be changed it to produce internal state interfaces instead of public 
> API.
> This can help to expose internal state API for internal components which use 
> the factory, e.g. for state TTL wrappers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to