Repository: flink Updated Branches: refs/heads/master 46423b9c7 -> 51a357351
[FLINK-5467] Avoid legacy state for CheckpointedRestoring operators This closes #3102. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a35735 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a35735 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a35735 Branch: refs/heads/master Commit: 51a357351b955844941edd9a9b1406cdc787b18a Parents: 46423b9 Author: Stefan Richter <[email protected]> Authored: Thu Jan 12 12:24:34 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Fri Jan 13 11:20:53 2017 +0100 ---------------------------------------------------------------------- .../streaming/api/operators/AbstractUdfStreamOperator.java | 6 +++--- .../org/apache/flink/test/checkpointing/RescalingITCase.java | 8 +++++++- 2 files changed, 10 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 81f709b..15e26c9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -195,14 +195,13 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> } catch (Exception e) { throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e); } - } else if (userFunction instanceof CheckpointedRestoring) { - out.write(0); } } @Override public void restoreState(FSDataInputStream in) throws Exception { - if (userFunction instanceof CheckpointedRestoring) { + if (userFunction instanceof Checkpointed || + (userFunction instanceof CheckpointedRestoring && in instanceof Migration)) { @SuppressWarnings("unchecked") CheckpointedRestoring<Serializable> chkFunction = (CheckpointedRestoring<Serializable>) userFunction; @@ -219,6 +218,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> } } } else if (in instanceof Migration) { + // absorb the introduced byte from the migration stream without too much further consequences int hasUdfState = in.read(); if (hasUdfState == 1) { throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring"); http://git-wip-us.apache.org/repos/asf/flink/blob/51a35735/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index bc65abf..da4a01b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -903,7 +904,7 @@ public class RescalingITCase extends TestLogger { } } - private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction { + private static class PartitionedStateSource extends StateSourceBase implements CheckpointedFunction, CheckpointedRestoring<Integer> { private static final long serialVersionUID = -359715965103593462L; private static final int NUM_PARTITIONS = 7; @@ -945,5 +946,10 @@ public class RescalingITCase extends TestLogger { CHECK_CORRECT_RESTORE[getRuntimeContext().getIndexOfThisSubtask()] = counter; } } + + @Override + public void restoreState(Integer state) throws Exception { + counterPartitions.add(state); + } } }
