Repository: flink
Updated Branches:
  refs/heads/master df4216083 -> fef9f1158


[FLINK-2294] [streaming] Fix partitioned state next-input setting for copying chained collectors


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fef9f115
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fef9f115
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fef9f115

Branch: refs/heads/master
Commit: fef9f115838b3ba3d3769f8669ee251c2cd403c6
Parents: df42160
Author: Gyula Fora <[email protected]>
Authored: Tue Jun 30 13:48:17 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Tue Jun 30 14:29:11 2015 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OutputHandler.java  |  1 +
 .../api/state/StatefulOperatorTest.java         | 35 ++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 2d2f29b..73f0a89 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -274,6 +274,7 @@ public class OutputHandler<OUT> {
                @Override
                public void collect(T record) {
                        try {
+                               operator.getRuntimeContext().setNextInput(record);
                                operator.processElement(serializer.copy(record));
                        } catch (Exception e) {
                                if (LOG.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fef9f115/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index af719f3..774b431 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -109,6 +109,11 @@ public class StatefulOperatorTest {
                        public void invoke(String value) throws Exception {}
                });
                
+               keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
+                       private static final long serialVersionUID = 1L;
+                       public void invoke(String value) throws Exception {}
+               });
+               
                try {
                        keyedStream.shuffle();
                        fail();
@@ -224,6 +229,36 @@ public class StatefulOperatorTest {
                }
        }
        
+       public static class StatefulMapper2 extends RichMapFunction<Integer, String> {
+               private static final long serialVersionUID = 1L;
+               OperatorState<Integer> groupCounter;
+               
+               @Override
+               public String map(Integer value) throws Exception {
+                       groupCounter.updateState(groupCounter.getState() + 1);
+                       
+                       return value.toString();
+               }
+
+               @Override
+               public void open(Configuration conf) throws IOException {
+                       groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
+               }
+               
+               @SuppressWarnings({ "rawtypes", "unchecked" })
+               @Override
+               public void close() throws Exception {
+                       Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+                       PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
+                       for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
+                               Integer key = (Integer) count.getKey();
+                               Integer expected = key < 3 ? 2 : 1;
+                               assertEquals(expected, count.getValue());
+                       }
+               }
+               
+       }
+       
        public static class ModKey implements KeySelector<Integer, Serializable> {
 
                private static final long serialVersionUID = 4193026742083046736L;

Reply via email to