[
https://issues.apache.org/jira/browse/FLINK-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Till Rohrmann updated FLINK-6775:
---------------------------------
Description:
The {{StateDescriptor}} contains the {{TypeSerializer}} which is used to
serialize the state. The serializer instance won't be duplicated when it is
accessed. Therefore, the {{StateDescriptor}} cannot be shared if the
{{TypeSerializer}} is stateful as in the case of the {{KryoSerializer}}.
This problem can easily arise when a user defines a stateful operator that
declares its {{StateDescriptor}} in a static field. The workaround is to not
define the {{StateDescriptor}} statically. However, I would still make this a
blocker, because it is extremely hard for the user to debug failures that are
caused by concurrent use of the {{TypeSerializer}}.
The following operator produces the problem:
{code}
private static final class StatefulMapper
        extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
        implements CheckpointedFunction {

    private static final long serialVersionUID = -1175717056869107847L;

    // Static descriptor: one instance, and therefore one TypeSerializer, is
    // shared by every StatefulMapper subtask loaded in the same JVM.
    private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE =
            new ValueStateDescriptor<PojoType>("pojoType", PojoType.class);

    private transient ValueState<PojoType> valueState;

    public StatefulMapper() {
        valueState = null;
    }

    @Override
    public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
        PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
        valueState.update(pojoType);
        return tuple;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {}

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        valueState = functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
    }
}
{code}
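The sharing problem described above can be modelled without Flink. The following is a minimal sketch, assuming a serializer that reuses an internal scratch buffer between a write step and a flush step. All names in it ({{ScratchSerializer}}, {{Descriptor}}, {{getSerializer}}, {{getDuplicatedSerializer}}, {{interleave}}) are hypothetical and are not Flink classes or methods. The interleaving of two subtasks is simulated in a single thread so that the outcome is deterministic; in a real job the interleaving would depend on thread scheduling.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializerSharingSketch {

    /** Hypothetical stateful serializer: every call reuses the same scratch buffer. */
    static final class ScratchSerializer {
        private final byte[] scratch = new byte[64];
        private int length;

        /** Step 1: encode the value into the reusable scratch buffer. */
        void write(String value) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            System.arraycopy(bytes, 0, scratch, 0, bytes.length);
            length = bytes.length;
        }

        /** Step 2: copy the pending bytes out of the scratch buffer. */
        byte[] flush() {
            return Arrays.copyOf(scratch, length);
        }

        /** Returns an independent instance with its own buffer. */
        ScratchSerializer duplicate() {
            return new ScratchSerializer();
        }
    }

    /** Hypothetical descriptor that owns one serializer instance. */
    static final class Descriptor {
        private final ScratchSerializer serializer = new ScratchSerializer();

        /** Hands out the very same instance on every access (the behaviour reported here). */
        ScratchSerializer getSerializer() {
            return serializer;
        }

        /** Hands out a fresh copy on every access (one possible remedy, assumed for illustration). */
        ScratchSerializer getDuplicatedSerializer() {
            return serializer.duplicate();
        }
    }

    /** Simulates two subtasks whose write and flush steps interleave. */
    static String[] interleave(ScratchSerializer a, ScratchSerializer b) {
        a.write("subtask-A");
        b.write("subtask-B"); // overwrites A's pending bytes when a and b are the same instance
        String fromA = new String(a.flush(), StandardCharsets.UTF_8);
        String fromB = new String(b.flush(), StandardCharsets.UTF_8);
        return new String[] {fromA, fromB};
    }

    public static void main(String[] args) {
        Descriptor shared = new Descriptor(); // plays the role of the static descriptor

        // Both "subtasks" receive the same serializer instance.
        System.out.println(Arrays.toString(
                interleave(shared.getSerializer(), shared.getSerializer())));

        // Each "subtask" receives its own copy.
        System.out.println(Arrays.toString(
                interleave(shared.getDuplicatedSerializer(), shared.getDuplicatedSerializer())));
    }
}
```

With the shared instance, the first subtask reads back the second subtask's bytes. With one copy per access, each subtask reads back its own value. Whether duplicating on access is the right fix for the {{StateDescriptor}} is not decided here; the sketch only illustrates why sharing a stateful serializer is unsafe.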
> StateDescriptor cannot be shared by multiple subtasks
> -----------------------------------------------------
>
> Key: FLINK-6775
> URL: https://issues.apache.org/jira/browse/FLINK-6775
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3, 1.1.4, 1.3.0, 1.2.1, 1.4.0
> Reporter: Till Rohrmann
> Priority: Blocker