[
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256826#comment-16256826
]
Xingcan Cui commented on FLINK-8090:
------------------------------------
Hi [~kkl0u], thanks for raising this. I found that a {{State}} is identified
solely by the name in its {{StateDescriptor}}. In other words, if we create two
descriptors with an identical name and type, they will return the same state
object (though the wrapper may be different). On the other hand, for two
descriptors with an identical name but different types, we can detect the
conflict through a {{ClassCastException}}, which is caused by the type erasure
in Java. I'll try to refactor the {{DefaultKeyedStateStore}} to provide a
better error message for the latter case.
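To illustrate the idea, here is a minimal, self-contained sketch of a name-keyed lookup that checks the type before casting. It is not Flink's actual code: {{NamedStateRegistry}} and {{getOrCreate}} are hypothetical names, and plain Java objects stand in for real state objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a keyed state store; NOT Flink's DefaultKeyedStateStore.
class NamedStateRegistry {
    // State objects are looked up by name only, as described above.
    private final Map<String, Object> statesByName = new HashMap<>();

    // Returns the state registered under `name`, creating it on first access.
    // Fails with a descriptive message if the name is already bound to a
    // state of a different kind, instead of surfacing a ClassCastException.
    <S> S getOrCreate(String name, Class<S> expectedType, Supplier<S> factory) {
        Object existing = statesByName.get(name);
        if (existing == null) {
            S created = factory.get();
            statesByName.put(name, created);
            return created;
        }
        if (!expectedType.isInstance(existing)) {
            throw new IllegalStateException(
                "Duplicate state name '" + name + "': already registered as "
                    + existing.getClass().getSimpleName()
                    + ", but requested as " + expectedType.getSimpleName());
        }
        return expectedType.cast(existing);
    }
}
```

With this sketch, requesting "timon-one" twice as a {{Map}} yields the same object, while requesting it afterwards as a {{List}} throws an {{IllegalStateException}} that names the conflicting state.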
Best, Xingcan
> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor =
>     new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor =
>     new ListStateDescriptor<Integer>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
> at
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException:
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to
> org.apache.flink.api.common.state.ListState
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
> ... 9 more
> {code}
> This is cryptic, as it does not explain the cause of the problem. The
> error message should be something along the lines of "Duplicate state name".