[FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints [FLINK-4509] [FLIP-10] Specify savepoint directory per savepoint [FLINK-4507] [FLIP-10] Deprecate savepoint backend config
This closes #2608. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd410d9f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd410d9f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd410d9f Branch: refs/heads/master Commit: fd410d9f6d1c8d53fb721752528ebd77fd78db57 Parents: c7d1a3b Author: Ufuk Celebi <u...@apache.org> Authored: Thu Oct 6 16:43:42 2016 +0200 Committer: Ufuk Celebi <u...@apache.org> Committed: Fri Oct 14 10:05:06 2016 +0200 ---------------------------------------------------------------------- docs/setup/cli.md | 4 +- docs/setup/savepoints.md | 27 +- .../org/apache/flink/client/CliFrontend.java | 19 +- .../flink/client/cli/CliFrontendParser.java | 2 +- .../flink/client/CliFrontendSavepointTest.java | 55 ++- .../flink/configuration/ConfigConstants.java | 18 + .../clusterframework/MesosJobManager.scala | 3 - .../jobmanager/JMXJobManagerMetricTest.java | 3 +- .../handlers/JobCheckpointsHandler.java | 23 +- .../handlers/JobCheckpointsHandlerTest.java | 5 + .../jobs/job.plan.node.checkpoints.job.jade | 6 + .../web-dashboard/web/css/index.css | 3 - flink-runtime-web/web-dashboard/web/js/index.js | 188 ++++---- .../web-dashboard/web/js/vendor.js | 4 +- .../jobs/job.plan.node.checkpoints.job.html | 4 + .../checkpoint/CheckpointCoordinator.java | 128 +++--- .../runtime/checkpoint/CheckpointIDCounter.java | 20 +- .../checkpoint/CheckpointProperties.java | 234 +++++++++- .../runtime/checkpoint/CompletedCheckpoint.java | 81 +++- .../checkpoint/CompletedCheckpointStore.java | 19 +- .../runtime/checkpoint/PendingCheckpoint.java | 150 +++++-- .../runtime/checkpoint/PendingSavepoint.java | 159 ------- .../StandaloneCheckpointIDCounter.java | 6 +- .../StandaloneCheckpointRecoveryFactory.java | 2 +- .../StandaloneCompletedCheckpointStore.java | 31 +- .../ZooKeeperCheckpointIDCounter.java | 24 +- .../ZooKeeperCheckpointRecoveryFactory.java | 2 +- .../ZooKeeperCompletedCheckpointStore.java | 98 +++-- .../checkpoint/savepoint/FsSavepointStore.java | 190 -------- .../savepoint/HeapSavepointStore.java | 157 ------- .../checkpoint/savepoint/SavepointLoader.java | 21 +- .../savepoint/SavepointSerializer.java | 2 +- .../checkpoint/savepoint/SavepointStore.java | 158 ++++++- .../savepoint/SavepointStoreFactory.java | 97 ----- .../checkpoint/stats/JobCheckpointStats.java | 7 + .../stats/SimpleCheckpointStatsTracker.java | 23 + .../runtime/executiongraph/ExecutionGraph.java | 19 +- .../executiongraph/ExecutionGraphBuilder.java | 15 +- .../tasks/ExternalizedCheckpointSettings.java | 68 +++ .../jobgraph/tasks/JobSnapshottingSettings.java | 28 +- .../flink/runtime/util/ZooKeeperUtils.java | 5 +- .../ContaineredJobManager.scala | 3 - .../flink/runtime/jobmanager/JobManager.scala | 124 +++--- .../runtime/messages/JobManagerMessages.scala | 5 +- .../minicluster/LocalFlinkMiniCluster.scala | 5 - .../checkpoint/CheckpointCoordinatorTest.java | 429 +++++++++++++------ .../checkpoint/CheckpointIDCounterTest.java | 11 +- .../checkpoint/CheckpointPropertiesTest.java | 55 ++- .../checkpoint/CheckpointStateRestoreTest.java | 19 +- .../CompletedCheckpointStoreTest.java | 54 ++- .../checkpoint/CompletedCheckpointTest.java | 83 +++- .../checkpoint/CoordinatorShutdownTest.java | 13 +- ...ExecutionGraphCheckpointCoordinatorTest.java | 18 +- .../checkpoint/PendingCheckpointTest.java | 149 +++++-- .../checkpoint/PendingSavepointTest.java | 140 ------ .../StandaloneCompletedCheckpointStoreTest.java | 14 +- ...ZooKeeperCompletedCheckpointStoreITCase.java | 16 +- .../savepoint/FsSavepointStoreTest.java | 235 ---------- .../savepoint/SavepointLoaderTest.java | 40 +- .../savepoint/SavepointStoreFactoryTest.java | 91 ---- .../savepoint/SavepointStoreTest.java | 237 ++++++++++ .../stats/SimpleCheckpointStatsTrackerTest.java | 3 +- .../jobmanager/JobManagerHARecoveryTest.java | 26 +- .../JobManagerLeaderElectionTest.java | 4 - .../runtime/jobmanager/JobManagerITCase.scala | 56 ++- .../runtime/testingUtils/TestingCluster.scala | 7 +- .../testingUtils/TestingJobManager.scala | 5 +- .../testingUtils/TestingJobManagerLike.scala | 3 +- .../runtime/testingUtils/TestingUtils.scala | 21 +- .../api/environment/CheckpointConfig.java | 103 ++++- .../environment/StreamExecutionEnvironment.java | 124 +++--- .../flink/streaming/api/graph/StreamConfig.java | 1 - .../api/graph/StreamingJobGraphGenerator.java | 19 +- .../test/checkpointing/RescalingITCase.java | 14 +- .../test/checkpointing/SavepointITCase.java | 174 +------- .../test/classloading/ClassLoaderITCase.java | 7 +- .../flink/yarn/TestingYarnJobManager.scala | 2 - .../org/apache/flink/yarn/YarnJobManager.scala | 3 - 78 files changed, 2277 insertions(+), 2144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/docs/setup/cli.md ---------------------------------------------------------------------- diff --git a/docs/setup/cli.md b/docs/setup/cli.md index 5153787..251a0f6 100644 --- a/docs/setup/cli.md +++ b/docs/setup/cli.md @@ -137,11 +137,13 @@ This allows the job to finish processing all inflight data. #### Trigger a savepoint {% highlight bash %} -./bin/flink savepoint <jobID> +./bin/flink savepoint <jobID> [savepointDirectory] {% endhighlight %} Returns the path of the created savepoint. You need this path to restore and dispose savepoints. +You can optionally specify a `savepointDirectory` when triggering the savepoint. If you don't specify one here, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]). + #### **Restore a savepoint** {% highlight bash %} http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/docs/setup/savepoints.md ---------------------------------------------------------------------- diff --git a/docs/setup/savepoints.md b/docs/setup/savepoints.md index 9f0655c..231a4ff 100644 --- a/docs/setup/savepoints.md +++ b/docs/setup/savepoints.md @@ -41,32 +41,15 @@ Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data c< ## Configuration -Savepoints point to regular checkpoints and store their state in a configured [state backend]({{ site.baseurl }}/dev/state_backends.html). Currently, the supported state backends are **jobmanager** and **filesystem**. The state backend configuration for the regular periodic checkpoints is **independent** of the savepoint state backend configuration. Checkpoint data is **not copied** for savepoints, but points to the configured checkpoint state backend. - -### JobManager - -This is the **default backend** for savepoints. - -Savepoints are stored on the heap of the job manager. They are *lost* after the job manager is shut down. This mode is only useful if you want to *stop* and *resume* your program while the **same cluster** keeps running. It is *not recommended* for production use. Savepoints are *not* part of the [job manager's highly available]({{ site.baseurl }}/setup/jobmanager_high_availability.html) state. - -<pre> -savepoints.state.backend: jobmanager -</pre> - -**Note**: If you don't configure a specific state backend for the savepoints, the jobmanager backend will be used. - -### File system - -Savepoints are stored in the configured **file system directory**. They are available between cluster instances and allow to move your program to another cluster. +Savepoints are stored in a configured **file system directory**. They are available between cluster instances and allow you to move your program to another cluster. <pre> -savepoints.state.backend: filesystem -savepoints.state.backend.fs.dir: hdfs:///flink/savepoints +state.savepoints.dir: hdfs:///flink/savepoints </pre> -**Note**: If you don't configure a specific directory, the job manager backend will be used. +**Note**: If you don't configure a specific directory, triggering the savepoint will fail. -**Important**: A savepoint is a pointer to a completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files). Therefore, using the *filesystem* backend for savepoints and the *jobmanager* backend for checkpoints does not work, because the required checkpoint data won't be available after a job manager restart. +**Important**: A savepoint is a pointer to a completed checkpoint. That means that the state of a savepoint is not only found in the savepoint file itself, but also needs the actual checkpoint data (e.g. in a set of further files). ## Changes to your program @@ -105,5 +88,3 @@ You control the savepoints via the [command line client]({{ site.baseurl }}/setu - **Parallelism**: When restoring a savepoint, the parallelism of the program has to match the parallelism of the original program from which the savepoint was drawn. There is no mechanism to re-partition the savepoint's state yet. - **Chaining**: Chained operators are identified by the ID of the first task. It's not possible to manually assign an ID to an intermediate chained task, e.g. in the chain `[ a -> b -> c ]` only **a** can have its ID assigned manually, but not **b** or **c**. To work around this, you can [manually define the task chains](index.html#task-chaining-and-resource-groups). If you rely on the automatic ID assignment, a change in the chaining behaviour will also change the IDs. - -- **Disposing custom state handles**: Disposing an old savepoint does not work with custom state handles (if you are using a custom state backend), because the user code class loader is not available during disposal. http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 0711758..90d3437 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -72,6 +72,7 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -623,7 +624,7 @@ public class CliFrontend { String[] cleanedArgs = options.getArgs(); JobID jobId; - if (cleanedArgs.length > 0) { + if (cleanedArgs.length >= 1) { String jobIdString = cleanedArgs[0]; try { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); @@ -637,7 +638,17 @@ public class CliFrontend { "Specify a Job ID to trigger a savepoint.")); } - return triggerSavepoint(options, jobId); + String savepointDirectory = null; + if (cleanedArgs.length >= 2) { + savepointDirectory = cleanedArgs[1]; + } + + // Print superfluous arguments + if (cleanedArgs.length >= 3) { + logAndSysout("Provided more arguments than required. Ignoring not needed arguments."); + } + + return triggerSavepoint(options, jobId, savepointDirectory); } } @@ -645,12 +656,12 @@ public class CliFrontend { * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint} * message to the job manager. */ - private int triggerSavepoint(SavepointOptions options, JobID jobId) { + private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) { try { ActorGateway jobManager = getJobManagerGateway(options); logAndSysout("Triggering savepoint for job " + jobId + "."); - Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId), + Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId, Option.apply(savepointDirectory)), new FiniteDuration(1, TimeUnit.HOURS)); Object result; http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 3ed383d..9f3ef63 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -343,7 +343,7 @@ public class CliFrontendParser { formatter.setWidth(80); System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones."); - System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID>"); + System.out.println("\n Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]"); formatter.setSyntaxPrefix(" \"savepoint\" action options:"); formatter.printHelp(" ", getSavepointOptionsWithoutDeprecatedOptions(new Options())); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java index f9c7e8c..c2bd562 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java @@ -21,7 +21,6 @@ package org.apache.flink.client; import akka.dispatch.Futures; import org.apache.flink.api.common.JobID; import org.apache.flink.client.cli.CommandLineOptions; -import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; import org.junit.Rule; @@ -37,8 +36,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; -import java.util.List; -import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; @@ -79,7 +76,7 @@ public class CliFrontendSavepointTest { Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); when(jobManager.ask( - Mockito.eq(new TriggerSavepoint(jobId)), + Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())), any(FiniteDuration.class))) .thenReturn(triggerResponse.future()); @@ -95,7 +92,7 @@ public class CliFrontendSavepointTest { assertEquals(0, returnCode); verify(jobManager, times(1)).ask( - Mockito.eq(new TriggerSavepoint(jobId)), + Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())), any(FiniteDuration.class)); assertTrue(buffer.toString().contains("expectedSavepointPath")); @@ -116,7 +113,7 @@ public class CliFrontendSavepointTest { Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); when(jobManager.ask( - Mockito.eq(new TriggerSavepoint(jobId)), + Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())), any(FiniteDuration.class))) .thenReturn(triggerResponse.future()); @@ -132,7 +129,7 @@ public class CliFrontendSavepointTest { assertTrue(returnCode != 0); verify(jobManager, times(1)).ask( - Mockito.eq(new TriggerSavepoint(jobId)), + Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())), any(FiniteDuration.class)); assertTrue(buffer.toString().contains("expectedTestException")); @@ -171,7 +168,7 @@ public class CliFrontendSavepointTest { Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); when(jobManager.ask( - Mockito.eq(new TriggerSavepoint(jobId)), + Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())), any(FiniteDuration.class))) .thenReturn(triggerResponse.future()); @@ -185,7 +182,7 @@ public class CliFrontendSavepointTest { assertTrue(returnCode != 0); verify(jobManager, times(1)).ask( - Mockito.eq(new TriggerSavepoint(jobId)), + Mockito.eq(new TriggerSavepoint(jobId, Option.<String>empty())), any(FiniteDuration.class)); String errMsg = buffer.toString(); @@ -197,6 +194,46 @@ public class CliFrontendSavepointTest { } } + /** + * Tests that a CLI call with a custom savepoint directory target is + * forwarded correctly to the JM. + */ + @Test + public void testTriggerSavepointCustomTarget() throws Exception { + replaceStdOutAndStdErr(); + + try { + JobID jobId = new JobID(); + Option<String> customTarget = Option.apply("customTargetDirectory"); + ActorGateway jobManager = mock(ActorGateway.class); + + Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>(); + + when(jobManager.ask( + Mockito.eq(new TriggerSavepoint(jobId, customTarget)), + any(FiniteDuration.class))) + .thenReturn(triggerResponse.future()); + String savepointPath = "expectedSavepointPath"; + triggerResponse.success(new TriggerSavepointSuccess(jobId, savepointPath)); + + CliFrontend frontend = new MockCliFrontend( + CliFrontendTestUtils.getConfigDir(), jobManager); + + String[] parameters = { jobId.toString(), customTarget.get() }; + int returnCode = frontend.savepoint(parameters); + + assertEquals(0, returnCode); + verify(jobManager, times(1)).ask( + Mockito.eq(new TriggerSavepoint(jobId, customTarget)), + any(FiniteDuration.class)); + + assertTrue(buffer.toString().contains("expectedSavepointPath")); + } + finally { + restoreStdOutAndStdErr(); + } + } + // ------------------------------------------------------------------------ // Dispose savepoint // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 9e66e2a..f508d4a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -868,6 +868,24 @@ public final class ConfigConstants { /** The scope format string that is applied to all metrics scoped to an operator. */ public static final String METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator"; + // ---------------------------- Checkpoints ------------------------------- + + /** The default directory for savepoints. */ + @PublicEvolving + public static final String SAVEPOINT_DIRECTORY_KEY = "state.savepoints.dir"; + + /** The default directory used for persistent checkpoints. */ + @PublicEvolving + public static final String CHECKPOINTS_DIRECTORY_KEY = "state.checkpoints.dir"; + + /** + * This key was used in Flink versions <= 1.1.X with the savepoint backend + * configuration. We now always use the FileSystem for savepoints. For this, + * the only relevant config key is {@link #SAVEPOINT_DIRECTORY_KEY}. + */ + @Deprecated + public static final String SAVEPOINT_FS_DIRECTORY_KEY = "savepoints.state.backend.fs.dir"; + // ------------------------------------------------------------------------ // Default Values // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala index 3513210..113ab85 100644 --- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala @@ -22,7 +22,6 @@ import java.util.concurrent.ExecutorService import akka.actor.ActorRef import org.apache.flink.configuration.{Configuration => FlinkConfiguration} -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory import org.apache.flink.runtime.clusterframework.ContaineredJobManager import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager @@ -60,7 +59,6 @@ class MesosJobManager(flinkConfiguration: FlinkConfiguration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, - savepointStore: SavepointStore, jobRecoveryTimeout: FiniteDuration, metricsRegistry: Option[FlinkMetricRegistry]) extends ContaineredJobManager( @@ -75,7 +73,6 @@ class MesosJobManager(flinkConfiguration: FlinkConfiguration, leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricsRegistry) { http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index fa1f9f8..0954241 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; @@ -71,7 +72,7 @@ public class JMXJobManagerMetricTest { Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), Collections.<JobVertexID>emptyList(), - 500, 500, 50, 5)); + 500, 500, 50, 5, ExternalizedCheckpointSettings.none())); flink.waitForActorsToBeAlive(); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java index 41735c5..b63ab0e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java @@ -51,28 +51,35 @@ public class JobCheckpointsHandler extends AbstractExecutionGraphRequestHandler Option<JobCheckpointStats> stats = tracker.getJobStats(); if (stats.isDefined()) { + JobCheckpointStats jobStats = stats.get(); + // Total number of checkpoints - gen.writeNumberField("count", stats.get().getCount()); + gen.writeNumberField("count", jobStats.getCount()); + + // Optional external path + if (jobStats.getExternalPath() != null) { + gen.writeStringField("external-path", jobStats.getExternalPath()); + } // Duration gen.writeFieldName("duration"); gen.writeStartObject(); - gen.writeNumberField("min", stats.get().getMinDuration()); - gen.writeNumberField("max", stats.get().getMaxDuration()); - gen.writeNumberField("avg", stats.get().getAverageDuration()); + gen.writeNumberField("min", jobStats.getMinDuration()); + gen.writeNumberField("max", jobStats.getMaxDuration()); + gen.writeNumberField("avg", jobStats.getAverageDuration()); gen.writeEndObject(); // State size gen.writeFieldName("size"); gen.writeStartObject(); - gen.writeNumberField("min", stats.get().getMinStateSize()); - gen.writeNumberField("max", stats.get().getMaxStateSize()); - gen.writeNumberField("avg", stats.get().getAverageStateSize()); + gen.writeNumberField("min", jobStats.getMinStateSize()); + gen.writeNumberField("max", jobStats.getMaxStateSize()); + gen.writeNumberField("avg", jobStats.getAverageStateSize()); gen.writeEndObject(); // Recent history gen.writeArrayFieldStart("history"); - for (CheckpointStats checkpoint : stats.get().getRecentHistory()) { + for (CheckpointStats checkpoint : jobStats.getRecentHistory()) { gen.writeStartObject(); gen.writeNumberField("id", checkpoint.getCheckpointId()); gen.writeNumberField("timestamp", checkpoint.getTriggerTimestamp()); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java index b6d50ea..dfbb9cf 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandlerTest.java @@ -106,6 +106,11 @@ public class JobCheckpointsHandlerTest { } @Override + public String getExternalPath() { + return null; + } + + @Override public long getMinDuration() { return 1; } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade index e8a07c0..6d3b6b3 100644 --- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.checkpoints.job.jade @@ -60,6 +60,12 @@ table(ng-if="jobCheckpointStats").table.table-hover.table-inner strong Average: span {{ jobCheckpointStats['size']['avg'] | humanizeBytes }} + tr(ng-if="jobCheckpointStats['external-path']") + td(colspan=4) + strong Latest Checkpoint Path: + =" " + | {{ jobCheckpointStats['external-path'] }} + div(ng-if="!showHistory && jobCheckpointStats && jobCheckpointStats['history'].length > 0") a.btn.btn-default(ng-click="toggleHistory()") | <strong>Show history</strong> ({{ jobCheckpointStats['history'].length }}) http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/web-dashboard/web/css/index.css ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/css/index.css b/flink-runtime-web/web-dashboard/web/css/index.css index 52a5631..49449f2 100644 --- a/flink-runtime-web/web-dashboard/web/css/index.css +++ b/flink-runtime-web/web-dashboard/web/css/index.css @@ -228,7 +228,6 @@ overflow: hidden; padding: 2px 4px; font-size: 14px; - -webkit-border-radius: 2px; border-radius: 2px; margin-top: -3px; } @@ -496,7 +495,6 @@ pre { } .nav-tabs.tabs-vertical li > a { margin-right: 0; - -webkit-border-radius: 0; border-radius: 0; border-bottom: none; border-left: 2px solid transparent; @@ -541,7 +539,6 @@ livechart { padding-right: 0.4em; margin: 0; border-right: 1px solid #fff; - -webkit-border-radius: 0; border-radius: 0; } .label-group .label.label-black {