Repository: flink
Updated Branches:
  refs/heads/release-1.5 80020cb58 -> 969909bff


[FLINK-8890] Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch()

This method is used, among other things, to check if a list of restored
checkpoints is stable after several restore attempts in the ZooKeeper
checkpoint store. The order of checkpoints is somewhat important because
we want the latest checkpoint to stay the latest checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/969909bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/969909bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/969909bf

Branch: refs/heads/release-1.5
Commit: 969909bff0dd9e2f6394a623c3f7f64fe04b82b0
Parents: 80020cb
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Mar 7 11:58:07 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Mar 7 14:38:01 2018 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java |  12 +-
 .../checkpoint/CompletedCheckpointTest.java     | 123 +++++++++++++++++++
 2 files changed, 129 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/969909bf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 5842427..1932b19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -38,9 +38,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -289,17 +288,18 @@ public class CompletedCheckpoint implements Serializable {
        public static boolean checkpointsMatch(
                Collection<CompletedCheckpoint> first,
                Collection<CompletedCheckpoint> second) {
+               if (first.size() != second.size()) {
+                       return false;
+               }
 
-               Set<Tuple2<Long, JobID>> firstInterestingFields =
-                       new HashSet<>();
+               List<Tuple2<Long, JobID>> firstInterestingFields = new ArrayList<>(first.size());
 
                for (CompletedCheckpoint checkpoint : first) {
                        firstInterestingFields.add(
                                new Tuple2<>(checkpoint.getCheckpointID(), checkpoint.getJobId()));
                }
 
-               Set<Tuple2<Long, JobID>> secondInterestingFields =
-                       new HashSet<>();
+               List<Tuple2<Long, JobID>> secondInterestingFields = new ArrayList<>(second.size());
 
                for (CompletedCheckpoint checkpoint : second) {
                        secondInterestingFields.add(

http://git-wip-us.apache.org/repos/asf/flink/blob/969909bf/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 69003cd..5af7c76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -31,8 +31,10 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -51,6 +53,127 @@ public class CompletedCheckpointTest {
        public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
        @Test
+       public void testCompareCheckpointsWithDifferentOrder() {
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       new JobID(), 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       new TestCompletedCheckpointStorageLocation());
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       new JobID(), 1, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       new TestCompletedCheckpointStorageLocation());
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+               checkpoints1.add(checkpoint2);
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+               checkpoints2.add(checkpoint1);
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2));
+       }
+
+       @Test
+       public void testCompareCheckpointsWithSameOrder() {
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       new JobID(), 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       new TestCompletedCheckpointStorageLocation());
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       new JobID(), 1, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       new TestCompletedCheckpointStorageLocation());
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+               checkpoints1.add(checkpoint2);
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint1);
+               checkpoints2.add(checkpoint2);
+               checkpoints2.add(checkpoint1);
+
+               assertTrue(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2));
+       }
+
+       /**
+        * Verify that both JobID and checkpoint id are taken into account when comparing.
+        */
+       @Test
+       public void testCompareCheckpointsWithSameJobID() {
+               JobID jobID = new JobID();
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       jobID, 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       new TestCompletedCheckpointStorageLocation());
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       jobID, 1, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       new TestCompletedCheckpointStorageLocation());
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2));
+       }
+
+       /**
+        * Verify that both JobID and checkpoint id are taken into account when comparing.
+        */
+       @Test
+       public void testCompareCheckpointsWithSameCheckpointId() {
+               JobID jobID1 = new JobID();
+               JobID jobID2 = new JobID();
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       jobID1, 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       new TestCompletedCheckpointStorageLocation());
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       jobID2, 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                       new TestCompletedCheckpointStorageLocation());
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2));
+       }
+
+       @Test
        public void testRegisterStatesAtRegistry() {
                OperatorState state = mock(OperatorState.class);
                Map<OperatorID, OperatorState> operatorStates = new HashMap<>();

Reply via email to