[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-8090:
----------------------------------
    Description: 
Currently a {{ProcessFunction}} like this:

{code}

final MapStateDescriptor<Integer, Tuple2<Integer, Long>> 
firstMapStateDescriptor = new MapStateDescriptor<>(
                                        "timon-one",
                                        BasicTypeInfo.INT_TYPE_INFO,
                                        source.getType());

final ListStateDescriptor<Integer> secondListStateDescriptor = new 
ListStateDescriptor<Integer>(
                                        "timon-one",
                                        BasicTypeInfo.INT_TYPE_INFO);

new ProcessFunction<Tuple2<Integer, Long>, Object>() {
                                private static final long serialVersionUID = 
-805125545438296619L;

                                private transient MapState<Integer, 
Tuple2<Integer, Long>> firstMapState;
                                private transient ListState<Integer> 
secondListState;

                                @Override
                                public void open(Configuration parameters) 
throws Exception {
                                        super.open(parameters);
                                        firstMapState = 
getRuntimeContext().getMapState(firstMapStateDescriptor);
                                        secondListState = 
getRuntimeContext().getListState(secondListStateDescriptor);
                                }

                                @Override
                                public void processElement(Tuple2<Integer, 
Long> value, Context ctx, Collector<Object> out) throws Exception {
                                        Tuple2<Integer, Long> v = 
firstMapState.get(value.f0);
                                        if (v == null) {
                                                v = new Tuple2<>(value.f0, 0L);
                                        }
                                        firstMapState.put(value.f0, new 
Tuple2<>(v.f0, v.f1 + value.f1));
                                }
                        }
{code}

fails with:

{code}
java.lang.RuntimeException: Error while getting state
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
        at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
        at 
org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: 
org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
org.apache.flink.api.common.state.ListState
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
        ... 9 more
{code}

Which is cryptic, as it does not explain the reason for the problem. The error 
message should be something along the line of "Duplicate state name".

  was:
Currently a {{ProcessFunction}} like this is a valid job:

{code}

final MapStateDescriptor<Integer, Tuple2<Integer, Long>> 
firstMapStateDescriptor = new MapStateDescriptor<>(
                                        "timon-one",
                                        BasicTypeInfo.INT_TYPE_INFO,
                                        source.getType());

final ListStateDescriptor<Integer> secondListStateDescriptor = new 
ListStateDescriptor<Integer>(
                                        "timon-one",
                                        BasicTypeInfo.INT_TYPE_INFO);

new ProcessFunction<Tuple2<Integer, Long>, Object>() {
                                private static final long serialVersionUID = 
-805125545438296619L;

                                private transient MapState<Integer, 
Tuple2<Integer, Long>> firstMapState;
                                private transient ListState<Integer> 
secondListState;

                                @Override
                                public void open(Configuration parameters) 
throws Exception {
                                        super.open(parameters);
                                        firstMapState = 
getRuntimeContext().getMapState(firstMapStateDescriptor);
                                        secondListState = 
getRuntimeContext().getListState(secondListStateDescriptor);
                                }

                                @Override
                                public void processElement(Tuple2<Integer, 
Long> value, Context ctx, Collector<Object> out) throws Exception {
                                        Tuple2<Integer, Long> v = 
firstMapState.get(value.f0);
                                        if (v == null) {
                                                v = new Tuple2<>(value.f0, 0L);
                                        }
                                        firstMapState.put(value.f0, new 
Tuple2<>(v.f0, v.f1 + value.f1));
                                }
                        }
{code}

fails with:

{code}
java.lang.RuntimeException: Error while getting state
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
        at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
        at 
org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: 
org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
org.apache.flink.api.common.state.ListState
        at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
        ... 9 more
{code}

Which is cryptic, as it does not explain the reason for the problem. The error 
message should be something along the line of "Duplicate state name".


> An operator should not be able to register two states with the same name.
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8090
>                 URL: https://issues.apache.org/jira/browse/FLINK-8090
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> 
> firstMapStateDescriptor = new MapStateDescriptor<>(
>                                       "timon-one",
>                                       BasicTypeInfo.INT_TYPE_INFO,
>                                       source.getType());
> final ListStateDescriptor<Integer> secondListStateDescriptor = new 
> ListStateDescriptor<Integer>(
>                                       "timon-one",
>                                       BasicTypeInfo.INT_TYPE_INFO);
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>                               private static final long serialVersionUID = 
> -805125545438296619L;
>                               private transient MapState<Integer, 
> Tuple2<Integer, Long>> firstMapState;
>                                 private transient ListState<Integer> 
> secondListState;
>                               @Override
>                               public void open(Configuration parameters) 
> throws Exception {
>                                       super.open(parameters);
>                                       firstMapState = 
> getRuntimeContext().getMapState(firstMapStateDescriptor);
>                                       secondListState = 
> getRuntimeContext().getListState(secondListStateDescriptor);
>                               }
>                               @Override
>                               public void processElement(Tuple2<Integer, 
> Long> value, Context ctx, Collector<Object> out) throws Exception {
>                                       Tuple2<Integer, Long> v = 
> firstMapState.get(value.f0);
>                                       if (v == null) {
>                                               v = new Tuple2<>(value.f0, 0L);
>                                       }
>                                       firstMapState.put(value.f0, new 
> Tuple2<>(v.f0, v.f1 + value.f1));
>                               }
>                       }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>       at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>       at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>       at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>       at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>       at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
> org.apache.flink.api.common.state.ListState
>       at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>       ... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The 
> error message should be something along the line of "Duplicate state name".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to