[FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties

This closes #3345


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcc1efcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcc1efcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcc1efcb

Branch: refs/heads/master
Commit: fcc1efcb05bce13e435946107a842727b1e3ee20
Parents: 1cb8cde
Author: Ufuk Celebi <[email protected]>
Authored: Thu Feb 16 16:52:32 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 22 12:14:55 2017 +0100

----------------------------------------------------------------------
 .../CheckpointStatsDetailsHandler.java          |  3 +-
 .../checkpoints/CheckpointStatsHandler.java     |  5 ++-
 .../CheckpointStatsDetailsHandlerTest.java      |  6 ++--
 .../checkpoints/CheckpointStatsHandlerTest.java |  8 ++---
 .../checkpoint/CheckpointProperties.java        | 32 ++++++++++----------
 .../checkpoint/CheckpointStatsHistory.java      |  2 +-
 .../checkpoint/CheckpointPropertiesTest.java    |  8 ++---
 .../checkpoint/CheckpointStatsHistoryTest.java  |  3 ++
 8 files changed, 34 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 33d6cf7..d461f03 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -80,7 +79,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 
                gen.writeNumberField("id", checkpoint.getCheckpointId());
                gen.writeStringField("status", 
checkpoint.getStatus().toString());
-               gen.writeBooleanField("is_savepoint", 
CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+               gen.writeBooleanField("is_savepoint", 
checkpoint.getProperties().isSavepoint());
                gen.writeNumberField("trigger_timestamp", 
checkpoint.getTriggerTimestamp());
                gen.writeNumberField("latest_ack_timestamp", 
checkpoint.getLatestAckTimestamp());
                gen.writeNumberField("state_size", checkpoint.getStateSize());

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 8aab5fa..404b2c7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -170,7 +169,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
                        gen.writeObjectFieldStart("restored");
                        gen.writeNumberField("id", restored.getCheckpointId());
                        gen.writeNumberField("restore_timestamp", 
restored.getRestoreTimestamp());
-                       gen.writeBooleanField("is_savepoint", 
CheckpointProperties.isSavepoint(restored.getProperties()));
+                       gen.writeBooleanField("is_savepoint", 
restored.getProperties().isSavepoint());
 
                        String externalPath = restored.getExternalPath();
                        if (externalPath != null) {
@@ -197,7 +196,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
                        gen.writeStartObject();
                        gen.writeNumberField("id", 
checkpoint.getCheckpointId());
                        gen.writeStringField("status", 
checkpoint.getStatus().toString());
-                       gen.writeBooleanField("is_savepoint", 
CheckpointProperties.isSavepoint(checkpoint.getProperties()));
+                       gen.writeBooleanField("is_savepoint", 
checkpoint.getProperties().isSavepoint());
                        gen.writeNumberField("trigger_timestamp", 
checkpoint.getTriggerTimestamp());
                        gen.writeNumberField("latest_ack_timestamp", 
checkpoint.getLatestAckTimestamp());
                        gen.writeNumberField("state_size", 
checkpoint.getStateSize());

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index fb5cfc5..dfad46d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -128,7 +128,7 @@ public class CheckpointStatsDetailsHandlerTest {
 
                assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
                assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
-               
assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), 
rootNode.get("is_savepoint").asBoolean());
+               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
                assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
                assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
                assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());
@@ -172,7 +172,7 @@ public class CheckpointStatsDetailsHandlerTest {
 
                assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
                assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
-               
assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), 
rootNode.get("is_savepoint").asBoolean());
+               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
                assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
                assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
                assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());
