[FLINK-4057] Checkpoint Metrics

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e540daf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e540daf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e540daf

Branch: refs/heads/master
Commit: 9e540daf6e44c386ca82e6818f87d634be316e6c
Parents: 8829f97
Author: zentol <[email protected]>
Authored: Wed Jun 15 13:19:56 2016 +0200
Committer: zentol <[email protected]>
Committed: Fri Jul 1 15:09:16 2016 +0200

----------------------------------------------------------------------
 .../stats/SimpleCheckpointStatsTracker.java     |  22 +++-
 .../flink/runtime/jobmanager/JobManager.scala   |  15 ++-
 .../stats/SimpleCheckpointStatsTrackerTest.java |  11 +-
 .../jobmanager/JobManagerMetricTest.java        | 117 +++++++++++++++++++
 4 files changed, 157 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 19a8fe4..9d47457 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint.stats;
 
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -109,7 +111,8 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 
        public SimpleCheckpointStatsTracker(
                        int historySize,
-                       List<ExecutionJobVertex> tasksToWaitFor) {
+                       List<ExecutionJobVertex> tasksToWaitFor,
+                       MetricGroup metrics) {
 
                checkArgument(historySize >= 0);
                this.historySize = historySize;
@@ -124,6 +127,9 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
                } else {
                        taskParallelism = Collections.emptyMap();
                }
+
+               metrics.gauge("lastCheckpointSize", new CheckpointSizeGauge());
+               metrics.gauge("lastCheckpointDuration", new 
CheckpointDurationGauge());
        }
 
        @Override
@@ -411,4 +417,18 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
                        return averageStateSize;
                }
        }
