Repository: flink
Updated Branches:
  refs/heads/release-1.3 0069541d7 -> f3a5685f7


[FLINK-8890] Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch()

This method is used, among other things, to check whether a list of restored
checkpoints is stable after several restore attempts in the ZooKeeper
checkpoint store. The order of the checkpoints matters because the latest
checkpoint must remain the latest checkpoint, so the checkpoints are now
compared as ordered lists rather than as unordered sets.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3a5685f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3a5685f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3a5685f

Branch: refs/heads/release-1.3
Commit: f3a5685f7e9dec074f3f231770dc797fae0dd04a
Parents: 0069541
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Mar 7 11:58:07 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Mar 7 14:49:02 2018 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java |  12 +-
 .../checkpoint/CompletedCheckpointTest.java     | 134 +++++++++++++++++++
 2 files changed, 140 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3a5685f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 5e7a76a..ee14e80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -35,9 +35,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -284,17 +283,18 @@ public class CompletedCheckpoint implements Serializable {
        public static boolean checkpointsMatch(
                Collection<CompletedCheckpoint> first,
                Collection<CompletedCheckpoint> second) {
+               if (first.size() != second.size()) {
+                       return false;
+               }
 
-               Set<Tuple2<Long, JobID>> firstInterestingFields =
-                       new HashSet<>();
+               List<Tuple2<Long, JobID>> firstInterestingFields = new ArrayList<>(first.size());
 
                for (CompletedCheckpoint checkpoint : first) {
                        firstInterestingFields.add(
                                new Tuple2<>(checkpoint.getCheckpointID(), checkpoint.getJobId()));
                }
 
-               Set<Tuple2<Long, JobID>> secondInterestingFields =
-                       new HashSet<>();
+               List<Tuple2<Long, JobID>> secondInterestingFields = new ArrayList<>(second.size());
 
                for (CompletedCheckpoint checkpoint : second) {
                        secondInterestingFields.add(

http://git-wip-us.apache.org/repos/asf/flink/blob/f3a5685f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 4846879..7fcef46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -26,17 +26,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -46,6 +51,135 @@ public class CompletedCheckpointTest {
        @Rule
        public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
+       @Test
+       public void testCompareCheckpointsWithDifferentOrder() {
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       new JobID(), 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       new JobID(), 1, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+               checkpoints1.add(checkpoint2);
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+               checkpoints2.add(checkpoint1);
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2));
+       }
+
+       @Test
+       public void testCompareCheckpointsWithSameOrder() {
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       new JobID(), 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       new JobID(), 1, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+               checkpoints1.add(checkpoint2);
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint1);
+               checkpoints2.add(checkpoint2);
+               checkpoints2.add(checkpoint1);
+
+               assertTrue(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2));
+       }
+
+       /**
+        * Verify that both JobID and checkpoint id are taken into account when comparing.
+        */
+       @Test
+       public void testCompareCheckpointsWithSameJobID() {
+               JobID jobID = new JobID();
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       jobID, 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       jobID, 1, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2));
+       }
+
+       /**
+        * Verify that both JobID and checkpoint id are taken into account when comparing.
+        */
+       @Test
+       public void testCompareCheckpointsWithSameCheckpointId() {
+               JobID jobID1 = new JobID();
+               JobID jobID2 = new JobID();
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       jobID1, 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       jobID2, 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, checkpoints2));
+       }
+
        /**
         * Tests that persistent checkpoints discard their header file.
         */

Reply via email to