[
https://issues.apache.org/jira/browse/FLINK-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16031061#comment-16031061
]
ASF GitHub Bot commented on FLINK-6775:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4025#discussion_r119340919
--- Diff:
flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
---
@@ -101,4 +108,35 @@ public void testValueStateDescriptorAutoSerializer()
throws Exception {
assertNotNull(copy.getElementSerializer());
assertEquals(StringSerializer.INSTANCE,
copy.getElementSerializer());
}
+
+ /**
+ * FLINK-6775
+ *
+ * Tests that the returned serializer is duplicated if it is stateful.
This allows to
--- End diff --
Will change it.
> StateDescriptor cannot be shared by multiple subtasks
> -----------------------------------------------------
>
> Key: FLINK-6775
> URL: https://issues.apache.org/jira/browse/FLINK-6775
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3, 1.1.4, 1.3.0, 1.2.1, 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Blocker
>
> The {{StateDescriptor}} contains the {{TypeSerializer}} which is used to
> serialize the state. The serializer instance won't be duplicated when it is
> accessed. Therefore, the {{StateDescriptor}} cannot be shared if the
> {{TypeSerializer}} is stateful as in the case of the {{KryoSerializer}}.
> This problem can easily arise when a user defines a stateful operator which
> defines the {{StateDescriptor}} statically. The work around is to not define
> a static {{StateDescriptor}}. However, I would still make it a blocker,
> because it is extremely hard to debug for the user if things fail because the
> {{TypeSerializer}} is used concurrently.
> The following operator produces the problem:
> {code}
> private static final class StatefulMapper extends
> RichMapFunction<Tuple2<Long,Long>, Tuple2<Long, Long>> implements
> CheckpointedFunction {
> private static final long serialVersionUID = -1175717056869107847L;
> private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE
> = new ValueStateDescriptor<PojoType>("pojoType", PojoType.class);
> private transient ValueState<PojoType> valueState;
> public StatefulMapper() {
> valueState = null;
> }
> @Override
> public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws
> Exception {
> PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2,
> 2.0));
> valueState.update(pojoType);
> return tuple;
> }
> @Override
> public void snapshotState(FunctionSnapshotContext
> functionSnapshotContext) throws Exception {}
> @Override
> public void initializeState(FunctionInitializationContext
> functionInitializationContext) throws Exception {
> valueState =
> functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)