[FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties. This closes #3345.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcc1efcb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcc1efcb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcc1efcb Branch: refs/heads/master Commit: fcc1efcb05bce13e435946107a842727b1e3ee20 Parents: 1cb8cde Author: Ufuk Celebi <[email protected]> Authored: Thu Feb 16 16:52:32 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 22 12:14:55 2017 +0100 ---------------------------------------------------------------------- .../CheckpointStatsDetailsHandler.java | 3 +- .../checkpoints/CheckpointStatsHandler.java | 5 ++- .../CheckpointStatsDetailsHandlerTest.java | 6 ++-- .../checkpoints/CheckpointStatsHandlerTest.java | 8 ++--- .../checkpoint/CheckpointProperties.java | 32 ++++++++++---------- .../checkpoint/CheckpointStatsHistory.java | 2 +- .../checkpoint/CheckpointPropertiesTest.java | 8 ++--- .../checkpoint/CheckpointStatsHistoryTest.java | 3 ++ 8 files changed, 34 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java index 33d6cf7..d461f03 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java @@ -20,7 +20,6 @@ package 
org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; @@ -80,7 +79,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest gen.writeNumberField("id", checkpoint.getCheckpointId()); gen.writeStringField("status", checkpoint.getStatus().toString()); - gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties())); + gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint()); gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp()); gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp()); gen.writeNumberField("state_size", checkpoint.getStateSize()); http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java index 8aab5fa..404b2c7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.core.JsonGenerator; import 
org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; -import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts; import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; @@ -170,7 +169,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler gen.writeObjectFieldStart("restored"); gen.writeNumberField("id", restored.getCheckpointId()); gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp()); - gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(restored.getProperties())); + gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint()); String externalPath = restored.getExternalPath(); if (externalPath != null) { @@ -197,7 +196,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler gen.writeStartObject(); gen.writeNumberField("id", checkpoint.getCheckpointId()); gen.writeStringField("status", checkpoint.getStatus().toString()); - gen.writeBooleanField("is_savepoint", CheckpointProperties.isSavepoint(checkpoint.getProperties())); + gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint()); gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp()); gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp()); gen.writeNumberField("state_size", checkpoint.getStateSize()); http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java index fb5cfc5..dfad46d 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java @@ -128,7 +128,7 @@ public class CheckpointStatsDetailsHandlerTest { assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); - assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean()); + assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); @@ -172,7 +172,7 @@ public class CheckpointStatsDetailsHandlerTest { assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); - assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean()); + assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); @@ -218,7 +218,7 @@ public class CheckpointStatsDetailsHandlerTest { assertEquals(checkpoint.getCheckpointId(), rootNode.get("id").asLong()); 
assertEquals(checkpoint.getStatus().toString(), rootNode.get("status").asText()); - assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), rootNode.get("is_savepoint").asBoolean()); + assertEquals(checkpoint.getProperties().isSavepoint(), rootNode.get("is_savepoint").asBoolean()); assertEquals(checkpoint.getTriggerTimestamp(), rootNode.get("trigger_timestamp").asLong()); assertEquals(checkpoint.getLatestAckTimestamp(), rootNode.get("latest_ack_timestamp").asLong()); assertEquals(checkpoint.getStateSize(), rootNode.get("state_size").asLong()); http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java index 23a1900..29f3ae9 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java @@ -241,7 +241,7 @@ public class CheckpointStatsHandlerTest { JsonNode latestRestoredNode = latestNode.get("restored"); assertEquals(latestRestored.getCheckpointId(), latestRestoredNode.get("id").asLong()); assertEquals(latestRestored.getRestoreTimestamp(), latestRestoredNode.get("restore_timestamp").asLong()); - assertEquals(CheckpointProperties.isSavepoint(latestRestored.getProperties()), latestRestoredNode.get("is_savepoint").asBoolean()); + assertEquals(latestRestored.getProperties().isSavepoint(), latestRestoredNode.get("is_savepoint").asBoolean()); assertEquals(latestRestored.getExternalPath(), 
latestRestoredNode.get("external_path").asText()); JsonNode historyNode = rootNode.get("history"); @@ -252,7 +252,7 @@ public class CheckpointStatsHandlerTest { assertEquals(inProgress.getCheckpointId(), inProgressNode.get("id").asLong()); assertEquals(inProgress.getStatus().toString(), inProgressNode.get("status").asText()); - assertEquals(CheckpointProperties.isSavepoint(inProgress.getProperties()), inProgressNode.get("is_savepoint").asBoolean()); + assertEquals(inProgress.getProperties().isSavepoint(), inProgressNode.get("is_savepoint").asBoolean()); assertEquals(inProgress.getTriggerTimestamp(), inProgressNode.get("trigger_timestamp").asLong()); assertEquals(inProgress.getLatestAckTimestamp(), inProgressNode.get("latest_ack_timestamp").asLong()); assertEquals(inProgress.getStateSize(), inProgressNode.get("state_size").asLong()); @@ -266,7 +266,7 @@ public class CheckpointStatsHandlerTest { assertEquals(completedSavepoint.getCheckpointId(), completedSavepointNode.get("id").asLong()); assertEquals(completedSavepoint.getStatus().toString(), completedSavepointNode.get("status").asText()); - assertEquals(CheckpointProperties.isSavepoint(completedSavepoint.getProperties()), completedSavepointNode.get("is_savepoint").asBoolean()); + assertEquals(completedSavepoint.getProperties().isSavepoint(), completedSavepointNode.get("is_savepoint").asBoolean()); assertEquals(completedSavepoint.getTriggerTimestamp(), completedSavepointNode.get("trigger_timestamp").asLong()); assertEquals(completedSavepoint.getLatestAckTimestamp(), completedSavepointNode.get("latest_ack_timestamp").asLong()); assertEquals(completedSavepoint.getStateSize(), completedSavepointNode.get("state_size").asLong()); @@ -283,7 +283,7 @@ public class CheckpointStatsHandlerTest { assertEquals(failed.getCheckpointId(), failedNode.get("id").asLong()); assertEquals(failed.getStatus().toString(), failedNode.get("status").asText()); - assertEquals(CheckpointProperties.isSavepoint(failed.getProperties()), 
failedNode.get("is_savepoint").asBoolean()); + assertEquals(failed.getProperties().isSavepoint(), failedNode.get("is_savepoint").asBoolean()); assertEquals(failed.getTriggerTimestamp(), failedNode.get("trigger_timestamp").asLong()); assertEquals(failed.getLatestAckTimestamp(), failedNode.get("latest_ack_timestamp").asLong()); assertEquals(failed.getStateSize(), failedNode.get("state_size").asLong()); http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java index 4d8bab2..45c8a1b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java @@ -86,7 +86,7 @@ public class CheckpointProperties implements Serializable { * @see CheckpointCoordinator * @see PendingCheckpoint */ - public boolean forceCheckpoint() { + boolean forceCheckpoint() { return forced; } @@ -98,7 +98,7 @@ public class CheckpointProperties implements Serializable { * * @see PendingCheckpoint */ - public boolean externalizeCheckpoint() { + boolean externalizeCheckpoint() { return externalize; } @@ -117,7 +117,7 @@ public class CheckpointProperties implements Serializable { * * @see CompletedCheckpointStore */ - public boolean discardOnSubsumed() { + boolean discardOnSubsumed() { return discardSubsumed; } @@ -131,7 +131,7 @@ public class CheckpointProperties implements Serializable { * * @see CompletedCheckpointStore */ - public boolean discardOnJobFinished() { + boolean discardOnJobFinished() { return discardFinished; } @@ -145,7 +145,7 @@ public class CheckpointProperties implements Serializable { * * @see 
CompletedCheckpointStore */ - public boolean discardOnJobCancelled() { + boolean discardOnJobCancelled() { return discardCancelled; } @@ -159,7 +159,7 @@ public class CheckpointProperties implements Serializable { * * @see CompletedCheckpointStore */ - public boolean discardOnJobFailed() { + boolean discardOnJobFailed() { return discardFailed; } @@ -173,10 +173,19 @@ public class CheckpointProperties implements Serializable { * * @see CompletedCheckpointStore */ - public boolean discardOnJobSuspended() { + boolean discardOnJobSuspended() { return discardSuspended; } + /** + * Returns whether the checkpoint properties describe a standard savepoint. + * + * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise. + */ + public boolean isSavepoint() { + return this == STANDARD_SAVEPOINT; + } + // ------------------------------------------------------------------------ @Override @@ -306,13 +315,4 @@ public class CheckpointProperties implements Serializable { } } - /** - * Returns whether the checkpoint properties describe a standard savepoint. - * - * @param props Checkpoint properties to check. - * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise. 
- */ - public static boolean isSavepoint(CheckpointProperties props) { - return STANDARD_SAVEPOINT.equals(props); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java index 13ce642..ce14c2d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java @@ -255,7 +255,7 @@ public class CheckpointStatsHistory implements Serializable { // Update the latest checkpoint stats if (completedOrFailed.getStatus().isCompleted()) { CompletedCheckpointStats completed = (CompletedCheckpointStats) completedOrFailed; - if (CheckpointProperties.isSavepoint(completed.getProperties()) && + if (completed.getProperties().isSavepoint() && (latestSavepoint == null || completed.getCheckpointId() > latestSavepoint.getCheckpointId())) { http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java index fb3bd65..52ac54c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java @@ -93,22 +93,22 @@ public class CheckpointPropertiesTest { public void testIsSavepoint() throws Exception 
{ { CheckpointProperties props = CheckpointProperties.forStandardCheckpoint(); - assertFalse(CheckpointProperties.isSavepoint(props)); + assertFalse(props.isSavepoint()); } { CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true); - assertFalse(CheckpointProperties.isSavepoint(props)); + assertFalse(props.isSavepoint()); } { CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(false); - assertFalse(CheckpointProperties.isSavepoint(props)); + assertFalse(props.isSavepoint()); } { CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); - assertTrue(CheckpointProperties.isSavepoint(props)); + assertTrue(props.isSavepoint()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java index 7541806..3c373f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java @@ -175,12 +175,14 @@ public class CheckpointStatsHistoryTest { PendingCheckpointStats pending = mock(PendingCheckpointStats.class); when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS); when(pending.getCheckpointId()).thenReturn(checkpointId); + when(pending.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); return pending; } private CompletedCheckpointStats createCompletedCheckpointStats(long checkpointId) { CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class); when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED); + 
when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); when(completed.getCheckpointId()).thenReturn(checkpointId); return completed; } @@ -188,6 +190,7 @@ public class CheckpointStatsHistoryTest { private FailedCheckpointStats createFailedCheckpointStats(long checkpointId) { FailedCheckpointStats failed = mock(FailedCheckpointStats.class); when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED); + when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint()); when(failed.getCheckpointId()).thenReturn(checkpointId); return failed; }
