zentol commented on a change in pull request #8485: [FLINK-12555] Introduce an 
encapsulated metric group layout for shuffle API
URL: https://github.com/apache/flink/pull/8485#discussion_r286919552
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
 ##########
 @@ -32,44 +32,79 @@
 
        private static final String IO_NUM_BYTES_IN_LOCAL = 
MetricNames.IO_NUM_BYTES_IN + "Local";
        private static final String IO_NUM_BYTES_IN_REMOTE = 
MetricNames.IO_NUM_BYTES_IN + "Remote";
-       private static final String IO_NUM_BYTES_IN_LOCAL_RATE = 
IO_NUM_BYTES_IN_LOCAL + MetricNames.SUFFIX_RATE;
-       private static final String IO_NUM_BYTES_IN_REMOTE_RATE = 
IO_NUM_BYTES_IN_REMOTE + MetricNames.SUFFIX_RATE;
-
        private static final String IO_NUM_BUFFERS_IN_LOCAL = 
MetricNames.IO_NUM_BUFFERS_IN + "Local";
        private static final String IO_NUM_BUFFERS_IN_REMOTE = 
MetricNames.IO_NUM_BUFFERS_IN + "Remote";
-       private static final String IO_NUM_BUFFERS_IN_LOCAL_RATE = 
IO_NUM_BUFFERS_IN_LOCAL + MetricNames.SUFFIX_RATE;
-       private static final String IO_NUM_BUFFERS_IN_REMOTE_RATE = 
IO_NUM_BUFFERS_IN_REMOTE + MetricNames.SUFFIX_RATE;
 
        private final Counter numBytesInLocal;
        private final Counter numBytesInRemote;
        private final Counter numBuffersInLocal;
        private final Counter numBuffersInRemote;
 
        public InputChannelMetrics(MetricGroup parent) {
-               this.numBytesInLocal = parent.counter(IO_NUM_BYTES_IN_LOCAL);
-               this.numBytesInRemote = parent.counter(IO_NUM_BYTES_IN_REMOTE);
-               parent.meter(IO_NUM_BYTES_IN_LOCAL_RATE, new 
MeterView(numBytesInLocal, 60));
-               parent.meter(IO_NUM_BYTES_IN_REMOTE_RATE, new 
MeterView(numBytesInRemote, 60));
-
-               this.numBuffersInLocal = 
parent.counter(IO_NUM_BUFFERS_IN_LOCAL);
-               this.numBuffersInRemote = 
parent.counter(IO_NUM_BUFFERS_IN_REMOTE);
-               parent.meter(IO_NUM_BUFFERS_IN_LOCAL_RATE, new 
MeterView(numBuffersInLocal, 60));
-               parent.meter(IO_NUM_BUFFERS_IN_REMOTE_RATE, new 
MeterView(numBuffersInRemote, 60));
+               this.numBytesInLocal = createCounter(parent, 
IO_NUM_BYTES_IN_LOCAL);
+               this.numBytesInRemote = createCounter(parent, 
IO_NUM_BYTES_IN_REMOTE);
+               this.numBuffersInLocal = createCounter(parent, 
IO_NUM_BUFFERS_IN_LOCAL);
+               this.numBuffersInRemote = createCounter(parent, 
IO_NUM_BUFFERS_IN_REMOTE);
+       }
+
+       private static Counter createCounter(MetricGroup parent, String name) {
+               Counter counter = parent.counter(name);
+               parent.meter(name + MetricNames.SUFFIX_RATE, new 
MeterView(counter, 60));
+               return counter;
+       }
+
+       public void incNumBytesInLocalCounter(long inc) {
+               numBytesInLocal.inc(inc);
        }
 
-       public Counter getNumBytesInLocalCounter() {
-               return numBytesInLocal;
+       public void incNumBytesInRemoteCounter(long inc) {
+               numBytesInRemote.inc(inc);
        }
 
-       public Counter getNumBytesInRemoteCounter() {
-               return numBytesInRemote;
+       public void incNumBuffersInLocalCounter(long inc) {
+               numBuffersInLocal.inc(inc);
        }
 
-       public Counter getNumBuffersInLocalCounter() {
-               return numBuffersInLocal;
+       public void incNumBuffersInRemoteCounter(long inc) {
+               numBuffersInRemote.inc(inc);
        }
 
-       public Counter getNumBuffersInRemoteCounter() {
-               return numBuffersInRemote;
+       /**
+        * Wraps {@link InputChannelMetrics} with legacy metrics.
+        * @deprecated eventually should be removed in favour of normal {@link 
InputChannelMetrics}.
+        */
+       @SuppressWarnings("DeprecatedIsStillUsed")
+       @Deprecated
+       public static class InputChannelMetricsWithLegacy extends 
InputChannelMetrics {
+               private final InputChannelMetrics legacyMetrics;
+
+               public InputChannelMetricsWithLegacy(MetricGroup parent, 
MetricGroup legacyParent) {
+                       super(parent);
+                       legacyMetrics = new InputChannelMetrics(legacyParent);
+               }
+
+               @Override
+               public void incNumBytesInLocalCounter(long inc) {
+                       super.incNumBytesInLocalCounter(inc);
 
 Review comment:
   it's quite easy to encapsulate this logic in a `Counter` implementation, 
which would require significantly fewer overall changes and would no longer 
introduce a new special case in how metrics are used.
   
   ```
   /**
    * A {@link Counter} that forwards every update to two underlying counters.
    *
    * <p>Each increment and decrement is applied to the first counter and then
    * to the second one; reads are served from the first counter only.
    */
   public static class CounterWrapper implements Counter {
        /** Counter that receives every update first and is the one read by {@link #getCount()}. */
        private final Counter primary;
        /** Counter that receives the same updates as {@code primary}; never read here. */
        private final Counter mirror;
   
        private CounterWrapper(Counter primary, Counter mirror) {
                this.primary = primary;
                this.mirror = mirror;
        }
   
        /** Increments both underlying counters by one. */
        @Override
        public void inc() {
                primary.inc();
                mirror.inc();
        }
   
        /** Increments both underlying counters by {@code n}. */
        @Override
        public void inc(long n) {
                primary.inc(n);
                mirror.inc(n);
        }
   
        /** Decrements both underlying counters by one. */
        @Override
        public void dec() {
                primary.dec();
                mirror.dec();
        }
   
        /** Decrements both underlying counters by {@code n}. */
        @Override
        public void dec(long n) {
                primary.dec(n);
                mirror.dec(n);
        }
   
        /** Returns the count of the first counter only. */
        @Override
        public long getCount() {
                // assume that the counters are not accessed directly elsewhere
                return primary.getCount();
        }
   }
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to