Repository: flink Updated Branches: refs/heads/release-1.4 2b8fd0a5d -> 3d4ff17a6
[FLINK-8890] Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch() This method is used, among other things, to check if a list of restored checkpoints is stable after several restore attempts in the ZooKeeper checkpoint store. The order of checkpoints is somewhat important because we want the latest checkpoint to stay the latest checkpoint. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d4ff17a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d4ff17a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d4ff17a Branch: refs/heads/release-1.4 Commit: 3d4ff17a6cad02475406943af87794d21c42128b Parents: 2b8fd0a Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Mar 7 11:58:07 2018 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Mar 7 14:42:44 2018 +0100 ---------------------------------------------------------------------- .../runtime/checkpoint/CompletedCheckpoint.java | 12 +- .../checkpoint/CompletedCheckpointTest.java | 136 ++++++++++++++++++- 2 files changed, 141 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3d4ff17a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index e4f7da2..bd6fdd1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -35,9 +35,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; +import java.util.List; import java.util.Map; -import java.util.Set; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -283,17 +282,18 @@ public class CompletedCheckpoint implements Serializable { public static boolean checkpointsMatch( Collection<CompletedCheckpoint> first, Collection<CompletedCheckpoint> second) { + if (first.size() != second.size()) { + return false; + } - Set<Tuple2<Long, JobID>> firstInterestingFields = - new HashSet<>(); + List<Tuple2<Long, JobID>> firstInterestingFields = new ArrayList<>(first.size()); for (CompletedCheckpoint checkpoint : first) { firstInterestingFields.add( new Tuple2<>(checkpoint.getCheckpointID(), checkpoint.getJobId())); } - Set<Tuple2<Long, JobID>> secondInterestingFields = - new HashSet<>(); + List<Tuple2<Long, JobID>> secondInterestingFields = new ArrayList<>(second.size()); for (CompletedCheckpoint checkpoint : second) { secondInterestingFields.add( http://git-wip-us.apache.org/repos/asf/flink/blob/3d4ff17a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 293675c..ea21e51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -26,17 +26,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.filesystem.FileStateHandle; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; import java.io.File; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -46,6 +51,135 @@ public class CompletedCheckpointTest { @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder(); + @Test + public void testCompareCheckpointsWithDifferentOrder() { + + CompletedCheckpoint checkpoint1 = new CompletedCheckpoint( + new JobID(), 0, 0, 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); + + CompletedCheckpoint checkpoint2 = new CompletedCheckpoint( + new JobID(), 1, 0, 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); + + List<CompletedCheckpoint> checkpoints1= new ArrayList<>(); + checkpoints1.add(checkpoint1); + checkpoints1.add(checkpoint2); + checkpoints1.add(checkpoint1); + + List<CompletedCheckpoint> checkpoints2 = new ArrayList<>(); + checkpoints2.add(checkpoint2); + checkpoints2.add(checkpoint1); + checkpoints2.add(checkpoint2); + + assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)); + } + + @Test + public void testCompareCheckpointsWithSameOrder() { + + CompletedCheckpoint checkpoint1 = new CompletedCheckpoint( + new JobID(), 0, 0, 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); + + CompletedCheckpoint checkpoint2 = new CompletedCheckpoint( + new JobID(), 1, 0, 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); + + List<CompletedCheckpoint> checkpoints1= new ArrayList<>(); + checkpoints1.add(checkpoint1); + checkpoints1.add(checkpoint2); + checkpoints1.add(checkpoint1); + + List<CompletedCheckpoint> checkpoints2 = new ArrayList<>(); + checkpoints2.add(checkpoint1); + checkpoints2.add(checkpoint2); + checkpoints2.add(checkpoint1); + + assertTrue(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)); + } + + /** + * Verify that both JobID and checkpoint id are taken into account when comparing. + */ + @Test + public void testCompareCheckpointsWithSameJobID() { + JobID jobID = new JobID(); + + CompletedCheckpoint checkpoint1 = new CompletedCheckpoint( + jobID, 0, 0, 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); + + CompletedCheckpoint checkpoint2 = new CompletedCheckpoint( + jobID, 1, 0, 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); + + List<CompletedCheckpoint> checkpoints1= new ArrayList<>(); + checkpoints1.add(checkpoint1); + + List<CompletedCheckpoint> checkpoints2 = new ArrayList<>(); + checkpoints2.add(checkpoint2); + + assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)); + } + + /** + * Verify that both JobID and checkpoint id are taken into account when comparing. + */ + @Test + public void testCompareCheckpointsWithSameCheckpointId() { + JobID jobID1 = new JobID(); + JobID jobID2 = new JobID(); + + CompletedCheckpoint checkpoint1 = new CompletedCheckpoint( + jobID1, 0, 0, 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); + + CompletedCheckpoint checkpoint2 = new CompletedCheckpoint( + jobID2, 0, 0, 1, + new HashMap<>(), + Collections.emptyList(), + CheckpointProperties.forStandardCheckpoint(), + null, + null); + + List<CompletedCheckpoint> checkpoints1= new ArrayList<>(); + checkpoints1.add(checkpoint1); + + List<CompletedCheckpoint> checkpoints2 = new ArrayList<>(); + checkpoints2.add(checkpoint2); + + assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2)); + } + /** * Tests that persistent checkpoints discard their header file. */ @@ -83,7 +217,7 @@ public class CompletedCheckpointTest { boolean discardSubsumed = true; CheckpointProperties props = new CheckpointProperties(false, false, false, discardSubsumed, true, true, true, true); - + CompletedCheckpoint checkpoint = new CompletedCheckpoint( new JobID(), 0, 0, 1, operatorStates,