This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c19df2ba [ISSUE-388][ISSUE-244][Bug] Fix incorrect usage of `GRPCMetrics#setGauge` (#404)
c19df2ba is described below
commit c19df2ba51b5af923627aa10f267e0ca2f6afd3b
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Dec 13 12:01:40 2022 +0800
[ISSUE-388][ISSUE-244][Bug] Fix incorrect usage of `GRPCMetrics#setGauge` (#404)
### What changes were proposed in this pull request?
Fix incorrect usage of `GRPCMetrics#setGauge`
### Why are the changes needed?
It fixes the bugs reported in #244 and #388.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No need
---
.../apache/uniffle/common/metrics/GRPCMetrics.java | 26 ++++++++++++++++++++++
.../org/apache/uniffle/common/rpc/GrpcServer.java | 6 ++---
.../rpc/MonitoringServerTransportFilter.java | 7 ++----
3 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index c693604d..62972968 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -80,6 +80,32 @@ public abstract class GRPCMetrics {
}
}
+ public void incGauge(String tag) {
+ incGauge(tag, 1);
+ }
+
+ public void incGauge(String tag, double value) {
+ if (isRegistered) {
+ Gauge gauge = gaugeMap.get(tag);
+ if (gauge != null) {
+ gauge.inc(value);
+ }
+ }
+ }
+
+ public void decGauge(String tag) {
+ decGauge(tag, 1);
+ }
+
+ public void decGauge(String tag, double value) {
+ if (isRegistered) {
+ Gauge gauge = gaugeMap.get(tag);
+ if (gauge != null) {
+ gauge.dec(value);
+ }
+ }
+ }
+
public void incCounter(String methodName) {
if (isRegistered) {
Gauge gauge = gaugeMap.get(methodName);
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 2283a662..be4e916f 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -99,8 +99,7 @@ public class GrpcServer implements ServerInterface {
@Override
protected void beforeExecute(Thread t, Runnable r) {
- grpcMetrics.setGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
- activeThreadSize.incrementAndGet());
+ grpcMetrics.incGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY);
grpcMetrics.setGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
getQueue().size());
super.beforeExecute(t, r);
@@ -108,8 +107,7 @@ public class GrpcServer implements ServerInterface {
@Override
protected void afterExecute(Runnable r, Throwable t) {
- grpcMetrics.setGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY,
- activeThreadSize.decrementAndGet());
+ grpcMetrics.decGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY);
grpcMetrics.setGauge(GRPCMetrics.GRPC_SERVER_EXECUTOR_BLOCKING_QUEUE_SIZE_KEY,
getQueue().size());
super.afterExecute(r, t);
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
index 085b13fd..2c29dd27 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/MonitoringServerTransportFilter.java
@@ -17,8 +17,6 @@
package org.apache.uniffle.common.rpc;
-import java.util.concurrent.atomic.AtomicLong;
-
import io.grpc.Attributes;
import io.grpc.ServerTransportFilter;
@@ -27,7 +25,6 @@ import org.apache.uniffle.common.metrics.GRPCMetrics;
import static org.apache.uniffle.common.metrics.GRPCMetrics.GRPC_SERVER_CONNECTION_NUMBER_KEY;
public class MonitoringServerTransportFilter extends ServerTransportFilter {
- private final AtomicLong connectionSize = new AtomicLong(0);
private final GRPCMetrics grpcMetrics;
public MonitoringServerTransportFilter(GRPCMetrics grpcMetrics) {
@@ -35,12 +32,12 @@ public class MonitoringServerTransportFilter extends ServerTransportFilter {
}
public Attributes transportReady(Attributes transportAttrs) {
- grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, connectionSize.incrementAndGet());
+ grpcMetrics.incGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY);
return super.transportReady(transportAttrs);
}
public void transportTerminated(Attributes transportAttrs) {
- grpcMetrics.setGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY, connectionSize.decrementAndGet());
+ grpcMetrics.decGauge(GRPC_SERVER_CONNECTION_NUMBER_KEY);
super.transportTerminated(transportAttrs);
}
}