@@ -218,7 +218,7 @@ public class CheckpointStatsDetailsHandlerTest {
 
                assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
                assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
-               
assertEquals(CheckpointProperties.isSavepoint(checkpoint.getProperties()), 
rootNode.get("is_savepoint").asBoolean());
+               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
                assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
                assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
                assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 23a1900..29f3ae9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -241,7 +241,7 @@ public class CheckpointStatsHandlerTest {
                JsonNode latestRestoredNode = latestNode.get("restored");
                assertEquals(latestRestored.getCheckpointId(), 
latestRestoredNode.get("id").asLong());
                assertEquals(latestRestored.getRestoreTimestamp(), 
latestRestoredNode.get("restore_timestamp").asLong());
-               
assertEquals(CheckpointProperties.isSavepoint(latestRestored.getProperties()), 
latestRestoredNode.get("is_savepoint").asBoolean());
+               assertEquals(latestRestored.getProperties().isSavepoint(), 
latestRestoredNode.get("is_savepoint").asBoolean());
                assertEquals(latestRestored.getExternalPath(), 
latestRestoredNode.get("external_path").asText());
 
                JsonNode historyNode = rootNode.get("history");
@@ -252,7 +252,7 @@ public class CheckpointStatsHandlerTest {
 
                assertEquals(inProgress.getCheckpointId(), 
inProgressNode.get("id").asLong());
                assertEquals(inProgress.getStatus().toString(), 
inProgressNode.get("status").asText());
-               
assertEquals(CheckpointProperties.isSavepoint(inProgress.getProperties()), 
inProgressNode.get("is_savepoint").asBoolean());
+               assertEquals(inProgress.getProperties().isSavepoint(), 
inProgressNode.get("is_savepoint").asBoolean());
                assertEquals(inProgress.getTriggerTimestamp(), 
inProgressNode.get("trigger_timestamp").asLong());
                assertEquals(inProgress.getLatestAckTimestamp(), 
inProgressNode.get("latest_ack_timestamp").asLong());
                assertEquals(inProgress.getStateSize(), 
inProgressNode.get("state_size").asLong());
@@ -266,7 +266,7 @@ public class CheckpointStatsHandlerTest {
 
                assertEquals(completedSavepoint.getCheckpointId(), 
completedSavepointNode.get("id").asLong());
                assertEquals(completedSavepoint.getStatus().toString(), 
completedSavepointNode.get("status").asText());
-               
assertEquals(CheckpointProperties.isSavepoint(completedSavepoint.getProperties()),
 completedSavepointNode.get("is_savepoint").asBoolean());
+               assertEquals(completedSavepoint.getProperties().isSavepoint(), 
completedSavepointNode.get("is_savepoint").asBoolean());
                assertEquals(completedSavepoint.getTriggerTimestamp(), 
completedSavepointNode.get("trigger_timestamp").asLong());
                assertEquals(completedSavepoint.getLatestAckTimestamp(), 
completedSavepointNode.get("latest_ack_timestamp").asLong());
                assertEquals(completedSavepoint.getStateSize(), 
completedSavepointNode.get("state_size").asLong());
@@ -283,7 +283,7 @@ public class CheckpointStatsHandlerTest {
 
                assertEquals(failed.getCheckpointId(), 
failedNode.get("id").asLong());
                assertEquals(failed.getStatus().toString(), 
failedNode.get("status").asText());
-               
assertEquals(CheckpointProperties.isSavepoint(failed.getProperties()), 
failedNode.get("is_savepoint").asBoolean());
+               assertEquals(failed.getProperties().isSavepoint(), 
failedNode.get("is_savepoint").asBoolean());
                assertEquals(failed.getTriggerTimestamp(), 
failedNode.get("trigger_timestamp").asLong());
                assertEquals(failed.getLatestAckTimestamp(), 
failedNode.get("latest_ack_timestamp").asLong());
                assertEquals(failed.getStateSize(), 
failedNode.get("state_size").asLong());

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 4d8bab2..45c8a1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -86,7 +86,7 @@ public class CheckpointProperties implements Serializable {
         * @see CheckpointCoordinator
         * @see PendingCheckpoint
         */
-       public boolean forceCheckpoint() {
+       boolean forceCheckpoint() {
                return forced;
        }
 
@@ -98,7 +98,7 @@ public class CheckpointProperties implements Serializable {
         *
         * @see PendingCheckpoint
         */
-       public boolean externalizeCheckpoint() {
+       boolean externalizeCheckpoint() {
                return externalize;
        }
 
@@ -117,7 +117,7 @@ public class CheckpointProperties implements Serializable {
         *
         * @see CompletedCheckpointStore
         */
-       public boolean discardOnSubsumed() {
+       boolean discardOnSubsumed() {
                return discardSubsumed;
        }
 
@@ -131,7 +131,7 @@ public class CheckpointProperties implements Serializable {
         *
         * @see CompletedCheckpointStore
         */
-       public boolean discardOnJobFinished() {
+       boolean discardOnJobFinished() {
                return discardFinished;
        }
 
@@ -145,7 +145,7 @@ public class CheckpointProperties implements Serializable {
         *
         * @see CompletedCheckpointStore
         */
-       public boolean discardOnJobCancelled() {
+       boolean discardOnJobCancelled() {
                return discardCancelled;
        }
 
@@ -159,7 +159,7 @@ public class CheckpointProperties implements Serializable {
         *
         * @see CompletedCheckpointStore
         */
-       public boolean discardOnJobFailed() {
+       boolean discardOnJobFailed() {
                return discardFailed;
        }
 
@@ -173,10 +173,19 @@ public class CheckpointProperties implements Serializable {
         *
         * @see CompletedCheckpointStore
         */
-       public boolean discardOnJobSuspended() {
+       boolean discardOnJobSuspended() {
                return discardSuspended;
        }
 
+       /**
+        * Returns whether the checkpoint properties describe a standard savepoint.
+        * Compared via equals() (as the former static helper did) rather than by identity, since this class is Serializable.
+        * @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
+        */
+       public boolean isSavepoint() {
+               return STANDARD_SAVEPOINT.equals(this);
+       }
+
        // 
------------------------------------------------------------------------
 
        @Override
@@ -306,13 +315,4 @@ public class CheckpointProperties implements Serializable {
                }
        }
 
-       /**
-        * Returns whether the checkpoint properties describe a standard 
savepoint.
-        *
-        * @param props Checkpoint properties to check.
-        * @return <code>true</code> if the properties describe a savepoint, 
<code>false</code> otherwise.
-        */
-       public static boolean isSavepoint(CheckpointProperties props) {
-               return STANDARD_SAVEPOINT.equals(props);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index 13ce642..ce14c2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -255,7 +255,7 @@ public class CheckpointStatsHistory implements Serializable {
                // Update the latest checkpoint stats
                if (completedOrFailed.getStatus().isCompleted()) {
                        CompletedCheckpointStats completed = 
(CompletedCheckpointStats) completedOrFailed;
-                       if 
(CheckpointProperties.isSavepoint(completed.getProperties()) &&
+                       if (completed.getProperties().isSavepoint() &&
                                (latestSavepoint == null ||
                                        completed.getCheckpointId() > 
latestSavepoint.getCheckpointId())) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index fb3bd65..52ac54c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -93,22 +93,22 @@ public class CheckpointPropertiesTest {
        public void testIsSavepoint() throws Exception {
                {
                        CheckpointProperties props = 
CheckpointProperties.forStandardCheckpoint();
-                       assertFalse(CheckpointProperties.isSavepoint(props));
+                       assertFalse(props.isSavepoint());
                }
 
                {
                        CheckpointProperties props = 
CheckpointProperties.forExternalizedCheckpoint(true);
-                       assertFalse(CheckpointProperties.isSavepoint(props));
+                       assertFalse(props.isSavepoint());
                }
 
                {
                        CheckpointProperties props = 
CheckpointProperties.forExternalizedCheckpoint(false);
-                       assertFalse(CheckpointProperties.isSavepoint(props));
+                       assertFalse(props.isSavepoint());
                }
 
                {
                        CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
-                       assertTrue(CheckpointProperties.isSavepoint(props));
+                       assertTrue(props.isSavepoint());
                }
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fcc1efcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 7541806..3c373f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -175,12 +175,14 @@ public class CheckpointStatsHistoryTest {
                PendingCheckpointStats pending = 
mock(PendingCheckpointStats.class);
                
when(pending.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
                when(pending.getCheckpointId()).thenReturn(checkpointId);
+               
when(pending.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
                return pending;
        }
 
        private CompletedCheckpointStats createCompletedCheckpointStats(long 
checkpointId) {
                CompletedCheckpointStats completed = 
mock(CompletedCheckpointStats.class);
                
when(completed.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+               
when(completed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
                when(completed.getCheckpointId()).thenReturn(checkpointId);
                return completed;
        }
@@ -188,6 +190,7 @@ public class CheckpointStatsHistoryTest {
        private FailedCheckpointStats createFailedCheckpointStats(long 
checkpointId) {
                FailedCheckpointStats failed = 
mock(FailedCheckpointStats.class);
                
when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+               
when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
                when(failed.getCheckpointId()).thenReturn(checkpointId);
                return failed;
        }

Reply via email to