Repository: flink
Updated Branches:
  refs/heads/master d29bed383 -> 843f0cbc1


[FLINK-7362] [checkpoints] Savepoint property is lost after de/serialization of 
CheckpointProperties


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/843f0cbc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/843f0cbc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/843f0cbc

Branch: refs/heads/master
Commit: 843f0cbc105ba165c6dcf66b46dc0d0d120bbfb4
Parents: d29bed3
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Aug 7 16:22:32 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 15 15:11:50 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointProperties.java      | 14 ++++++++++++--
 .../runtime/checkpoint/CheckpointPropertiesTest.java  |  8 ++++++++
 .../runtime/checkpoint/CompletedCheckpointTest.java   |  6 +++---
 .../runtime/checkpoint/PendingCheckpointTest.java     | 14 +++++++-------
 .../checkpoint/RestoredCheckpointStatsTest.java       |  2 +-
 5 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 6df7e71..1233b6e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -39,6 +39,7 @@ public class CheckpointProperties implements Serializable {
        private final boolean forced;
 
        private final boolean externalize;
+       private final boolean savepoint;
 
        private final boolean discardSubsumed;
        private final boolean discardFinished;
@@ -49,6 +50,7 @@ public class CheckpointProperties implements Serializable {
        CheckpointProperties(
                        boolean forced,
                        boolean externalize,
+                       boolean savepoint,
                        boolean discardSubsumed,
                        boolean discardFinished,
                        boolean discardCancelled,
@@ -57,6 +59,7 @@ public class CheckpointProperties implements Serializable {
 
                this.forced = forced;
                this.externalize = externalize;
+               this.savepoint = savepoint;
                this.discardSubsumed = discardSubsumed;
                this.discardFinished = discardFinished;
                this.discardCancelled = discardCancelled;
@@ -183,7 +186,7 @@ public class CheckpointProperties implements Serializable {
         * @return <code>true</code> if the properties describe a savepoint, 
<code>false</code> otherwise.
         */
        public boolean isSavepoint() {
-               return this == STANDARD_SAVEPOINT;
+               return savepoint;
        }
 
        // 
------------------------------------------------------------------------
@@ -201,6 +204,7 @@ public class CheckpointProperties implements Serializable {
                CheckpointProperties that = (CheckpointProperties) o;
                return forced == that.forced &&
                                externalize == that.externalize &&
+                               savepoint == that.savepoint &&
                                discardSubsumed == that.discardSubsumed &&
                                discardFinished == that.discardFinished &&
                                discardCancelled == that.discardCancelled &&
@@ -212,6 +216,7 @@ public class CheckpointProperties implements Serializable {
        public int hashCode() {
                int result = (forced ? 1 : 0);
                result = 31 * result + (externalize ? 1 : 0);
+               result = 31 * result + (savepoint ? 1 : 0);
                result = 31 * result + (discardSubsumed ? 1 : 0);
                result = 31 * result + (discardFinished ? 1 : 0);
                result = 31 * result + (discardCancelled ? 1 : 0);
@@ -224,7 +229,8 @@ public class CheckpointProperties implements Serializable {
        public String toString() {
                return "CheckpointProperties{" +
                                "forced=" + forced +
-                               ", externalize=" + externalizeCheckpoint() +
+                               ", externalized=" + externalizeCheckpoint() +
+                               ", savepoint=" + savepoint +
                                ", discardSubsumed=" + discardSubsumed +
                                ", discardFinished=" + discardFinished +
                                ", discardCancelled=" + discardCancelled +
@@ -238,6 +244,7 @@ public class CheckpointProperties implements Serializable {
        private static final CheckpointProperties STANDARD_SAVEPOINT = new 
CheckpointProperties(
                        true,
                        true,
+                       true,
                        false,
                        false,
                        false,
@@ -247,6 +254,7 @@ public class CheckpointProperties implements Serializable {
        private static final CheckpointProperties STANDARD_CHECKPOINT = new 
CheckpointProperties(
                        false,
                        false,
+                       false,
                        true,
                        true,
                        true,
@@ -256,6 +264,7 @@ public class CheckpointProperties implements Serializable {
        private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties(
                        false,
                        true,
+                       false,
                        true,
                        true,
                        false, // Retain on cancellation
@@ -265,6 +274,7 @@ public class CheckpointProperties implements Serializable {
        private static final CheckpointProperties 
EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties(
                        false,
                        true,
+                       false,
                        true,
                        true,
                        true, // Delete on cancellation

http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index 52ac54c..a0509c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.util.InstantiationUtil;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertFalse;
@@ -109,6 +111,12 @@ public class CheckpointPropertiesTest {
                {
                        CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
                        assertTrue(props.isSavepoint());
+
+                       CheckpointProperties deserializedCheckpointProperties =
+                               InstantiationUtil.deserializeObject(
+                                       
InstantiationUtil.serializeObject(props),
+                                       getClass().getClassLoader());
+                       
assertTrue(deserializedCheckpointProperties.isSavepoint());
                }
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 4846879..293675c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -82,7 +82,7 @@ public class CompletedCheckpointTest {
                operatorStates.put(new OperatorID(), state);
 
                boolean discardSubsumed = true;
-               CheckpointProperties props = new CheckpointProperties(false, 
false, discardSubsumed, true, true, true, true);
+               CheckpointProperties props = new CheckpointProperties(false, 
false, false, discardSubsumed, true, true, true, true);
                
                CompletedCheckpoint checkpoint = new CompletedCheckpoint(
                                new JobID(), 0, 0, 1,
@@ -122,7 +122,7 @@ public class CompletedCheckpointTest {
                        Mockito.reset(state);
 
                        // Keep
-                       CheckpointProperties props = new 
CheckpointProperties(false, true, false, false, false, false, false);
+                       CheckpointProperties props = new 
CheckpointProperties(false, true, false, false, false, false, false, false);
                        CompletedCheckpoint checkpoint = new 
CompletedCheckpoint(
                                        new JobID(), 0, 0, 1,
                                        new HashMap<>(operatorStates),
@@ -139,7 +139,7 @@ public class CompletedCheckpointTest {
                        assertEquals(true, file.exists());
 
                        // Discard
-                       props = new CheckpointProperties(false, false, true, 
true, true, true, true);
+                       props = new CheckpointProperties(false, false, false, 
true, true, true, true, true);
                        checkpoint = new CompletedCheckpoint(
                                        new JobID(), 0, 0, 1,
                                        new HashMap<>(operatorStates),

http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 7ebb49a..ef31f0a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -80,7 +80,7 @@ public class PendingCheckpointTest {
        @Test
        public void testCanBeSubsumed() throws Exception {
                // Forced checkpoints cannot be subsumed
-               CheckpointProperties forced = new CheckpointProperties(true, 
true, false, false, false, false, false);
+               CheckpointProperties forced = new CheckpointProperties(true, 
true, false, false, false, false, false, false);
                PendingCheckpoint pending = createPendingCheckpoint(forced, 
"ignored");
                assertFalse(pending.canBeSubsumed());
 
@@ -92,7 +92,7 @@ public class PendingCheckpointTest {
                }
 
                // Non-forced checkpoints can be subsumed
-               CheckpointProperties subsumed = new CheckpointProperties(false, 
true, false, false, false, false, false);
+               CheckpointProperties subsumed = new CheckpointProperties(false, 
true, false, false, false, false, false, false);
                pending = createPendingCheckpoint(subsumed, "ignored");
                assertTrue(pending.canBeSubsumed());
        }
@@ -106,7 +106,7 @@ public class PendingCheckpointTest {
                File tmp = tmpFolder.newFolder();
 
                // Persisted checkpoint
-               CheckpointProperties persisted = new 
CheckpointProperties(false, true, false, false, false, false, false);
+               CheckpointProperties persisted = new 
CheckpointProperties(false, true, false, false, false, false, false, false);
 
                PendingCheckpoint pending = createPendingCheckpoint(persisted, 
tmp.getAbsolutePath());
                pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
@@ -115,7 +115,7 @@ public class PendingCheckpointTest {
                assertEquals(1, tmp.listFiles().length);
 
                // Ephemeral checkpoint
-               CheckpointProperties ephemeral = new 
CheckpointProperties(false, false, true, true, true, true, true);
+               CheckpointProperties ephemeral = new 
CheckpointProperties(false, false, false, true, true, true, true, true);
                pending = createPendingCheckpoint(ephemeral, null);
                pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
 
@@ -130,7 +130,7 @@ public class PendingCheckpointTest {
         */
        @Test
        public void testCompletionFuture() throws Exception {
-               CheckpointProperties props = new CheckpointProperties(false, 
true, false, false, false, false, false);
+               CheckpointProperties props = new CheckpointProperties(false, 
true, false, false, false, false, false, false);
 
                // Abort declined
                PendingCheckpoint pending = createPendingCheckpoint(props, 
"ignored");
@@ -192,7 +192,7 @@ public class PendingCheckpointTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testAbortDiscardsState() throws Exception {
-               CheckpointProperties props = new CheckpointProperties(false, 
true, false, false, false, false, false);
+               CheckpointProperties props = new CheckpointProperties(false, 
true, false, false, false, false, false, false);
                QueueExecutor executor = new QueueExecutor();
 
                OperatorState state = mock(OperatorState.class);
@@ -330,7 +330,7 @@ public class PendingCheckpointTest {
 
        @Test
        public void testSetCanceller() {
-               final CheckpointProperties props = new 
CheckpointProperties(false, false, true, true, true, true, true);
+               final CheckpointProperties props = new 
CheckpointProperties(false, false, false, true, true, true, true, true);
 
                PendingCheckpoint aborted = createPendingCheckpoint(props, 
null);
                aborted.abortDeclined();

http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
index 85b1516..d43283d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
@@ -31,7 +31,7 @@ public class RestoredCheckpointStatsTest {
        public void testSimpleAccess() throws Exception {
                long checkpointId = Integer.MAX_VALUE + 1L;
                long triggerTimestamp = Integer.MAX_VALUE + 1L;
-               CheckpointProperties props = new CheckpointProperties(true, 
true, false, false, true, false, true);
+               CheckpointProperties props = new CheckpointProperties(true, 
true, false, false, false, true, false, true);
                long restoreTimestamp = Integer.MAX_VALUE + 1L;
                String externalPath = "external-path";
 

Reply via email to