Repository: flink
Updated Branches:
  refs/heads/master 62cb954d9 -> ee3c7a88b


[FLINK-4093] Expose metric interfaces

This closes #2134


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d43bf8d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d43bf8d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d43bf8d9

Branch: refs/heads/master
Commit: d43bf8d9b3085d1341bfca61e05c2a77e5426226
Parents: 62cb954
Author: zentol <ches...@apache.org>
Authored: Wed Jun 22 10:37:03 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Mon Jun 27 15:23:27 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/metrics/Counter.java  | 24 ++-----
 .../java/org/apache/flink/metrics/Gauge.java    |  5 +-
 .../org/apache/flink/metrics/MetricGroup.java   | 32 +++++++--
 .../org/apache/flink/metrics/SimpleCounter.java | 71 ++++++++++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     | 16 ++++-
 .../groups/UnregisteredMetricsGroup.java        | 19 ++++--
 .../flink/runtime/taskmanager/TaskManager.scala | 41 ++++++-----
 .../partition/consumer/InputChannelTest.java    |  4 +-
 8 files changed, 154 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java 
b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
index acc37cf..ffb1cc7 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Counter.java
@@ -24,48 +24,36 @@ import org.apache.flink.annotation.PublicEvolving;
  * A Counter is a {@link Metric} that measures a count.
  */
 @PublicEvolving
