[
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-8090:
----------------------------------
Labels: auto-deprioritized-major auto-deprioritized-minor auto-unassigned
pull-request-available (was: auto-deprioritized-major auto-unassigned
pull-request-available stale-minor)
Priority: Not a Priority (was: Minor)
This issue was labeled "stale-minor" 7 days ago and has not received any
updates so it is being deprioritized. If this ticket is actually Minor, please
raise the priority and ask a committer to assign you the issue or revive the
public discussion.
> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Priority: Not a Priority
> Labels: auto-deprioritized-major, auto-deprioritized-minor,
> auto-unassigned, pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor =
>     new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor =
>     new ListStateDescriptor<Integer>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(
>             Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>     at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>     ... 9 more
> {code}
> This is cryptic because it does not explain the cause of the problem. The
> error message should be something along the lines of "Duplicate state name".
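
As a rough illustration of the kind of check the description asks for, the sketch below tracks which kind of state was first registered under each name and fails with an explicit "Duplicate state name" message when the same name is reused for a different kind. It is a standalone example under stated assumptions, not Flink's implementation: the class StateNameRegistry, its register method, and the string labels "MapState" and "ListState" are all hypothetical names introduced here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a keyed state store; NOT Flink's actual code. */
class StateNameRegistry {
    // Maps each state name to the kind of state first registered under it.
    private final Map<String, String> kindByName = new HashMap<>();

    /**
     * Registers a state name together with its kind (assumed labels such as
     * "MapState" or "ListState"). Registering the same name again with the
     * same kind is accepted; a different kind fails with a descriptive error.
     */
    void register(String name, String kind) {
        String existing = kindByName.putIfAbsent(name, kind);
        if (existing != null && !existing.equals(kind)) {
            throw new IllegalStateException(
                "Duplicate state name '" + name + "': already registered as "
                    + existing + ", cannot register it again as " + kind);
        }
    }

    public static void main(String[] args) {
        StateNameRegistry registry = new StateNameRegistry();
        registry.register("timon-one", "MapState");
        try {
            // Same name, different kind: mirrors the scenario in the report.
            registry.register("timon-one", "ListState");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a check of this shape, the failure names the conflicting state and both kinds up front instead of surfacing as a ClassCastException deep in the state store.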