[
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262007#comment-16262007
]
ASF GitHub Bot commented on FLINK-8090:
---------------------------------------
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5032#discussion_r152475532
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
---
@@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault()
throws Exception {
assertFalse(value.iterator().hasNext());
}
+ @Test(expected = DuplicateStateNameException.class)
+ public void testDuplicateStateName() throws Exception {
+ StreamingRuntimeContext context = new StreamingRuntimeContext(
+ createMapPlainMockOp(),
+ createMockEnvironment(),
+ Collections.emptyMap());
+ MapStateDescriptor<Integer, String> mapStateDesc =
+ new MapStateDescriptor<>("name", Integer.class,
String.class);
+ ListStateDescriptor<String> listStateDesc =
+ new ListStateDescriptor<>("name", String.class);
+ context.getMapState(mapStateDesc);
--- End diff --
This is good enough 👍
> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>>
> firstMapStateDescriptor = new MapStateDescriptor<>(
> "timon-one",
> BasicTypeInfo.INT_TYPE_INFO,
> source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new
> ListStateDescriptor<Integer>(
> "timon-one",
> BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
> private static final long serialVersionUID =
> -805125545438296619L;
> private transient MapState<Integer,
> Tuple2<Integer, Long>> firstMapState;
> private transient ListState<Integer>
> secondListState;
> @Override
> public void open(Configuration parameters)
> throws Exception {
> super.open(parameters);
> firstMapState =
> getRuntimeContext().getMapState(firstMapStateDescriptor);
> secondListState =
> getRuntimeContext().getListState(secondListStateDescriptor);
> }
> @Override
> public void processElement(Tuple2<Integer,
> Long> value, Context ctx, Collector<Object> out) throws Exception {
> Tuple2<Integer, Long> v =
> firstMapState.get(value.f0);
> if (v == null) {
> v = new Tuple2<>(value.f0, 0L);
> }
> firstMapState.put(value.f0, new
> Tuple2<>(v.f0, v.f1 + value.f1));
> }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
> at
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException:
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to
> org.apache.flink.api.common.state.ListState
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
> ... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The
> error message should be something along the line of "Duplicate state name".
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)