-public final class Counter implements Metric {
-
-       private long count;
+public interface Counter extends Metric {
 
        /**
         * Increment the current count by 1.
         */
-       public void inc() {
-               count++;
-       }
+       void inc();
 
        /**
         * Increment the current count by the given value.
         *
         * @param n value to increment the current count by
         */
-       public void inc(long n) {
-               count += n;
-       }
+       void inc(long n);
 
        /**
         * Decrement the current count by 1.
         */
-       public void dec() {
-               count--;
-       }
+       void dec();
 
        /**
         * Decrement the current count by the given value.
         *
         * @param n value to decrement the current count by
         */
-       public void dec(long n) {
-               count -= n;
-       }
+       void dec(long n);
 
        /**
         * Returns the current count.
         *
         * @return current count
         */
-       public long getCount() {
-               return count;
-       }
+       long getCount();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java 
b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
index aad8deb..740645d 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/Gauge.java
@@ -24,12 +24,11 @@ import org.apache.flink.annotation.PublicEvolving;
  * A Gauge is a {@link Metric} that calculates a specific value at a point in 
time.
  */
 @PublicEvolving
-public abstract class Gauge<T> implements Metric {
-
+public interface Gauge<T> extends Metric {
        /**
         * Calculates and returns the measured value.
         *
         * @return calculated value
         */
-       public abstract T getValue();
+       T getValue();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java 
b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index 6c9e044..b131949 100644
--- a/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -63,7 +63,7 @@ public interface MetricGroup {
         * Creates and registers a new {@link org.apache.flink.metrics.Counter} 
with Flink.
         *
         * @param name name of the counter
-        * @return the registered counter
+        * @return the created counter
         */
        Counter counter(int name);
 
@@ -71,19 +71,39 @@ public interface MetricGroup {
         * Creates and registers a new {@link org.apache.flink.metrics.Counter} 
with Flink.
         *
         * @param name name of the counter
-        * @return the registered counter
+        * @return the created counter
         */
        Counter counter(String name);
 
        /**
+        * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+        *
+        * @param name    name of the counter
+        * @param counter counter to register
+        * @param <C>     counter type
+        * @return the given counter
+        */
+       <C extends Counter> C counter(int name, C counter);
+
+       /**
+        * Registers a {@link org.apache.flink.metrics.Counter} with Flink.
+        *
+        * @param name    name of the counter
+        * @param counter counter to register
+        * @param <C>     counter type
+        * @return the given counter
+        */
+       <C extends Counter> C counter(String name, C counter);
+       
+       /**
         * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
         *
         * @param name  name of the gauge
         * @param gauge gauge to register
         * @param <T>   return type of the gauge
-        * @return the registered gauge
+        * @return the given gauge
         */
-       <T> Gauge<T> gauge(int name, Gauge<T> gauge);
+       <T, G extends Gauge<T>> G gauge(int name, G gauge);
 
        /**
         * Registers a new {@link org.apache.flink.metrics.Gauge} with Flink.
@@ -91,9 +111,9 @@ public interface MetricGroup {
         * @param name  name of the gauge
         * @param gauge gauge to register
         * @param <T>   return type of the gauge
-        * @return the registered gauge
+        * @return the given gauge
         */
-       <T> Gauge<T> gauge(String name, Gauge<T> gauge);
+       <T, G extends Gauge<T>> G gauge(String name, G gauge);
 
        // 
------------------------------------------------------------------------
        // Groups

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java 
b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
new file mode 100644
index 0000000..9720b08
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/metrics/SimpleCounter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.metrics;
+
+/**
+ * A simple low-overhead {@link org.apache.flink.metrics.Counter} that is not 
thread-safe.
+ */
+public class SimpleCounter implements Counter {
+       private long count;
+
+       /**
+        * Increment the current count by 1.
+        */
+       @Override
+       public void inc() {
+               count++;
+       }
+
+       /**
+        * Increment the current count by the given value.
+        *
+        * @param n value to increment the current count by
+        */
+       @Override
+       public void inc(long n) {
+               count += n;
+       }
+
+       /**
+        * Decrement the current count by 1.
+        */
+       @Override
+       public void dec() {
+               count--;
+       }
+
+       /**
+        * Decrement the current count by the given value.
+        *
+        * @param n value to decrement the current count by
+        */
+       @Override
+       public void dec(long n) {
+               count -= n;
+       }
+
+       /**
+        * Returns the current count.
+        *
+        * @return current count
+        */
+       @Override
+       public long getCount() {
+               return count;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
index 032fa04..93eb734 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/AbstractMetricGroup.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.MetricRegistry;
+import org.apache.flink.metrics.SimpleCounter;
 
 import org.apache.flink.metrics.groups.scope.ScopeFormat;
 import org.slf4j.Logger;
@@ -146,18 +147,27 @@ public abstract class AbstractMetricGroup implements 
MetricGroup {
 
        @Override
        public Counter counter(String name) {
-               Counter counter = new Counter();
+               return counter(name, new SimpleCounter());
+       }
+       
+       @Override
+       public <C extends Counter> C counter(int name, C counter) {
+               return counter(String.valueOf(name), counter);
+       }
+
+       @Override
+       public <C extends Counter> C counter(String name, C counter) {
                addMetric(name, counter);
                return counter;
        }
 
        @Override
-       public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+       public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
                return gauge(String.valueOf(name), gauge);
        }
 
        @Override
-       public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+       public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
                addMetric(name, gauge);
                return gauge;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index 961bcce..29d71d9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ 
b/flink-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
 
 /**
  * A special {@link MetricGroup} that does not register any metrics at the 
metrics registry
@@ -42,21 +43,31 @@ public class UnregisteredMetricsGroup implements 
MetricGroup {
 
        @Override
        public Counter counter(int name) {
-               return new Counter();
+               return new SimpleCounter();
        }
 
        @Override
        public Counter counter(String name) {
-               return new Counter();
+               return new SimpleCounter();
        }
 
        @Override
-       public <T> Gauge<T> gauge(int name, Gauge<T> gauge) {
+       public <C extends Counter> C counter(int name, C counter) {
+               return counter;
+       }
+
+       @Override
+       public <C extends Counter> C counter(String name, C counter) {
+               return counter;
+       }
+
+       @Override
+       public <T, G extends Gauge<T>> G gauge(int name, G gauge) {
                return gauge;
        }
 
        @Override
-       public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+       public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
                return gauge;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8ef22af..1fb0e09 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2294,11 +2294,10 @@ object TaskManager {
   private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
     val mxBean = ManagementFactory.getClassLoadingMXBean
 
-    metrics
-      .gauge("ClassesLoaded", new FlinkGauge[Long] {
+    metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new 
FlinkGauge[Long] {
       override def getValue: Long = mxBean.getTotalLoadedClassCount
     })
-    metrics.gauge("ClassesUnloaded", new FlinkGauge[Long] {
+    metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new 
FlinkGauge[Long] {
       override def getValue: Long = mxBean.getUnloadedClassCount
     })
   }
@@ -2308,10 +2307,10 @@ object TaskManager {
 
     for (garbageCollector <- garbageCollectors) {
       val gcGroup = metrics.addGroup("\"" + garbageCollector.getName + "\"")
-      gcGroup.gauge("Count", new FlinkGauge[Long] {
+      gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = garbageCollector.getCollectionCount
       })
-      gcGroup.gauge("Time", new FlinkGauge[Long] {
+      gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
         override def getValue: Long = garbageCollector.getCollectionTime
       })
     }
@@ -2320,24 +2319,24 @@ object TaskManager {
   private def instantiateMemoryMetrics(metrics: MetricGroup) {
     val mxBean = ManagementFactory.getMemoryMXBean
     val heap = metrics.addGroup("Heap")
-    heap.gauge("Used", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
       override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed
     })
-    heap.gauge("Committed", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted
       })
-    heap.gauge("Max", new FlinkGauge[Long] {
+    heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getHeapMemoryUsage.getMax
       })
 
     val nonHeap = metrics.addGroup("NonHeap")
-    nonHeap.gauge("Used", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed
       })
-    nonHeap.gauge("Committed", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted
       })
-    nonHeap.gauge("Max", new FlinkGauge[Long] {
+    nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] {
         override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax
       })
 
@@ -2346,15 +2345,15 @@ object TaskManager {
     val directObjectName = new 
ObjectName("java.nio:type=BufferPool,name=direct")
 
     val direct = metrics.addGroup("Direct")
-    direct.gauge("Count", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(directObjectName, "Count").asInstanceOf[Long]
       })
-    direct.gauge("MemoryUsed", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long]
       })
-    direct.gauge("TotalCapacity", new FlinkGauge[Long] {
+    direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] 
{
         override def getValue: Long = con
           .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long]
       })
@@ -2362,15 +2361,15 @@ object TaskManager {
     val mappedObjectName = new 
ObjectName("java.nio:type=BufferPool,name=direct")
 
     val mapped = metrics.addGroup("Mapped")
-    mapped.gauge("Count", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "Count").asInstanceOf[Long]
       })
-    mapped.gauge("MemoryUsed", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] {
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long]
       })
-    mapped.gauge("TotalCapacity", new FlinkGauge[Long] {
+    mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] 
{
         override def getValue: Long = con
           .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long]
       })
@@ -2379,8 +2378,7 @@ object TaskManager {
   private def instantiateThreadMetrics(metrics: MetricGroup): Unit = {
     val mxBean = ManagementFactory.getThreadMXBean
 
-    metrics
-      .gauge("Count", new FlinkGauge[Int] {
+    metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] {
       override def getValue: Int = mxBean.getThreadCount
     })
   }
@@ -2390,11 +2388,10 @@ object TaskManager {
       val mxBean = ManagementFactory.getOperatingSystemMXBean
         .asInstanceOf[com.sun.management.OperatingSystemMXBean]
 
-      metrics
-        .gauge("Load", new FlinkGauge[Double] {
+      metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] 
{
           override def getValue: Double = mxBean.getProcessCpuLoad
         })
-      metrics.gauge("Time", new FlinkGauge[Long] {
+      metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] {
           override def getValue: Long = mxBean.getProcessCpuTime
         })
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/d43bf8d9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index da15f08..0868398 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -119,7 +119,7 @@ public class InputChannelTest {
                                ResultPartitionID partitionId,
                                Tuple2<Integer, Integer> initialAndMaxBackoff) {
 
-                       super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, new Counter());
+                       super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, new SimpleCounter());
                }
 
                @Override

Reply via email to