Repository: flink
Updated Branches:
  refs/heads/master df4216083 -> fef9f1158
[FLINK-2294] [streaming] Fix partitioned state next-input setting for copying chained collectors

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fef9f115
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fef9f115
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fef9f115

Branch: refs/heads/master
Commit: fef9f115838b3ba3d3769f8669ee251c2cd403c6
Parents: df42160
Author: Gyula Fora <[email protected]>
Authored: Tue Jun 30 13:48:17 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Tue Jun 30 14:29:11 2015 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OutputHandler.java  |  1 +
 .../api/state/StatefulOperatorTest.java         | 35 ++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 2d2f29b..73f0a89 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -274,6 +274,7 @@ public class OutputHandler<OUT> {
 		@Override
 		public void collect(T record) {
 			try {
+				operator.getRuntimeContext().setNextInput(record);
 				operator.processElement(serializer.copy(record));
 			} catch (Exception e) {
 				if (LOG.isErrorEnabled()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index af719f3..774b431 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -109,6 +109,11 @@ public class StatefulOperatorTest {
 			public void invoke(String value) throws Exception {}
 		});
 
+		keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
+			private static final long serialVersionUID = 1L;
+			public void invoke(String value) throws Exception {}
+		});
+
 		try {
 			keyedStream.shuffle();
 			fail();
@@ -224,6 +229,36 @@ public class StatefulOperatorTest {
 		}
 	}
 
+	public static class StatefulMapper2 extends RichMapFunction<Integer, String> {
+		private static final long serialVersionUID = 1L;
+		OperatorState<Integer> groupCounter;
+
+		@Override
+		public String map(Integer value) throws Exception {
+			groupCounter.updateState(groupCounter.getState() + 1);
+
+			return value.toString();
+		}
+
+		@Override
+		public void open(Configuration conf) throws IOException {
+			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Override
+		public void close() throws Exception {
+			Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
+			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
+				Integer key = (Integer) count.getKey();
+				Integer expected = key < 3 ? 2 : 1;
+				assertEquals(expected, count.getValue());
+			}
+		}
+
+	}
+
 	public static class ModKey implements KeySelector<Integer, Serializable> {
 
 		private static final long serialVersionUID = 4193026742083046736L;
