[ https://issues.apache.org/jira/browse/FLINK-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16031064#comment-16031064 ]
ASF GitHub Bot commented on FLINK-6775:
---------------------------------------
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/4025
Thanks for the quick review @aljoscha. Yes, my filter condition was
`*StateDescriptorTest`. I will add a test for the `AggregatingStateDescriptor`.
> StateDescriptor cannot be shared by multiple subtasks
> -----------------------------------------------------
>
> Key: FLINK-6775
> URL: https://issues.apache.org/jira/browse/FLINK-6775
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3, 1.1.4, 1.3.0, 1.2.1, 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Blocker
>
> The {{StateDescriptor}} contains the {{TypeSerializer}} that is used to
> serialize the state. The serializer instance is not duplicated when it is
> accessed. Therefore, the {{StateDescriptor}} cannot be shared if the
> {{TypeSerializer}} is stateful, as is the case for the {{KryoSerializer}}.
> This problem can easily arise when a user defines a stateful operator that
> declares the {{StateDescriptor}} statically. The workaround is to not define
> a static {{StateDescriptor}}. However, I would still make it a blocker,
> because it is extremely hard for the user to debug failures caused by
> concurrent use of the {{TypeSerializer}}.
> The following operator produces the problem:
> {code}
> private static final class StatefulMapper
>         extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
>         implements CheckpointedFunction {
>
>     private static final long serialVersionUID = -1175717056869107847L;
>
>     private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE =
>             new ValueStateDescriptor<PojoType>("pojoType", PojoType.class);
>
>     private transient ValueState<PojoType> valueState;
>
>     public StatefulMapper() {
>         valueState = null;
>     }
>
>     @Override
>     public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>         PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
>         valueState.update(pojoType);
>         return tuple;
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
>             throws Exception {}
>
>     @Override
>     public void initializeState(FunctionInitializationContext functionInitializationContext)
>             throws Exception {
>         valueState =
>                 functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
>     }
> }
> {code}
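
The description above says the serializer is not duplicated on access. A minimal, Flink-free sketch of what duplication on access could look like is shown below. The classes `BufferingSerializer` and `Descriptor` are hypothetical stand-ins invented for illustration, not Flink classes; only the idea of a `duplicate()` method mirrors Flink's `TypeSerializer#duplicate()`. The sketch is not the actual change in the pull request.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializerDuplicationSketch {

    /** Hypothetical stand-in for a stateful serializer: it reuses one internal buffer. */
    static final class BufferingSerializer {
        private final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);

        /** Not thread-safe: concurrent callers on the same instance share the buffer. */
        long roundTrip(long value) {
            buffer.clear();
            buffer.putLong(value);
            buffer.flip();
            return buffer.getLong();
        }

        /** Returns an independent instance with its own buffer. */
        BufferingSerializer duplicate() {
            return new BufferingSerializer();
        }
    }

    /** Hypothetical stand-in for a descriptor that hands out a duplicate on every access. */
    static final class Descriptor {
        private final BufferingSerializer serializer = new BufferingSerializer();

        BufferingSerializer getSerializer() {
            return serializer.duplicate();
        }
    }

    /** Runs several "subtasks" against one shared descriptor and counts corrupted round trips. */
    static int runSubtasks(Descriptor shared, int subtasks, int iterations) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(subtasks);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < subtasks; i++) {
                final long base = (long) i * iterations;
                Callable<Integer> task = () -> {
                    // Each subtask obtains its own serializer copy from the shared descriptor.
                    BufferingSerializer own = shared.getSerializer();
                    int errors = 0;
                    for (long v = base; v < base + iterations; v++) {
                        if (own.roundTrip(v) != v) {
                            errors++;
                        }
                    }
                    return errors;
                };
                results.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Descriptor shared = new Descriptor();
        System.out.println("corrupted round trips: " + runSubtasks(shared, 4, 100_000));
    }
}
```

Because every subtask works on its own copy, a statically defined descriptor can be shared safely in this sketch. If `getSerializer()` returned the single internal instance instead, the subtasks would race on one buffer, which corresponds to the failure mode described in the issue.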