+
+       private class CheckpointSizeGauge implements Gauge<Long> { // state size of latestCompletedCheckpoint, or -1 while none is recorded
+               @Override
+               public Long getValue() {
+                       return latestCompletedCheckpoint != null ? latestCompletedCheckpoint.getStateSize() : -1L;
+               }
+       }
+
+       private class CheckpointDurationGauge implements Gauge<Long> { // duration of latestCompletedCheckpoint, or -1 while none is recorded
+               @Override
+               public Long getValue() {
+                       return latestCompletedCheckpoint != null ? latestCompletedCheckpoint.getDuration() : -1L;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 852b238..348282c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -36,7 +36,7 @@ import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalCon
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.metrics.{Gauge, MetricGroup, MetricRegistry => 
FlinkMetricRegistry}
-import org.apache.flink.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.metrics.groups.{JobManagerMetricGroup, 
UnregisteredMetricsGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
@@ -1224,11 +1224,21 @@ class JobManager(
             if (isStatsDisabled) {
               new DisabledCheckpointStatsTracker()
             } else {
+
+              val jobMetrics = jobManagerMetricGroup match {
+                case Some(group) =>
+                  group.addJob(jobGraph.getJobID, jobGraph.getName) match {
+                    case (jobGroup:Any) => jobGroup
+                    case null => new UnregisteredMetricsGroup()
+                  }
+                case None =>
+                  new UnregisteredMetricsGroup()
+              }
               val historySize: Int = flinkConfiguration.getInteger(
                 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
                 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-              new SimpleCheckpointStatsTracker(historySize, ackVertices)
+              new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
             }
 
           val jobParallelism = jobGraph.getSerializedExecutionConfig
@@ -1655,6 +1665,7 @@ class JobManager(
       case t: Throwable =>
         log.error(s"Could not properly unregister job $jobID form the library 
cache.", t)
     }
+    jobManagerMetricGroup.map(_.removeJob(jobID))
 
     futureOption
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index e8ffb08..9265ab1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint.stats;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -54,7 +55,7 @@ public class SimpleCheckpointStatsTrackerTest {
        @Test
        public void testNoCompletedCheckpointYet() throws Exception {
                CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(
-                               0, Collections.<ExecutionJobVertex>emptyList());
+                               0, Collections.<ExecutionJobVertex>emptyList(), 
new UnregisteredMetricsGroup());
 
                assertFalse(tracker.getJobStats().isDefined());
                assertFalse(tracker.getOperatorStats(new 
JobVertexID()).isDefined());
@@ -64,7 +65,7 @@ public class SimpleCheckpointStatsTrackerTest {
        public void testRandomStats() throws Exception {
                CompletedCheckpoint[] checkpoints = 
generateRandomCheckpoints(16);
                List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
-               CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor);
+               CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor, new 
UnregisteredMetricsGroup());
 
                for (int i = 0; i < checkpoints.length; i++) {
                        CompletedCheckpoint checkpoint = checkpoints[i];
@@ -80,7 +81,7 @@ public class SimpleCheckpointStatsTrackerTest {
        public void testIllegalOperatorId() throws Exception {
                CompletedCheckpoint[] checkpoints = 
generateRandomCheckpoints(16);
                List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
-               CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor);
+               CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor, new 
UnregisteredMetricsGroup());
 
                for (CompletedCheckpoint checkpoint : checkpoints) {
                        tracker.onCompletedCheckpoint(checkpoint);
@@ -95,7 +96,7 @@ public class SimpleCheckpointStatsTrackerTest {
        public void testCompletedCheckpointReordering() throws Exception {
                CompletedCheckpoint[] checkpoints = 
generateRandomCheckpoints(2);
                List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
-               CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor);
+               CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor, new 
UnregisteredMetricsGroup());
 
                // First the second checkpoint notifies
                tracker.onCompletedCheckpoint(checkpoints[1]);
@@ -115,7 +116,7 @@ public class SimpleCheckpointStatsTrackerTest {
        public void testOperatorStateCachedClearedOnNewCheckpoint() throws 
Exception {
                CompletedCheckpoint[] checkpoints = 
generateRandomCheckpoints(2);
                List<ExecutionJobVertex> tasksToWaitFor = 
createTasksToWaitFor(checkpoints[0]);
-               CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor);
+               CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(10, tasksToWaitFor, new 
UnregisteredMetricsGroup());
 
                tracker.onCompletedCheckpoint(checkpoints[0]);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e540daf/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
new file mode 100644
index 0000000..5759888
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerMetricTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.metrics.MetricRegistry.KEY_METRICS_SCOPE_NAMING_JM_JOB;
+import static org.junit.Assert.assertEquals;
+
+public class JobManagerMetricTest {
+       /**
+        * Tests that metrics registered on the JobManager are actually accessible.
+        *
+        * @throws Exception if starting the cluster, awaiting the job, or the JMX lookup fails
+        */
+       @Test
+       public void testJobManagerMetricAccess() throws Exception {
+               Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+               Configuration flinkConfiguration = new Configuration();
+               flinkConfiguration.setString(KEY_METRICS_SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
+               TestingCluster flink = new TestingCluster(flinkConfiguration);
+
+               try {
+                       flink.start();
+
+                       JobVertex sourceJobVertex = new JobVertex("Source");
+                       sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+
+                       JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
+                       jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+                               Collections.<JobVertexID>emptyList(),
+                               Collections.<JobVertexID>emptyList(),
+                               Collections.<JobVertexID>emptyList(),
+                               500, 500, 50, 5));
+
+                       flink.waitForActorsToBeAlive();
+                       flink.submitJobDetached(jobGraph);
+
+                       Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft())
+                               .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
+                       Await.ready(jobRunning, deadline.timeLeft());
+
+                       MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+                       ObjectName objectName1 = new ObjectName("org.apache.flink.metrics:key0=jobmanager,key1=TestingJob,name=lastCheckpointSize");
+                       assertEquals(-1L, mBeanServer.getAttribute(objectName1, "Value"));
+
+                       Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft())
+                               .ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
+
+                       BlockingInvokable.unblock();
+                       // wait until the job has finished
+                       Await.ready(jobFinished, deadline.timeLeft());
+               } finally {
+                       flink.stop();
+               }
+       }
+
+       /**
+        * Task that blocks in {@link #invoke()} until {@link #unblock()} is called.
+        */
+       public static class BlockingInvokable extends AbstractInvokable {
+               /** Guarded by {@link #lock}. */
+               private static boolean blocking = true;
+               private static final Object lock = new Object();
+
+               @Override
+               public void invoke() throws Exception {
+                       // test the flag while holding the lock, so an unblock() between the check and wait() is not lost
+                       synchronized (lock) {
+                               while (blocking) {
+                                       lock.wait();
+                               }
+                       }
+               }
+
+               public static void unblock() {
+                       synchronized (lock) {
+                               blocking = false;
+                               lock.notifyAll();
+                       }
+               }
+       }
+}

Reply via email to