[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints

[FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint
[FLINK-4507] [FLIP-10] Deprecate savepoint backend config

This closes #2608.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd410d9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd410d9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd410d9f

Branch: refs/heads/master
Commit: fd410d9f6d1c8d53fb721752528ebd77fd78db57
Parents: c7d1a3b
Author: Ufuk Celebi <u...@apache.org>
Authored: Thu Oct 6 16:43:42 2016 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Fri Oct 14 10:05:06 2016 +0200

----------------------------------------------------------------------
 docs/setup/cli.md                               |   4 +-
 docs/setup/savepoints.md                        |  27 +-
 .../org/apache/flink/client/CliFrontend.java    |  19 +-
 .../flink/client/cli/CliFrontendParser.java     |   2 +-
 .../flink/client/CliFrontendSavepointTest.java  |  55 ++-
 .../flink/configuration/ConfigConstants.java    |  18 +
 .../clusterframework/MesosJobManager.scala      |   3 -
 .../jobmanager/JMXJobManagerMetricTest.java     |   3 +-
 .../handlers/JobCheckpointsHandler.java         |  23 +-
 .../handlers/JobCheckpointsHandlerTest.java     |   5 +
 .../jobs/job.plan.node.checkpoints.job.jade     |   6 +
 .../web-dashboard/web/css/index.css             |   3 -
 flink-runtime-web/web-dashboard/web/js/index.js | 188 ++++----
 .../web-dashboard/web/js/vendor.js              |   4 +-
 .../jobs/job.plan.node.checkpoints.job.html     |   4 +
 .../checkpoint/CheckpointCoordinator.java       | 128 +++---
 .../runtime/checkpoint/CheckpointIDCounter.java |  20 +-
 .../checkpoint/CheckpointProperties.java        | 234 +++++++++-
 .../runtime/checkpoint/CompletedCheckpoint.java |  81 +++-
 .../checkpoint/CompletedCheckpointStore.java    |  19 +-
 .../runtime/checkpoint/PendingCheckpoint.java   | 150 +++++--
 .../runtime/checkpoint/PendingSavepoint.java    | 159 -------
 .../StandaloneCheckpointIDCounter.java          |   6 +-
 .../StandaloneCheckpointRecoveryFactory.java    |   2 +-
 .../StandaloneCompletedCheckpointStore.java     |  31 +-
 .../ZooKeeperCheckpointIDCounter.java           |  24 +-
 .../ZooKeeperCheckpointRecoveryFactory.java     |   2 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  98 +++--
 .../checkpoint/savepoint/FsSavepointStore.java  | 190 --------
 .../savepoint/HeapSavepointStore.java           | 157 -------
 .../checkpoint/savepoint/SavepointLoader.java   |  21 +-
 .../savepoint/SavepointSerializer.java          |   2 +-
 .../checkpoint/savepoint/SavepointStore.java    | 158 ++++++-
 .../savepoint/SavepointStoreFactory.java        |  97 -----
 .../checkpoint/stats/JobCheckpointStats.java    |   7 +
 .../stats/SimpleCheckpointStatsTracker.java     |  23 +
 .../runtime/executiongraph/ExecutionGraph.java  |  19 +-
 .../executiongraph/ExecutionGraphBuilder.java   |  15 +-
 .../tasks/ExternalizedCheckpointSettings.java   |  68 +++
 .../jobgraph/tasks/JobSnapshottingSettings.java |  28 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |   5 +-
 .../ContaineredJobManager.scala                 |   3 -
 .../flink/runtime/jobmanager/JobManager.scala   | 124 +++---
 .../runtime/messages/JobManagerMessages.scala   |   5 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   5 -
 .../checkpoint/CheckpointCoordinatorTest.java   | 429 +++++++++++++------
 .../checkpoint/CheckpointIDCounterTest.java     |  11 +-
 .../checkpoint/CheckpointPropertiesTest.java    |  55 ++-
 .../checkpoint/CheckpointStateRestoreTest.java  |  19 +-
 .../CompletedCheckpointStoreTest.java           |  54 ++-
 .../checkpoint/CompletedCheckpointTest.java     |  83 +++-
 .../checkpoint/CoordinatorShutdownTest.java     |  13 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  18 +-
 .../checkpoint/PendingCheckpointTest.java       | 149 +++++--
 .../checkpoint/PendingSavepointTest.java        | 140 ------
 .../StandaloneCompletedCheckpointStoreTest.java |  14 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  16 +-
 .../savepoint/FsSavepointStoreTest.java         | 235 ----------
 .../savepoint/SavepointLoaderTest.java          |  40 +-
 .../savepoint/SavepointStoreFactoryTest.java    |  91 ----
 .../savepoint/SavepointStoreTest.java           | 237 ++++++++++
 .../stats/SimpleCheckpointStatsTrackerTest.java |   3 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  26 +-
 .../JobManagerLeaderElectionTest.java           |   4 -
 .../runtime/jobmanager/JobManagerITCase.scala   |  56 ++-
 .../runtime/testingUtils/TestingCluster.scala   |   7 +-
 .../testingUtils/TestingJobManager.scala        |   5 +-
 .../testingUtils/TestingJobManagerLike.scala    |   3 +-
 .../runtime/testingUtils/TestingUtils.scala     |  21 +-
 .../api/environment/CheckpointConfig.java       | 103 ++++-
 .../environment/StreamExecutionEnvironment.java | 124 +++---
 .../flink/streaming/api/graph/StreamConfig.java |   1 -
 .../api/graph/StreamingJobGraphGenerator.java   |  19 +-
 .../test/checkpointing/RescalingITCase.java     |  14 +-
 .../test/checkpointing/SavepointITCase.java     | 174 +-------
 .../test/classloading/ClassLoaderITCase.java    |   7 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   2 -
 .../org/apache/flink/yarn/YarnJobManager.scala  |   3 -
 78 files changed, 2277 insertions(+), 2144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/docs/setup/cli.md
----------------------------------------------------------------------
diff --git a/docs/setup/cli.md b/docs/setup/cli.md
index 5153787..251a0f6 100644
--- a/docs/setup/cli.md
+++ b/docs/setup/cli.md
@@ -137,11 +137,13 @@ This allows the job to finish processing all inflight 
data.
 #### Trigger a savepoint
 
 {% highlight bash %}
-./bin/flink savepoint <jobID>
+./bin/flink savepoint <jobID> [savepointDirectory]
 {% endhighlight %}
 
 Returns the path of the created savepoint. You need this path to restore and 
dispose savepoints.
 
+You can optionally specify a `savepointDirectory` when triggering the 
savepoint. If you don't specify one here, you need to configure a default 
savepoint directory for the Flink installation (see 
[Savepoints: Configuration](savepoints.html#configuration)).
+
 #### **Restore a savepoint**
 
 {% highlight bash %}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/docs/setup/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/setup/savepoints.md b/docs/setup/savepoints.md
index 9f0655c..231a4ff 100644
--- a/docs/setup/savepoints.md
+++ b/docs/setup/savepoints.md
@@ -41,32 +41,15 @@ Note that **s<sub>1</sub>** is only a **pointer to the 
actual checkpoint data c<
 
 ## Configuration
 
-Savepoints point to regular checkpoints and store their state in a configured 
[state backend]({{ site.baseurl }}/dev/state_backends.html). Currently, the 
supported state backends are **jobmanager** and **filesystem**. The state 
backend configuration for the regular periodic checkpoints is **independent** 
of the savepoint state backend configuration. Checkpoint data is **not copied** 
for savepoints, but points to the configured checkpoint state backend.
-
-### JobManager
-
-This is the **default backend** for savepoints.
-
-Savepoints are stored on the heap of the job manager. They are *lost* after 
the job manager is shut down. This mode is only useful if you want to *stop* 
and *resume* your program while the **same cluster** keeps running. It is *not 
recommended* for production use. Savepoints are *not* part of the [job 
manager's highly available]({{ site.baseurl 
}}/setup/jobmanager_high_availability.html) state.
-
-<pre>
-savepoints.state.backend: jobmanager
-</pre>
-
-**Note**: If you don't configure a specific state backend for the savepoints, 
the jobmanager backend will be used.
-
-### File system
-
-Savepoints are stored in the configured **file system directory**. They are 
available between cluster instances and allow to move your program to another 
cluster.
+Savepoints are stored in a configured **file system directory**. They are 
available between cluster instances and allow you to move your program to 
another cluster.
 
 <pre>
-savepoints.state.backend: filesystem
-savepoints.state.backend.fs.dir: hdfs:///flink/savepoints
+state.savepoints.dir: hdfs:///flink/savepoints
 </pre>
 
-**Note**: If you don't configure a specific directory, the job manager backend 
will be used.
+**Note**: If you neither configure a default directory nor pass a target 
directory when triggering the savepoint, triggering the savepoint will fail.
 
-**Important**: A savepoint is a pointer to a completed checkpoint. That means 
that the state of a savepoint is not only found in the savepoint file itself, 
but also needs the actual checkpoint data (e.g. in a set of further files). 
Therefore, using the *filesystem* backend for savepoints and the *jobmanager* 
backend for checkpoints does not work, because the required checkpoint data 
won't be available after a job manager restart.
+**Important**: A savepoint is a pointer to a completed checkpoint. That means 
that the state of a savepoint is not contained in the savepoint file alone; 
restoring it also requires the actual checkpoint data (e.g. in a set of further files).
 
 ## Changes to your program
 
@@ -105,5 +88,3 @@ You control the savepoints via the [command line client]({{ 
site.baseurl }}/setu
 - **Parallelism**: When restoring a savepoint, the parallelism of the program 
has to match the parallelism of the original program from which the savepoint 
was drawn. There is no mechanism to re-partition the savepoint's state yet.
 
 - **Chaining**: Chained operators are identified by the ID of the first task. 
It's not possible to manually assign an ID to an intermediate chained task, 
e.g. in the chain `[  a -> b -> c ]` only **a** can have its ID assigned 
manually, but not **b** or **c**. To work around this, you can [manually define 
the task chains](index.html#task-chaining-and-resource-groups). If you rely on 
the automatic ID assignment, a change in the chaining behaviour will also 
change the IDs.
-
-- **Disposing custom state handles**: Disposing an old savepoint does not work 
with custom state handles (if you are using a custom state backend), because 
the user code class loader is not available during disposal.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 0711758..90d3437 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -72,6 +72,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -623,7 +624,7 @@ public class CliFrontend {
                        String[] cleanedArgs = options.getArgs();
                        JobID jobId;
 
-                       if (cleanedArgs.length > 0) {
+                       if (cleanedArgs.length >= 1) {
                                String jobIdString = cleanedArgs[0];
                                try {
                                        jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
@@ -637,7 +638,17 @@ public class CliFrontend {
                                                                "Specify a Job 
ID to trigger a savepoint."));
                        }
 
-                       return triggerSavepoint(options, jobId);
+                       String savepointDirectory = null;
+                       if (cleanedArgs.length >= 2) {
+                               savepointDirectory = cleanedArgs[1];
+                       }
+
+                       // Print superfluous arguments
+                       if (cleanedArgs.length >= 3) {
+                               logAndSysout("Provided more arguments than 
required. Ignoring not needed arguments.");
+                       }
+
+                       return triggerSavepoint(options, jobId, 
savepointDirectory);
                }
        }
 
@@ -645,12 +656,12 @@ public class CliFrontend {
         * Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
         * message to the job manager.
         */
-       private int triggerSavepoint(SavepointOptions options, JobID jobId) {
+       private int triggerSavepoint(SavepointOptions options, JobID jobId, 
String savepointDirectory) {
                try {
                        ActorGateway jobManager = getJobManagerGateway(options);
 
                        logAndSysout("Triggering savepoint for job " + jobId + 
".");
-                       Future<Object> response = jobManager.ask(new 
TriggerSavepoint(jobId),
+                       Future<Object> response = jobManager.ask(new 
TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
                                        new FiniteDuration(1, TimeUnit.HOURS));
 
                        Object result;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 3ed383d..9f3ef63 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -343,7 +343,7 @@ public class CliFrontendParser {
                formatter.setWidth(80);
 
                System.out.println("\nAction \"savepoint\" triggers savepoints 
for a running job or disposes existing ones.");
-               System.out.println("\n  Syntax: savepoint [OPTIONS] <Job ID>");
+               System.out.println("\n  Syntax: savepoint [OPTIONS] <Job ID> 
[<target directory>]");
                formatter.setSyntaxPrefix("  \"savepoint\" action options:");
                formatter.printHelp(" ", 
getSavepointOptionsWithoutDeprecatedOptions(new Options()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
index f9c7e8c..c2bd562 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.client;
 import akka.dispatch.Futures;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.junit.Rule;
@@ -37,8 +36,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.util.List;
-import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
 import static 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
@@ -79,7 +76,7 @@ public class CliFrontendSavepointTest {
                        Promise<Object> triggerResponse = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
 
                        when(jobManager.ask(
-                                       Mockito.eq(new TriggerSavepoint(jobId)),
+                                       Mockito.eq(new TriggerSavepoint(jobId, 
Option.<String>empty())),
                                        any(FiniteDuration.class)))
                                        .thenReturn(triggerResponse.future());
 
@@ -95,7 +92,7 @@ public class CliFrontendSavepointTest {
 
                        assertEquals(0, returnCode);
                        verify(jobManager, times(1)).ask(
-                                       Mockito.eq(new TriggerSavepoint(jobId)),
+                                       Mockito.eq(new TriggerSavepoint(jobId, 
Option.<String>empty())),
                                        any(FiniteDuration.class));
 
                        
assertTrue(buffer.toString().contains("expectedSavepointPath"));
@@ -116,7 +113,7 @@ public class CliFrontendSavepointTest {
                        Promise<Object> triggerResponse = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
 
                        when(jobManager.ask(
-                                       Mockito.eq(new TriggerSavepoint(jobId)),
+                                       Mockito.eq(new TriggerSavepoint(jobId, 
Option.<String>empty())),
                                        any(FiniteDuration.class)))
                                        .thenReturn(triggerResponse.future());
 
@@ -132,7 +129,7 @@ public class CliFrontendSavepointTest {
 
                        assertTrue(returnCode != 0);
                        verify(jobManager, times(1)).ask(
-                                       Mockito.eq(new TriggerSavepoint(jobId)),
+                                       Mockito.eq(new TriggerSavepoint(jobId, 
Option.<String>empty())),
                                        any(FiniteDuration.class));
 
                        
assertTrue(buffer.toString().contains("expectedTestException"));
@@ -171,7 +168,7 @@ public class CliFrontendSavepointTest {
                        Promise<Object> triggerResponse = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
 
                        when(jobManager.ask(
-                                       Mockito.eq(new TriggerSavepoint(jobId)),
+                                       Mockito.eq(new TriggerSavepoint(jobId, 
Option.<String>empty())),
                                        any(FiniteDuration.class)))
                                        .thenReturn(triggerResponse.future());
 
@@ -185,7 +182,7 @@ public class CliFrontendSavepointTest {
 
                        assertTrue(returnCode != 0);
                        verify(jobManager, times(1)).ask(
-                                       Mockito.eq(new TriggerSavepoint(jobId)),
+                                       Mockito.eq(new TriggerSavepoint(jobId, 
Option.<String>empty())),
                                        any(FiniteDuration.class));
 
                        String errMsg = buffer.toString();
@@ -197,6 +194,46 @@ public class CliFrontendSavepointTest {
                }
        }
 
+       /**
+        * Tests that a CLI call with a custom savepoint directory target is
+        * forwarded correctly to the JM.
+        */
+       @Test
+       public void testTriggerSavepointCustomTarget() throws Exception {
+               replaceStdOutAndStdErr();
+
+               try {
+                       JobID jobId = new JobID();
+                       Option<String> customTarget = 
Option.apply("customTargetDirectory");
+                       ActorGateway jobManager = mock(ActorGateway.class);
+
+                       Promise<Object> triggerResponse = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
+
+                       when(jobManager.ask(
+                                       Mockito.eq(new TriggerSavepoint(jobId, 
customTarget)),
+                                       any(FiniteDuration.class)))
+                                       .thenReturn(triggerResponse.future());
+                       String savepointPath = "expectedSavepointPath";
+                       triggerResponse.success(new 
TriggerSavepointSuccess(jobId, savepointPath));
+
+                       CliFrontend frontend = new MockCliFrontend(
+                                       CliFrontendTestUtils.getConfigDir(), 
jobManager);
+
+                       String[] parameters = { jobId.toString(), 
customTarget.get() };
+                       int returnCode = frontend.savepoint(parameters);
+
+                       assertEquals(0, returnCode);
+                       verify(jobManager, times(1)).ask(
+                                       Mockito.eq(new TriggerSavepoint(jobId, 
customTarget)),
+                                       any(FiniteDuration.class));
+
+                       
assertTrue(buffer.toString().contains("expectedSavepointPath"));
+               }
+               finally {
+                       restoreStdOutAndStdErr();
+               }
+       }
+
        // 
------------------------------------------------------------------------
        // Dispose savepoint
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9e66e2a..f508d4a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -868,6 +868,24 @@ public final class ConfigConstants {
        /** The scope format string that is applied to all metrics scoped to an 
operator. */
        public static final String METRICS_SCOPE_NAMING_OPERATOR = 
"metrics.scope.operator";
 
+       // ---------------------------- Checkpoints 
-------------------------------
+
+       /** The default directory for savepoints. */
+       @PublicEvolving
+       public static final String SAVEPOINT_DIRECTORY_KEY = 
"state.savepoints.dir";
+
+       /** The default directory used for persistent checkpoints. */
+       @PublicEvolving
+       public static final String CHECKPOINTS_DIRECTORY_KEY = 
"state.checkpoints.dir";
+
+       /**
+        * This key was used in Flink versions <= 1.1.X with the savepoint 
backend
+        * configuration. We now always use the FileSystem for savepoints. For 
this,
+        * the only relevant config key is {@link #SAVEPOINT_DIRECTORY_KEY}.
+        */
+       @Deprecated
+       public static final String SAVEPOINT_FS_DIRECTORY_KEY = 
"savepoints.state.backend.fs.dir";
+
        // 
------------------------------------------------------------------------
        //                            Default Values
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 3513210..113ab85 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.ExecutorService
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -60,7 +59,6 @@ class MesosJobManager(flinkConfiguration: FlinkConfiguration,
                       leaderElectionService: LeaderElectionService,
                       submittedJobGraphs : SubmittedJobGraphStore,
                       checkpointRecoveryFactory : CheckpointRecoveryFactory,
-                      savepointStore: SavepointStore,
                       jobRecoveryTimeout: FiniteDuration,
                       metricsRegistry: Option[FlinkMetricRegistry])
   extends ContaineredJobManager(
@@ -75,7 +73,6 @@ class MesosJobManager(flinkConfiguration: FlinkConfiguration,
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricsRegistry) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index fa1f9f8..0954241 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -71,7 +72,7 @@ public class JMXJobManagerMetricTest {
                                Collections.<JobVertexID>emptyList(),
                                Collections.<JobVertexID>emptyList(),
                                Collections.<JobVertexID>emptyList(),
-                               500, 500, 50, 5));
+                               500, 500, 50, 5, 
ExternalizedCheckpointSettings.none()));
 
                        flink.waitForActorsToBeAlive();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
index 41735c5..b63ab0e 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
@@ -51,28 +51,35 @@ public class JobCheckpointsHandler extends 
AbstractExecutionGraphRequestHandler
                        Option<JobCheckpointStats> stats = 
tracker.getJobStats();
 
                        if (stats.isDefined()) {
+                               JobCheckpointStats jobStats = stats.get();
+
                                // Total number of checkpoints
-                               gen.writeNumberField("count", 
stats.get().getCount());
+                               gen.writeNumberField("count", 
jobStats.getCount());
+
+                               // Optional external path
+                               if (jobStats.getExternalPath() != null) {
+                                       gen.writeStringField("external-path", 
jobStats.getExternalPath());
+                               }
 
                                // Duration
                                gen.writeFieldName("duration");
                                gen.writeStartObject();
-                               gen.writeNumberField("min", 
stats.get().getMinDuration());
-                               gen.writeNumberField("max", 
stats.get().getMaxDuration());
-                               gen.writeNumberField("avg", 
stats.get().getAverageDuration());
+                               gen.writeNumberField("min", 
jobStats.getMinDuration());
+                               gen.writeNumberField("max", 
jobStats.getMaxDuration());
+                               gen.writeNumberField("avg", 
jobStats.getAverageDuration());
                                gen.writeEndObject();
 
                                // State size
                                gen.writeFieldName("size");
                                gen.writeStartObject();
-                               gen.writeNumberField("min", 
stats.get().getMinStateSize());
-                               gen.writeNumberField("max", 
stats.get().getMaxStateSize());
-                               gen.writeNumberField("avg", 
stats.get().getAverageStateSize());
+                               gen.writeNumberField("min", 
jobStats.getMinStateSize());
+                               gen.writeNumberField("max", 
jobStats.getMaxStateSize());
+                               gen.writeNumberField("avg", 
jobStats.getAverageStateSize());
                                gen.writeEndObject();
 
                                // Recent history
                                gen.writeArrayFieldStart("history");
-                               for (CheckpointStats checkpoint : 
stats.get().getRecentHistory()) {
+                               for (CheckpointStats checkpoint : 
jobStats.getRecentHistory()) {
                                        gen.writeStartObject();
                                        gen.writeNumberField("id", 
checkpoint.getCheckpointId());
                                        gen.writeNumberField("timestamp", 
checkpoint.getTriggerTimestamp());

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java
index b6d50ea..dfbb9cf 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java
@@ -106,6 +106,11 @@ public class JobCheckpointsHandlerTest {
                        }
 
                        @Override
+                       public String getExternalPath() {
+                               return null;
+                       }
+
+                       @Override
                        public long getMinDuration() {
                                return 1;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade
 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade
index e8a07c0..6d3b6b3 100644
--- 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade
+++ 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade
@@ -60,6 +60,12 @@ 
table(ng-if="jobCheckpointStats").table.table-hover.table-inner
             strong Average:
             span  {{ jobCheckpointStats['size']['avg'] | humanizeBytes }}
 
+    tr(ng-if="jobCheckpointStats['external-path']")
+      td(colspan=4)
+        strong Latest Checkpoint Path:
+        =" "
+        | {{ jobCheckpointStats['external-path'] }}
+
 div(ng-if="!showHistory && jobCheckpointStats && 
jobCheckpointStats['history'].length > 0")
   a.btn.btn-default(ng-click="toggleHistory()")
     | <strong>Show history</strong> ({{ jobCheckpointStats['history'].length 
}})

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/web-dashboard/web/css/index.css
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/css/index.css 
b/flink-runtime-web/web-dashboard/web/css/index.css
index 52a5631..49449f2 100644
--- a/flink-runtime-web/web-dashboard/web/css/index.css
+++ b/flink-runtime-web/web-dashboard/web/css/index.css
@@ -228,7 +228,6 @@
   overflow: hidden;
   padding: 2px 4px;
   font-size: 14px;
-  -webkit-border-radius: 2px;
   border-radius: 2px;
   margin-top: -3px;
 }
@@ -496,7 +495,6 @@ pre {
 }
 .nav-tabs.tabs-vertical li > a {
   margin-right: 0;
-  -webkit-border-radius: 0;
   border-radius: 0;
   border-bottom: none;
   border-left: 2px solid transparent;
@@ -541,7 +539,6 @@ livechart {
   padding-right: 0.4em;
   margin: 0;
   border-right: 1px solid #fff;
-  -webkit-border-radius: 0;
   border-radius: 0;
 }
 .label-group .label.label-black {

Reply via email to