[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256826#comment-16256826
 ] 

Xingcan Cui commented on FLINK-8090:
------------------------------------

Hi [~kkl0u], thanks for raising this. I found that a {{State}} is identified 
solely by the name in its {{StateDescriptor}}. In other words, if we create two 
descriptors with an identical name and type, they will return the same state 
object (though the wrapper may be different). On the other hand, for two 
descriptors with an identical name but different types, we can detect the 
conflict through a {{ClassCastException}}, which is caused by type erasure in 
Java. I'll try to refactor the {{DefaultKeyedStateStore}} to provide a better 
error message for the latter case.
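
The name-only lookup described above can be sketched roughly as follows. This is a simplified model and not Flink's actual code: {{SketchState}}, {{SketchListState}}, {{SketchMapState}}, {{NameKeyedStore}} and {{NameKeyedStoreDemo}} are all hypothetical names, and the sketch assumes the store keeps one entry per name and casts it to the requested kind of state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-ins for the state interfaces; not the real Flink API.
interface SketchState {}

final class SketchListState implements SketchState {}

final class SketchMapState implements SketchState {}

// Simplified model of a state store that indexes states by name only.
final class NameKeyedStore {
    private final Map<String, SketchState> statesByName = new HashMap<>();

    // Returns the state registered under `name`, creating it on first access.
    private SketchState getOrCreate(String name, Supplier<SketchState> factory) {
        return statesByName.computeIfAbsent(name, n -> factory.get());
    }

    SketchListState getListState(String name) {
        // The cast fails if a different kind of state already owns the name.
        return (SketchListState) getOrCreate(name, SketchListState::new);
    }

    SketchMapState getMapState(String name) {
        return (SketchMapState) getOrCreate(name, SketchMapState::new);
    }
}

final class NameKeyedStoreDemo {
    public static void main(String[] args) {
        NameKeyedStore store = new NameKeyedStore();
        SketchMapState first = store.getMapState("timon-one");

        // Same name, same kind: the same underlying object comes back.
        System.out.println(first == store.getMapState("timon-one"));

        // Same name, different kind: the only signal is a ClassCastException.
        try {
            store.getListState("timon-one");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

In this model the second lookup never learns that the name is already taken; it only sees an object of the wrong class, which is why the resulting exception says nothing about a naming conflict.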

Best, Xingcan

> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-8090
>                 URL: https://issues.apache.org/jira/browse/FLINK-8090
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor =
>         new MapStateDescriptor<>(
>                 "timon-one",
>                 BasicTypeInfo.INT_TYPE_INFO,
>                 source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor =
>         new ListStateDescriptor<Integer>(
>                 "timon-one",
>                 BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>       at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>       at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>       at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
>       at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>       ... 9 more
> {code}
> This is cryptic, as it does not explain the reason for the problem. The 
> error message should be something along the lines of "Duplicate state name".
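
One way such a message could be produced is to check the kind of the already-registered state before casting. The following is only a sketch under assumptions, not the actual {{DefaultKeyedStateStore}} change: {{CheckedStateRegistry}} and its {{getState}} method are hypothetical, and the exact wording of the message is illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry illustrating a descriptive failure; not Flink's code.
final class CheckedStateRegistry {
    private final Map<String, Object> statesByName = new HashMap<>();

    // Returns the state under `name`, or fails with a message naming the conflict.
    <S> S getState(String name, Class<S> expectedType, Supplier<S> factory) {
        Object existing = statesByName.get(name);
        if (existing == null) {
            // First registration under this name.
            S created = factory.get();
            statesByName.put(name, created);
            return created;
        }
        if (!expectedType.isInstance(existing)) {
            // Report the naming conflict instead of surfacing a bare cast failure.
            throw new IllegalStateException(
                    "Duplicate state name '" + name + "': already registered as "
                            + existing.getClass().getSimpleName()
                            + ", but requested as " + expectedType.getSimpleName());
        }
        return expectedType.cast(existing);
    }
}
```

With this shape, a second registration under an existing name with a different kind of state fails with an exception that names both the state and the two conflicting kinds, while a repeated request for the same kind still returns the existing object.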



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
