zentol commented on a change in pull request #8485: [FLINK-12555] Introduce an
encapsulated metric group layout for shuffle API
URL: https://github.com/apache/flink/pull/8485#discussion_r286919552
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputChannelMetrics.java
##########
@@ -32,44 +32,79 @@
private static final String IO_NUM_BYTES_IN_LOCAL =
MetricNames.IO_NUM_BYTES_IN + "Local";
private static final String IO_NUM_BYTES_IN_REMOTE =
MetricNames.IO_NUM_BYTES_IN + "Remote";
- private static final String IO_NUM_BYTES_IN_LOCAL_RATE =
IO_NUM_BYTES_IN_LOCAL + MetricNames.SUFFIX_RATE;
- private static final String IO_NUM_BYTES_IN_REMOTE_RATE =
IO_NUM_BYTES_IN_REMOTE + MetricNames.SUFFIX_RATE;
-
private static final String IO_NUM_BUFFERS_IN_LOCAL =
MetricNames.IO_NUM_BUFFERS_IN + "Local";
private static final String IO_NUM_BUFFERS_IN_REMOTE =
MetricNames.IO_NUM_BUFFERS_IN + "Remote";
- private static final String IO_NUM_BUFFERS_IN_LOCAL_RATE =
IO_NUM_BUFFERS_IN_LOCAL + MetricNames.SUFFIX_RATE;
- private static final String IO_NUM_BUFFERS_IN_REMOTE_RATE =
IO_NUM_BUFFERS_IN_REMOTE + MetricNames.SUFFIX_RATE;
private final Counter numBytesInLocal;
private final Counter numBytesInRemote;
private final Counter numBuffersInLocal;
private final Counter numBuffersInRemote;
public InputChannelMetrics(MetricGroup parent) {
- this.numBytesInLocal = parent.counter(IO_NUM_BYTES_IN_LOCAL);
- this.numBytesInRemote = parent.counter(IO_NUM_BYTES_IN_REMOTE);
- parent.meter(IO_NUM_BYTES_IN_LOCAL_RATE, new
MeterView(numBytesInLocal, 60));
- parent.meter(IO_NUM_BYTES_IN_REMOTE_RATE, new
MeterView(numBytesInRemote, 60));
-
- this.numBuffersInLocal =
parent.counter(IO_NUM_BUFFERS_IN_LOCAL);
- this.numBuffersInRemote =
parent.counter(IO_NUM_BUFFERS_IN_REMOTE);
- parent.meter(IO_NUM_BUFFERS_IN_LOCAL_RATE, new
MeterView(numBuffersInLocal, 60));
- parent.meter(IO_NUM_BUFFERS_IN_REMOTE_RATE, new
MeterView(numBuffersInRemote, 60));
+ this.numBytesInLocal = createCounter(parent,
IO_NUM_BYTES_IN_LOCAL);
+ this.numBytesInRemote = createCounter(parent,
IO_NUM_BYTES_IN_REMOTE);
+ this.numBuffersInLocal = createCounter(parent,
IO_NUM_BUFFERS_IN_LOCAL);
+ this.numBuffersInRemote = createCounter(parent,
IO_NUM_BUFFERS_IN_REMOTE);
+ }
+
+ private static Counter createCounter(MetricGroup parent, String name) {
+ Counter counter = parent.counter(name);
+ parent.meter(name + MetricNames.SUFFIX_RATE, new
MeterView(counter, 60));
+ return counter;
+ }
+
+ public void incNumBytesInLocalCounter(long inc) {
+ numBytesInLocal.inc(inc);
}
- public Counter getNumBytesInLocalCounter() {
- return numBytesInLocal;
+ public void incNumBytesInRemoteCounter(long inc) {
+ numBytesInRemote.inc(inc);
}
- public Counter getNumBytesInRemoteCounter() {
- return numBytesInRemote;
+ public void incNumBuffersInLocalCounter(long inc) {
+ numBuffersInLocal.inc(inc);
}
- public Counter getNumBuffersInLocalCounter() {
- return numBuffersInLocal;
+ public void incNumBuffersInRemoteCounter(long inc) {
+ numBuffersInRemote.inc(inc);
}
- public Counter getNumBuffersInRemoteCounter() {
- return numBuffersInRemote;
+ /**
+ * Wraps {@link InputChannelMetrics} with legacy metrics.
+ * @deprecated eventually should be removed in favour of normal {@link
InputChannelMetrics}.
+ */
+ @SuppressWarnings("DeprecatedIsStillUsed")
+ @Deprecated
+ public static class InputChannelMetricsWithLegacy extends
InputChannelMetrics {
+ private final InputChannelMetrics legacyMetrics;
+
+ public InputChannelMetricsWithLegacy(MetricGroup parent,
MetricGroup legacyParent) {
+ super(parent);
+ legacyMetrics = new InputChannelMetrics(legacyParent);
+ }
+
+ @Override
+ public void incNumBytesInLocalCounter(long inc) {
+ super.incNumBytesInLocalCounter(inc);
Review comment:
It's quite easy to encapsulate this logic in a `Counter` implementation,
which would require significantly fewer changes overall and would no longer
introduce a new special case in how metrics are used.
```
/**
 * A {@link Counter} that mirrors every update onto two underlying counters.
 *
 * <p>Each increment or decrement is applied to the first counter and then to the
 * second one, so both stay in step as long as all updates go through this wrapper.
 */
public static class CounterWrapper implements Counter {

    /** Receives every update first; also the counter whose value is reported. */
    private final Counter first;

    /** Receives the same updates as {@link #first}, right after it. */
    private final Counter second;

    /**
     * Creates a wrapper around the two given counters.
     *
     * <p>The constructor is private; this snippet shows no factory method, so
     * instances are presumably created from within the enclosing class.
     *
     * @param counter1 counter updated first and used by {@link #getCount()}
     * @param counter2 counter updated second
     */
    private CounterWrapper(Counter counter1, Counter counter2) {
        this.first = counter1;
        this.second = counter2;
    }

    /** Increments both counters by one. */
    @Override
    public void inc() {
        first.inc();
        second.inc();
    }

    /**
     * Increments both counters by the given amount.
     *
     * @param n amount added to each counter
     */
    @Override
    public void inc(long n) {
        first.inc(n);
        second.inc(n);
    }

    /** Decrements both counters by one. */
    @Override
    public void dec() {
        first.dec();
        second.dec();
    }

    /**
     * Decrements both counters by the given amount.
     *
     * @param n amount subtracted from each counter
     */
    @Override
    public void dec(long n) {
        first.dec(n);
        second.dec(n);
    }

    /**
     * Returns the count of the first counter only.
     *
     * @return the current value reported by the first wrapped counter
     */
    @Override
    public long getCount() {
        // Only the first counter is consulted: this relies on the assumption that
        // neither wrapped counter is modified directly elsewhere.
        return first.getCount();
    }
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services