Repository: flink
Updated Branches:
  refs/heads/master 46423b9c7 -> 51a357351


[FLINK-5467] Avoid legacy state for CheckpointedRestoring operators

This closes #3102.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a35735
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a35735
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a35735

Branch: refs/heads/master
Commit: 51a357351b955844941edd9a9b1406cdc787b18a
Parents: 46423b9
Author: Stefan Richter <[email protected]>
Authored: Thu Jan 12 12:24:34 2017 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Fri Jan 13 11:20:53 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/operators/AbstractUdfStreamOperator.java   | 6 +++---
 .../org/apache/flink/test/checkpointing/RescalingITCase.java | 8 +++++++-
 2 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 81f709b..15e26c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -195,14 +195,13 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
                        } catch (Exception e) {
                                throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
                        }
-               } else if (userFunction instanceof CheckpointedRestoring) {
-                       out.write(0);
                }
        }
 
        @Override
        public void restoreState(FSDataInputStream in) throws Exception {
-               if (userFunction instanceof CheckpointedRestoring) {
+               if (userFunction instanceof Checkpointed ||
+                               (userFunction instanceof CheckpointedRestoring && in instanceof Migration)) {
                        @SuppressWarnings("unchecked")
                        CheckpointedRestoring<Serializable> chkFunction = (CheckpointedRestoring<Serializable>) userFunction;
 
@@ -219,6 +218,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
                                }
                        }
                } else if (in instanceof Migration) {
+                       // absorb the introduced byte from the migration stream without too much further consequences
                        int hasUdfState = in.read();
                        if (hasUdfState == 1) {
                                throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring");

http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index bc65abf..da4a01b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -903,7 +904,7 @@ public class RescalingITCase extends TestLogger {
                }
        }
 
-       private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction {
+       private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction, CheckpointedRestoring<Integer> {
 
                private static final long serialVersionUID = -359715965103593462L;
                private static final int NUM_PARTITIONS = 7;
@@ -945,5 +946,10 @@ public class RescalingITCase extends TestLogger {
                                CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter;
                        }
                }
+
+               @Override
+               public void restoreState(Integer state) throws Exception {
+                       counterPartitions.add(state);
+               }
        }
 }

Reply via email to