Repository: flink
Updated Branches:
  refs/heads/master d29bed383 -> 843f0cbc1
[FLINK-7362] [checkpoints] Savepoint property is lost after de/serialization of CheckpointProperties Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/843f0cbc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/843f0cbc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/843f0cbc Branch: refs/heads/master Commit: 843f0cbc105ba165c6dcf66b46dc0d0d120bbfb4 Parents: d29bed3 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Aug 7 16:22:32 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Aug 15 15:11:50 2017 +0200 ---------------------------------------------------------------------- .../runtime/checkpoint/CheckpointProperties.java | 14 ++++++++++++-- .../runtime/checkpoint/CheckpointPropertiesTest.java | 8 ++++++++ .../runtime/checkpoint/CompletedCheckpointTest.java | 6 +++--- .../runtime/checkpoint/PendingCheckpointTest.java | 14 +++++++------- .../checkpoint/RestoredCheckpointStatsTest.java | 2 +- 5 files changed, 31 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java index 6df7e71..1233b6e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java @@ -39,6 +39,7 @@ public class CheckpointProperties implements Serializable { private final boolean forced; private final boolean externalize; + private final boolean savepoint; private final boolean 
discardSubsumed; private final boolean discardFinished; @@ -49,6 +50,7 @@ public class CheckpointProperties implements Serializable { CheckpointProperties( boolean forced, boolean externalize, + boolean savepoint, boolean discardSubsumed, boolean discardFinished, boolean discardCancelled, @@ -57,6 +59,7 @@ public class CheckpointProperties implements Serializable { this.forced = forced; this.externalize = externalize; + this.savepoint = savepoint; this.discardSubsumed = discardSubsumed; this.discardFinished = discardFinished; this.discardCancelled = discardCancelled; @@ -183,7 +186,7 @@ public class CheckpointProperties implements Serializable { * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise. */ public boolean isSavepoint() { - return this == STANDARD_SAVEPOINT; + return savepoint; } // ------------------------------------------------------------------------ @@ -201,6 +204,7 @@ public class CheckpointProperties implements Serializable { CheckpointProperties that = (CheckpointProperties) o; return forced == that.forced && externalize == that.externalize && + savepoint == that.savepoint && discardSubsumed == that.discardSubsumed && discardFinished == that.discardFinished && discardCancelled == that.discardCancelled && @@ -212,6 +216,7 @@ public class CheckpointProperties implements Serializable { public int hashCode() { int result = (forced ? 1 : 0); result = 31 * result + (externalize ? 1 : 0); + result = 31 * result + (savepoint ? 1 : 0); result = 31 * result + (discardSubsumed ? 1 : 0); result = 31 * result + (discardFinished ? 1 : 0); result = 31 * result + (discardCancelled ? 
1 : 0); @@ -224,7 +229,8 @@ public class CheckpointProperties implements Serializable { public String toString() { return "CheckpointProperties{" + "forced=" + forced + - ", externalize=" + externalizeCheckpoint() + + ", externalized=" + externalizeCheckpoint() + + ", savepoint=" + savepoint + ", discardSubsumed=" + discardSubsumed + ", discardFinished=" + discardFinished + ", discardCancelled=" + discardCancelled + @@ -238,6 +244,7 @@ public class CheckpointProperties implements Serializable { private static final CheckpointProperties STANDARD_SAVEPOINT = new CheckpointProperties( true, true, + true, false, false, false, @@ -247,6 +254,7 @@ public class CheckpointProperties implements Serializable { private static final CheckpointProperties STANDARD_CHECKPOINT = new CheckpointProperties( false, false, + false, true, true, true, @@ -256,6 +264,7 @@ public class CheckpointProperties implements Serializable { private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_RETAINED = new CheckpointProperties( false, true, + false, true, true, false, // Retain on cancellation @@ -265,6 +274,7 @@ public class CheckpointProperties implements Serializable { private static final CheckpointProperties EXTERNALIZED_CHECKPOINT_DELETED = new CheckpointProperties( false, true, + false, true, true, true, // Delete on cancellation http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java index 52ac54c..a0509c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java @@ 
-18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.util.InstantiationUtil; + import org.junit.Test; import static org.junit.Assert.assertFalse; @@ -109,6 +111,12 @@ public class CheckpointPropertiesTest { { CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); assertTrue(props.isSavepoint()); + + CheckpointProperties deserializedCheckpointProperties = + InstantiationUtil.deserializeObject( + InstantiationUtil.serializeObject(props), + getClass().getClassLoader()); + assertTrue(deserializedCheckpointProperties.isSavepoint()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index 4846879..293675c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -82,7 +82,7 @@ public class CompletedCheckpointTest { operatorStates.put(new OperatorID(), state); boolean discardSubsumed = true; - CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true); + CheckpointProperties props = new CheckpointProperties(false, false, false, discardSubsumed, true, true, true, true); CompletedCheckpoint checkpoint = new CompletedCheckpoint( new JobID(), 0, 0, 1, @@ -122,7 +122,7 @@ public class CompletedCheckpointTest { Mockito.reset(state); // Keep - CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false); + CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false); 
CompletedCheckpoint checkpoint = new CompletedCheckpoint( new JobID(), 0, 0, 1, new HashMap<>(operatorStates), @@ -139,7 +139,7 @@ public class CompletedCheckpointTest { assertEquals(true, file.exists()); // Discard - props = new CheckpointProperties(false, false, true, true, true, true, true); + props = new CheckpointProperties(false, false, false, true, true, true, true, true); checkpoint = new CompletedCheckpoint( new JobID(), 0, 0, 1, new HashMap<>(operatorStates), http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 7ebb49a..ef31f0a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -80,7 +80,7 @@ public class PendingCheckpointTest { @Test public void testCanBeSubsumed() throws Exception { // Forced checkpoints cannot be subsumed - CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false); + CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false, false, false, false); PendingCheckpoint pending = createPendingCheckpoint(forced, "ignored"); assertFalse(pending.canBeSubsumed()); @@ -92,7 +92,7 @@ public class PendingCheckpointTest { } // Non-forced checkpoints can be subsumed - CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false); + CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false, false, false, false); pending = createPendingCheckpoint(subsumed, "ignored"); 
assertTrue(pending.canBeSubsumed()); } @@ -106,7 +106,7 @@ public class PendingCheckpointTest { File tmp = tmpFolder.newFolder(); // Persisted checkpoint - CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false); + CheckpointProperties persisted = new CheckpointProperties(false, true, false, false, false, false, false, false); PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath()); pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); @@ -115,7 +115,7 @@ public class PendingCheckpointTest { assertEquals(1, tmp.listFiles().length); // Ephemeral checkpoint - CheckpointProperties ephemeral = new CheckpointProperties(false, false, true, true, true, true, true); + CheckpointProperties ephemeral = new CheckpointProperties(false, false, false, true, true, true, true, true); pending = createPendingCheckpoint(ephemeral, null); pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); @@ -130,7 +130,7 @@ public class PendingCheckpointTest { */ @Test public void testCompletionFuture() throws Exception { - CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false); + CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false); // Abort declined PendingCheckpoint pending = createPendingCheckpoint(props, "ignored"); @@ -192,7 +192,7 @@ public class PendingCheckpointTest { @Test @SuppressWarnings("unchecked") public void testAbortDiscardsState() throws Exception { - CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false); + CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false, false); QueueExecutor executor = new QueueExecutor(); OperatorState state = mock(OperatorState.class); @@ -330,7 +330,7 @@ public class PendingCheckpointTest { @Test public void testSetCanceller() { - final 
CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true); + final CheckpointProperties props = new CheckpointProperties(false, false, false, true, true, true, true, true); PendingCheckpoint aborted = createPendingCheckpoint(props, null); aborted.abortDeclined(); http://git-wip-us.apache.org/repos/asf/flink/blob/843f0cbc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java index 85b1516..d43283d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java @@ -31,7 +31,7 @@ public class RestoredCheckpointStatsTest { public void testSimpleAccess() throws Exception { long checkpointId = Integer.MAX_VALUE + 1L; long triggerTimestamp = Integer.MAX_VALUE + 1L; - CheckpointProperties props = new CheckpointProperties(true, true, false, false, true, false, true); + CheckpointProperties props = new CheckpointProperties(true, true, false, false, false, true, false, true); long restoreTimestamp = Integer.MAX_VALUE + 1L; String externalPath = "external-path";