This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5a89609dc [#1271] improvement(server): change transportTime and
processTime summary to Thread Pool Instead of block (#1272)
5a89609dc is described below
commit 5a89609dc2b6e5167f8373276d90793600559b6a
Author: Qing <[email protected]>
AuthorDate: Wed Nov 1 10:12:07 2023 +0800
[#1271] improvement(server): change transportTime and processTime summary
to Thread Pool Instead of block (#1272)
Record the transportTime and processTime summary metrics on a dedicated thread pool instead of observing them synchronously on the calling thread.
Closes #1271
### What changes were proposed in this pull request?
Submit the transportTime and processTime summary observations to a thread pool instead of performing them inline, where they block the caller.
### Why are the changes needed?
To avoid blocking the gRPC threads while summary metrics are recorded.
Fix: #1271
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No
---
.../manager/ShuffleManagerServerFactory.java | 2 +-
.../uniffle/common/metrics/EmptyGRPCMetrics.java | 6 ++-
.../apache/uniffle/common/metrics/GRPCMetrics.java | 12 ++++--
.../uniffle/common/metrics/NettyMetrics.java | 6 ++-
.../apache/uniffle/common/metrics/RPCMetrics.java | 45 ++++++++++++++++++++--
.../apache/uniffle/common/rpc/GrpcServerTest.java | 7 +++-
.../uniffle/coordinator/CoordinatorServer.java | 2 +-
.../coordinator/metric/CoordinatorGrpcMetrics.java | 5 ++-
.../uniffle/test/CoordinatorGrpcServerTest.java | 2 +-
.../org/apache/uniffle/server/ShuffleServer.java | 4 +-
.../apache/uniffle/server/ShuffleServerConf.java | 26 +++++++++++++
.../uniffle/server/ShuffleServerGrpcMetrics.java | 4 +-
.../uniffle/server/ShuffleServerNettyMetrics.java | 4 +-
.../server/ShuffleServerGrpcMetricsTest.java | 6 ++-
14 files changed, 105 insertions(+), 26 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
index 7015aca24..982d9f77f 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
@@ -49,7 +49,7 @@ public class ShuffleManagerServerFactory {
}
return GrpcServer.Builder.newBuilder()
.conf(conf)
- .grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics())
+ .grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics(conf))
.addService(service)
.build();
} else {
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
index 4b99d50f3..bf69e8c0e 100644
---
a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
+++
b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
@@ -17,10 +17,12 @@
package org.apache.uniffle.common.metrics;
+import org.apache.uniffle.common.config.RssConf;
+
public class EmptyGRPCMetrics extends GRPCMetrics {
- public EmptyGRPCMetrics(String tags) {
- super(tags);
+ public EmptyGRPCMetrics(RssConf rssConf, String tags) {
+ super(rssConf, tags);
}
@Override
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index 99786c1eb..7b56e0fa6 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common.metrics;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
public abstract class GRPCMetrics extends RPCMetrics {
@@ -40,10 +41,11 @@ public abstract class GRPCMetrics extends RPCMetrics {
protected Gauge.Child gaugeGrpcOpen;
protected Counter.Child counterGrpcTotal;
- public GRPCMetrics(String tags) {
- super(tags);
+ public GRPCMetrics(RssConf rssConf, String tags) {
+ super(rssConf, tags);
}
+ @Override
public abstract void registerMetrics();
@Override
@@ -61,6 +63,7 @@ public abstract class GRPCMetrics extends RPCMetrics {
metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
}
+ @Override
public void incCounter(String methodName) {
if (isRegistered) {
super.incCounter(methodName);
@@ -69,6 +72,7 @@ public abstract class GRPCMetrics extends RPCMetrics {
}
}
+ @Override
public void decCounter(String methodName) {
if (isRegistered) {
super.decCounter(methodName);
@@ -84,7 +88,7 @@ public abstract class GRPCMetrics extends RPCMetrics {
return counterGrpcTotal;
}
- public static GRPCMetrics getEmptyGRPCMetrics() {
- return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
+ public static GRPCMetrics getEmptyGRPCMetrics(RssConf rssConf) {
+ return new EmptyGRPCMetrics(rssConf, Constants.SHUFFLE_SERVER_VERSION);
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
index 6ba4fcc03..0879e2075 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
@@ -20,6 +20,8 @@ package org.apache.uniffle.common.metrics;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
+import org.apache.uniffle.common.config.RssConf;
+
public abstract class NettyMetrics extends RPCMetrics {
private static final String NETTY_ACTIVE_CONNECTION =
"netty_active_connection";
@@ -28,8 +30,8 @@ public abstract class NettyMetrics extends RPCMetrics {
protected Gauge.Child gaugeNettyActiveConn;
protected Counter.Child counterNettyException;
- public NettyMetrics(String tags) {
- super(tags);
+ public NettyMetrics(RssConf rssConf, String tags) {
+ super(rssConf, tags);
}
@Override
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
index 819ebdee7..fbfb8123b 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
@@ -18,27 +18,63 @@
package org.apache.uniffle.common.metrics;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
public abstract class RPCMetrics {
+ private static final Logger LOG = LoggerFactory.getLogger(RPCMetrics.class);
+
protected boolean isRegistered = false;
protected Map<String, Counter.Child> counterMap =
JavaUtils.newConcurrentMap();
protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> transportTimeSummaryMap =
JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> processTimeSummaryMap =
JavaUtils.newConcurrentMap();
+ private static final String THREAD_POOL_CORE_SIZE =
+ "rss.server.summary.metric.thread.pool.core.size";
+ private static final int THREAD_POOL_CORE_SIZE_DEFAULT_VALUE = 2;
+ private static final String THREAD_POOL_MAX_SIZE =
+ "rss.server.summary.metric.thread.pool.max.size";
+ private static final int THREAD_POOL_MAX_SIZE_DEFAULT_VALUE = 20;
+ private static final String KEEP_ALIVE_TIME =
+ "rss.server.summary.metric.thread.pool.keep.alive.time";
+ private static final int KEEP_ALIVE_TIME_DEFAULT_VALUE = 60;
+ private final ExecutorService summaryObservePool;
protected MetricsManager metricsManager;
protected String tags;
- public RPCMetrics(String tags) {
+ public RPCMetrics(RssConf rssConf, String tags) {
this.tags = tags;
+ int coreSize = rssConf.getInteger(THREAD_POOL_CORE_SIZE,
THREAD_POOL_CORE_SIZE_DEFAULT_VALUE);
+ int maxSize = rssConf.getInteger(THREAD_POOL_MAX_SIZE,
THREAD_POOL_MAX_SIZE_DEFAULT_VALUE);
+ int keepAliveTime = rssConf.getInteger(KEEP_ALIVE_TIME,
KEEP_ALIVE_TIME_DEFAULT_VALUE);
+ this.summaryObservePool =
+ new ThreadPoolExecutor(
+ coreSize,
+ maxSize,
+ keepAliveTime,
+ TimeUnit.SECONDS,
+ Queues.newLinkedBlockingQueue(),
+ ThreadUtils.getThreadFactory("SummaryObserveThreadPool"));
+ LOG.info(
+ "Init summary observe thread pool, core size:{}, max size:{}, keep
alive time:{}",
+ coreSize,
+ maxSize,
+ keepAliveTime);
}
public abstract void registerMetrics();
@@ -116,14 +152,17 @@ public abstract class RPCMetrics {
public void recordTransportTime(String methodName, long
transportTimeInMillionSecond) {
Summary.Child summary = transportTimeSummaryMap.get(methodName);
if (summary != null) {
- summary.observe(transportTimeInMillionSecond /
Constants.MILLION_SECONDS_PER_SECOND);
+ summaryObservePool.execute(
+ () ->
+ summary.observe(transportTimeInMillionSecond /
Constants.MILLION_SECONDS_PER_SECOND));
}
}
public void recordProcessTime(String methodName, long
processTimeInMillionSecond) {
Summary.Child summary = processTimeSummaryMap.get(methodName);
if (summary != null) {
- summary.observe(processTimeInMillionSecond /
Constants.MILLION_SECONDS_PER_SECOND);
+ summaryObservePool.execute(
+ () -> summary.observe(processTimeInMillionSecond /
Constants.MILLION_SECONDS_PER_SECOND));
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
b/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
index 2142bfcf6..9b4a11e8b 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.proto.ShuffleManagerGrpc;
@@ -43,8 +44,9 @@ public class GrpcServerTest {
@Test
public void testGrpcExecutorPool() throws Exception {
// Explicitly setting the synchronizing variable as false at the beginning
of test run
+ RssConf rssConf = new RssConf();
GrpcServer.reset();
- GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
+ GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics(rssConf);
grpcMetrics.register(new CollectorRegistry(true));
GrpcServer.GrpcThreadPoolExecutor executor =
new GrpcServer.GrpcThreadPoolExecutor(
@@ -96,7 +98,8 @@ public class GrpcServerTest {
@Test
public void testRandomPort() throws Exception {
- GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
+ RssConf rssConf = new RssConf();
+ GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics(rssConf);
grpcMetrics.register(new CollectorRegistry(true));
RssBaseConf conf = new RssBaseConf();
conf.set(RPC_SERVER_PORT, 0);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 30373500b..2c22d7cf6 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -212,7 +212,7 @@ public class CoordinatorServer extends ReconfigurableBase {
LOG.info("Register metrics");
CollectorRegistry coordinatorCollectorRegistry = new
CollectorRegistry(true);
CoordinatorMetrics.register(coordinatorCollectorRegistry);
- grpcMetrics = new CoordinatorGrpcMetrics();
+ grpcMetrics = new CoordinatorGrpcMetrics(coordinatorConf);
grpcMetrics.register(new CollectorRegistry(true));
boolean verbose =
coordinatorConf.getBoolean(CoordinatorConf.RSS_JVM_METRICS_VERBOSE_ENABLE);
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
index e028ab216..43f7f2e11 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.coordinator.metric;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.Constants;
@@ -31,8 +32,8 @@ public class CoordinatorGrpcMetrics extends GRPCMetrics {
"grpc_get_shuffle_assignments_total";
private static final String GRPC_HEARTBEAT_TOTAL = "grpc_heartbeat_total";
- public CoordinatorGrpcMetrics() {
- super(Constants.COORDINATOR_TAG);
+ public CoordinatorGrpcMetrics(RssConf rssConf) {
+ super(rssConf, Constants.COORDINATOR_TAG);
}
@Override
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index 5a83d72c4..242004553 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -61,7 +61,7 @@ public class CoordinatorGrpcServerTest {
baseConf.set(RssBaseConf.RPC_SERVER_PORT, 20001);
baseConf.set(RssBaseConf.RPC_EXECUTOR_SIZE, 2);
- GRPCMetrics grpcMetrics = new CoordinatorGrpcMetrics();
+ GRPCMetrics grpcMetrics = new CoordinatorGrpcMetrics(baseConf);
grpcMetrics.register(new CollectorRegistry(true));
GrpcServer grpcServer =
GrpcServer.Builder.newBuilder()
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 4049ce639..eecfc0e06 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -296,9 +296,9 @@ public class ShuffleServer {
CollectorRegistry shuffleServerCollectorRegistry = new
CollectorRegistry(true);
String tags = coverToString();
ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
- grpcMetrics = new ShuffleServerGrpcMetrics(tags);
+ grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf, tags);
grpcMetrics.register(new CollectorRegistry(true));
- nettyMetrics = new ShuffleServerNettyMetrics(tags);
+ nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, tags);
nettyMetrics.register(new CollectorRegistry(true));
CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
boolean verbose =
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 0a2028478..5237da481 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -514,6 +514,32 @@ public class ShuffleServerConf extends RssBaseConf {
+ "network_bandwidth = 10Gbps, buffer size should be ~
1.25MB."
+ "Default is 0, OS will dynamically adjust the buf size.");
+ public static final ConfigOption<Integer> SUMMARY_METRIC_WAIT_QUEUE_SIZE =
+ ConfigOptions.key("rss.server.summary.metric.wait.queue.size")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "size of waiting queue for thread pool that used for calc
summary metric.");
+
+ public static final ConfigOption<Integer>
SUMMARY_METRIC_THREAD_POOL_CORE_SIZE =
+ ConfigOptions.key("rss.server.summary.metric.thread.pool.core.size")
+ .intType()
+ .defaultValue(2)
+ .withDescription("core thread number of thread pool that used for
calc summary metric.");
+
+ public static final ConfigOption<Integer>
SUMMARY_METRIC_THREAD_POOL_MAX_SIZE =
+ ConfigOptions.key("rss.server.summary.metric.thread.pool.max.size")
+ .intType()
+ .defaultValue(20)
+ .withDescription("max thread number of thread pool that used for
calc summary metric.");
+
+ public static final ConfigOption<Integer>
SUMMARY_METRIC_THREAD_POOL_KEEP_ALIVE_TIME =
+
ConfigOptions.key("rss.server.summary.metric.thread.pool.keep.alive.time")
+ .intType()
+ .defaultValue(60)
+ .withDescription(
+ "keep alive time of thread pool that used for calc summary
metric, in SECONDS.");
+
public ShuffleServerConf() {}
public ShuffleServerConf(String fileName) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
index f0f69c6d7..f4c53ad76 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
@@ -72,8 +72,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY =
"grpc_get_memory_shuffle_data_process_latency";
- public ShuffleServerGrpcMetrics(String tags) {
- super(tags);
+ public ShuffleServerGrpcMetrics(ShuffleServerConf shuffleServerConf, String
tags) {
+ super(shuffleServerConf, tags);
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
index afd242dcc..b8a3aed24 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
@@ -36,8 +36,8 @@ public class ShuffleServerNettyMetrics extends NettyMetrics {
private static final String NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST =
"netty_get_memory_shuffle_data_request";
- public ShuffleServerNettyMetrics(String tags) {
- super(tags);
+ public ShuffleServerNettyMetrics(ShuffleServerConf shuffleServerConf, String
tags) {
+ super(shuffleServerConf, tags);
}
@Override
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
index a9f56d4a2..27ef4807b 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
@@ -29,9 +29,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class ShuffleServerGrpcMetricsTest {
@Test
- public void testLatencyMetrics() {
+ public void testLatencyMetrics() throws InterruptedException {
+ ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
ShuffleServerGrpcMetrics metrics =
- new ShuffleServerGrpcMetrics(Constants.SHUFFLE_SERVER_VERSION);
+ new ShuffleServerGrpcMetrics(shuffleServerConf,
Constants.SHUFFLE_SERVER_VERSION);
metrics.register(new CollectorRegistry(true));
metrics.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD,
1000);
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD,
500);
@@ -44,6 +45,7 @@ public class ShuffleServerGrpcMetricsTest {
assertEquals(3, sendTimeSummaryTime.size());
assertEquals(3, processTimeSummaryTime.size());
+ Thread.sleep(1000L);
assertEquals(
1D,
sendTimeSummaryTime.get(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum);
assertEquals(