[FLINK-4057] Checkpoint Metrics
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e540daf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e540daf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e540daf Branch: refs/heads/master Commit: 9e540daf6e44c386ca82e6818f87d634be316e6c Parents: 8829f97 Author: zentol <[email protected]> Authored: Wed Jun 15 13:19:56 2016 +0200 Committer: zentol <[email protected]> Committed: Fri Jul 1 15:09:16 2016 +0200 ---------------------------------------------------------------------- .../stats/SimpleCheckpointStatsTracker.java | 22 +++- .../flink/runtime/jobmanager/JobManager.scala | 15 ++- .../stats/SimpleCheckpointStatsTrackerTest.java | 11 +- .../jobmanager/JobManagerMetricTest.java | 117 +++++++++++++++++++ 4 files changed, 157 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java index 19a8fe4..9d47457 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint.stats; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -109,7 +111,8 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { public SimpleCheckpointStatsTracker( int historySize, - List<ExecutionJobVertex> tasksToWaitFor) { + List<ExecutionJobVertex> tasksToWaitFor, + MetricGroup metrics) { checkArgument(historySize >= 0); this.historySize = historySize; @@ -124,6 +127,9 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { } else { taskParallelism = Collections.emptyMap(); } + + metrics.gauge("lastCheckpointSize", new CheckpointSizeGauge()); + metrics.gauge("lastCheckpointDuration", new CheckpointDurationGauge()); } @Override @@ -411,4 +417,18 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { return averageStateSize; } } + + private class CheckpointSizeGauge implements Gauge<Long> { + @Override + public Long getValue() { + return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getStateSize(); + } + } + + private class CheckpointDurationGauge implements Gauge<Long> { + @Override + public Long getValue() { + return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getDuration(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 852b238..348282c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -36,7 +36,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalCon import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => FlinkMetricRegistry} -import org.apache.flink.metrics.groups.JobManagerMetricGroup +import org.apache.flink.metrics.groups.{JobManagerMetricGroup, UnregisteredMetricsGroup} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer @@ -1224,11 +1224,21 @@ class JobManager( if (isStatsDisabled) { new DisabledCheckpointStatsTracker() } else { + + val jobMetrics = jobManagerMetricGroup match { + case Some(group) => + group.addJob(jobGraph.getJobID, jobGraph.getName) match { + case (jobGroup:Any) => jobGroup + case null => new UnregisteredMetricsGroup() + } + case None => + new UnregisteredMetricsGroup() + } val historySize: Int = flinkConfiguration.getInteger( ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE) - new SimpleCheckpointStatsTracker(historySize, ackVertices) + new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics) } val jobParallelism = jobGraph.getSerializedExecutionConfig @@ -1655,6 +1665,7 @@ class JobManager( case t: Throwable => log.error(s"Could not properly unregister job $jobID form the library cache.", t) } + jobManagerMetricGroup.map(_.removeJob(jobID)) futureOption } http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java index e8ffb08..9265ab1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint.stats; import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -54,7 +55,7 @@ public class SimpleCheckpointStatsTrackerTest { @Test public void testNoCompletedCheckpointYet() throws Exception { CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker( - 0, Collections.<ExecutionJobVertex>emptyList()); + 0, Collections.<ExecutionJobVertex>emptyList(), new UnregisteredMetricsGroup()); assertFalse(tracker.getJobStats().isDefined()); assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined()); @@ -64,7 +65,7 @@ public class SimpleCheckpointStatsTrackerTest { public void testRandomStats() throws Exception { CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16); List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); - CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor); + CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup()); for (int i = 0; i < checkpoints.length; i++) { CompletedCheckpoint checkpoint = checkpoints[i]; @@ -80,7 +81,7 @@ public class SimpleCheckpointStatsTrackerTest { public void testIllegalOperatorId() throws Exception { CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16); List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); - CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor); + CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup()); for (CompletedCheckpoint checkpoint : checkpoints) { tracker.onCompletedCheckpoint(checkpoint); @@ -95,7 +96,7 @@ public class SimpleCheckpointStatsTrackerTest { public void testCompletedCheckpointReordering() throws Exception { CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2); List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); - CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor); + CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup()); // First the second checkpoint notifies tracker.onCompletedCheckpoint(checkpoints[1]); @@ -115,7 +116,7 @@ public class SimpleCheckpointStatsTrackerTest { public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception { CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2); List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]); - CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor); + CheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(10, tasksToWaitFor, new UnregisteredMetricsGroup()); tracker.onCompletedCheckpoint(checkpoints[0]); http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java new file mode 100644 index 0000000..5759888 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB; +import static org.junit.Assert.assertEquals; + +public class JobManagerMetricTest { + /** + * Tests that metrics registered on the JobManager are actually accessible. + * + * @throws Exception + */ + @Test + public void testJobManagerMetricAccess() throws Exception { + Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + Configuration flinkConfiguration = new Configuration(); + + flinkConfiguration.setString(KEY_METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>"); + + TestingCluster flink = new TestingCluster(flinkConfiguration); + + try { + flink.start(); + + JobVertex sourceJobVertex = new JobVertex("Source"); + sourceJobVertex.setInvokableClass(BlockingInvokable.class); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); + jobGraph.setSnapshotSettings(new JobSnapshottingSettings( + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + Collections.<JobVertexID>emptyList(), + 500, 500, 50, 5)); + + flink.waitForActorsToBeAlive(); + + flink.submitJobDetached(jobGraph); + + Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft()); + Await.ready(jobRunning, deadline.timeLeft()); + + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + ObjectName objectName1 = new ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize"); + assertEquals(-1L, mBeanServer.getAttribute(objectName1, "Value")); + + Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft()) + .ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft()); + + BlockingInvokable.unblock(); + + // wait til the job has finished + Await.ready(jobFinished, deadline.timeLeft()); + } finally { + flink.stop(); + } + } + + public static class BlockingInvokable extends AbstractInvokable { + private static boolean blocking = true; + private static final Object lock = new Object(); + + @Override + public void invoke() throws Exception { + while (blocking) { + synchronized (lock) { + lock.wait(); + } + } + } + + public static void unblock() { + blocking = false; + + synchronized (lock) { + lock.notifyAll(); + } + } + } +}
