Repository: flink
Updated Branches:
  refs/heads/release-1.4 2b8fd0a5d -> 3d4ff17a6


[FLINK-8890] Compare checkpoints with order in 
CompletedCheckpoint.checkpointsMatch()

This method is used, among other things, to check if a list of restored
checkpoints is stable after several restore attempts in the ZooKeeper
checkpoint store. The order of checkpoints is somewhat important because
we want the latest checkpoint to stay the latest checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d4ff17a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d4ff17a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d4ff17a

Branch: refs/heads/release-1.4
Commit: 3d4ff17a6cad02475406943af87794d21c42128b
Parents: 2b8fd0a
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Mar 7 11:58:07 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Mar 7 14:42:44 2018 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java |  12 +-
 .../checkpoint/CompletedCheckpointTest.java     | 136 ++++++++++++++++++-
 2 files changed, 141 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3d4ff17a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index e4f7da2..bd6fdd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -35,9 +35,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -283,17 +282,18 @@ public class CompletedCheckpoint implements Serializable {
        public static boolean checkpointsMatch(
                Collection<CompletedCheckpoint> first,
                Collection<CompletedCheckpoint> second) {
+               if (first.size() != second.size()) {
+                       return false;
+               }
 
-               Set<Tuple2<Long, JobID>> firstInterestingFields =
-                       new HashSet<>();
+               List<Tuple2<Long, JobID>> firstInterestingFields = new 
ArrayList<>(first.size());
 
                for (CompletedCheckpoint checkpoint : first) {
                        firstInterestingFields.add(
                                new Tuple2<>(checkpoint.getCheckpointID(), 
checkpoint.getJobId()));
                }
 
-               Set<Tuple2<Long, JobID>> secondInterestingFields =
-                       new HashSet<>();
+               List<Tuple2<Long, JobID>> secondInterestingFields = new 
ArrayList<>(second.size());
 
                for (CompletedCheckpoint checkpoint : second) {
                        secondInterestingFields.add(

http://git-wip-us.apache.org/repos/asf/flink/blob/3d4ff17a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 293675c..ea21e51 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -26,17 +26,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -46,6 +51,135 @@ public class CompletedCheckpointTest {
        @Rule
        public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
+       @Test
+       public void testCompareCheckpointsWithDifferentOrder() {
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       new JobID(), 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       new JobID(), 1, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+               checkpoints1.add(checkpoint2);
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+               checkpoints2.add(checkpoint1);
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+       }
+
+       @Test
+       public void testCompareCheckpointsWithSameOrder() {
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       new JobID(), 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       new JobID(), 1, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+               checkpoints1.add(checkpoint2);
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint1);
+               checkpoints2.add(checkpoint2);
+               checkpoints2.add(checkpoint1);
+
+               assertTrue(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+       }
+
+       /**
+        * Verify that both JobID and checkpoint id are taken into account when 
comparing.
+        */
+       @Test
+       public void testCompareCheckpointsWithSameJobID() {
+               JobID jobID = new JobID();
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       jobID, 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       jobID, 1, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+       }
+
+       /**
+        * Verify that both JobID and checkpoint id are taken into account when 
comparing.
+        */
+       @Test
+       public void testCompareCheckpointsWithSameCheckpointId() {
+               JobID jobID1 = new JobID();
+               JobID jobID2 = new JobID();
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       jobID1, 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       jobID2, 0, 0, 1,
+                       new HashMap<>(),
+                       Collections.emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+       }
+
        /**
         * Tests that persistent checkpoints discard their header file.
         */
@@ -83,7 +217,7 @@ public class CompletedCheckpointTest {
 
                boolean discardSubsumed = true;
                CheckpointProperties props = new CheckpointProperties(false, 
false, false, discardSubsumed, true, true, true, true);
-               
+
                CompletedCheckpoint checkpoint = new CompletedCheckpoint(
                                new JobID(), 0, 0, 1,
                                operatorStates,

Reply via email to