[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257193#comment-16257193
 ] 

ASF GitHub Bot commented on FLINK-8090:
---------------------------------------

GitHub user xccui opened a pull request:

    https://github.com/apache/flink/pull/5032

    [FLINK-8090] [DataStream] Improve the error message for duplicate state name

    ## What is the purpose of the change
    
    This PR improves the error message shown when a user tries to access two
    states of different types under an identical name. However, it cannot
    detect two states of the same type and name that are registered via two
    different descriptors.
    
    ## Brief change log
    
    Refactor `DefaultKeyedStateStore.java` to raise an error with a descriptive
    message when the requested state type does not match the type of the state
    that already exists under that name.
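
A minimal sketch of the kind of check described above, not the code in this PR. The class `StateNameCheckSketch`, the enum `Kind`, and the method `getState` are hypothetical stand-ins written in plain Java so the example runs without Flink; the exact wording of the message is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a keyed state store; not Flink's DefaultKeyedStateStore. */
class StateNameCheckSketch {

    /** Hypothetical state kinds, standing in for Flink's state types. */
    enum Kind { VALUE, LIST, MAP }

    // State name -> kind under which that name was first registered.
    private final Map<String, Kind> registered = new HashMap<>();

    /** Registers or looks up a state name; rejects a name reused with another kind. */
    Kind getState(String name, Kind requested) {
        Kind existing = registered.putIfAbsent(name, requested);
        if (existing != null && existing != requested) {
            throw new IllegalStateException(
                "Duplicate state name '" + name + "': already registered as "
                    + existing + ", but requested as " + requested + ".");
        }
        return requested;
    }

    public static void main(String[] args) {
        StateNameCheckSketch store = new StateNameCheckSketch();
        store.getState("timon-one", Kind.MAP);
        try {
            // Same name, different kind: rejected with an explicit message.
            store.getState("timon-one", Kind.LIST);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

As in the PR, this sketch only compares the requested kind with the registered one, so a second registration with the same name and the same kind passes silently.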
    
    ## Verifying this change
    
    The change can be verified by existing tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-8090

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5032.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5032
    
----
commit 78acc38ff0a1527eeb8204a74b65765bc673a6b3
Author: Xingcan Cui <[email protected]>
Date:   2017-11-17T12:13:49Z

    [FLINK-8090] [DataStream] Improve error message when registering different
    states under the same name

----


> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8090
>                 URL: https://issues.apache.org/jira/browse/FLINK-8090
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor =
>     new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor =
>     new ListStateDescriptor<Integer>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         // Same name ("timon-one") as the map state above, but a different state type.
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>       at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>       at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>       at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>       ... 9 more
> {code}
> This is cryptic, as it does not explain the cause of the problem. The
> error message should be something along the lines of "Duplicate state name".
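
The failure mode in the report can be reproduced without Flink. The sketch below assumes a store that caches state objects by name only and casts on lookup; `CrypticCastSketch`, `MapLikeState`, and `ListLikeState` are hypothetical names, and this is an illustration of the mechanism rather than Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical name-keyed state cache; an illustration only, not Flink code. */
class CrypticCastSketch {

    interface State {}

    static class MapLikeState implements State {}

    static class ListLikeState implements State {}

    // States are cached by name alone, so the state type is not part of the key.
    private final Map<String, State> byName = new HashMap<>();

    MapLikeState getMapState(String name) {
        return (MapLikeState) byName.computeIfAbsent(name, n -> new MapLikeState());
    }

    ListLikeState getListState(String name) {
        // If the name already holds a MapLikeState, this cast fails at runtime.
        return (ListLikeState) byName.computeIfAbsent(name, n -> new ListLikeState());
    }

    public static void main(String[] args) {
        CrypticCastSketch store = new CrypticCastSketch();
        store.getMapState("timon-one");
        try {
            store.getListState("timon-one");
        } catch (ClassCastException e) {
            // The exception names the two classes but says nothing about the shared state name.
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

The exception surfaces at the cast, far from the real cause, which is why a message that names the duplicated state is easier to act on.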



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
