This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a89609dc [#1271] improvement(server): change transportTime and 
processTime summary to Thread Pool Instead of block (#1272)
5a89609dc is described below

commit 5a89609dc2b6e5167f8373276d90793600559b6a
Author: Qing <[email protected]>
AuthorDate: Wed Nov 1 10:12:07 2023 +0800

    [#1271] improvement(server): change transportTime and processTime summary 
to Thread Pool Instead of block (#1272)
    
    Change transportTime and processTime summary to Thread Pool Instead of block
    
    Closes #1271
    
    
    ### What changes were proposed in this pull request?
    
    Observe the transportTime and processTime summary metrics on a thread pool instead of blocking the calling thread.
    
    ### Why are the changes needed?
    
    To avoid blocking the gRPC thread while the summary metrics are being recorded.
    
    Fix: #1271
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    No
---
 .../manager/ShuffleManagerServerFactory.java       |  2 +-
 .../uniffle/common/metrics/EmptyGRPCMetrics.java   |  6 ++-
 .../apache/uniffle/common/metrics/GRPCMetrics.java | 12 ++++--
 .../uniffle/common/metrics/NettyMetrics.java       |  6 ++-
 .../apache/uniffle/common/metrics/RPCMetrics.java  | 45 ++++++++++++++++++++--
 .../apache/uniffle/common/rpc/GrpcServerTest.java  |  7 +++-
 .../uniffle/coordinator/CoordinatorServer.java     |  2 +-
 .../coordinator/metric/CoordinatorGrpcMetrics.java |  5 ++-
 .../uniffle/test/CoordinatorGrpcServerTest.java    |  2 +-
 .../org/apache/uniffle/server/ShuffleServer.java   |  4 +-
 .../apache/uniffle/server/ShuffleServerConf.java   | 26 +++++++++++++
 .../uniffle/server/ShuffleServerGrpcMetrics.java   |  4 +-
 .../uniffle/server/ShuffleServerNettyMetrics.java  |  4 +-
 .../server/ShuffleServerGrpcMetricsTest.java       |  6 ++-
 14 files changed, 105 insertions(+), 26 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
index 7015aca24..982d9f77f 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerServerFactory.java
@@ -49,7 +49,7 @@ public class ShuffleManagerServerFactory {
       }
       return GrpcServer.Builder.newBuilder()
           .conf(conf)
-          .grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics())
+          .grpcMetrics(GRPCMetrics.getEmptyGRPCMetrics(conf))
           .addService(service)
           .build();
     } else {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
index 4b99d50f3..bf69e8c0e 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/metrics/EmptyGRPCMetrics.java
@@ -17,10 +17,12 @@
 
 package org.apache.uniffle.common.metrics;
 
+import org.apache.uniffle.common.config.RssConf;
+
 public class EmptyGRPCMetrics extends GRPCMetrics {
 
-  public EmptyGRPCMetrics(String tags) {
-    super(tags);
+  public EmptyGRPCMetrics(RssConf rssConf, String tags) {
+    super(rssConf, tags);
   }
 
   @Override
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
index 99786c1eb..7b56e0fa6 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/GRPCMetrics.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common.metrics;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
 
 public abstract class GRPCMetrics extends RPCMetrics {
@@ -40,10 +41,11 @@ public abstract class GRPCMetrics extends RPCMetrics {
   protected Gauge.Child gaugeGrpcOpen;
   protected Counter.Child counterGrpcTotal;
 
-  public GRPCMetrics(String tags) {
-    super(tags);
+  public GRPCMetrics(RssConf rssConf, String tags) {
+    super(rssConf, tags);
   }
 
+  @Override
   public abstract void registerMetrics();
 
   @Override
@@ -61,6 +63,7 @@ public abstract class GRPCMetrics extends RPCMetrics {
         metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
   }
 
+  @Override
   public void incCounter(String methodName) {
     if (isRegistered) {
       super.incCounter(methodName);
@@ -69,6 +72,7 @@ public abstract class GRPCMetrics extends RPCMetrics {
     }
   }
 
+  @Override
   public void decCounter(String methodName) {
     if (isRegistered) {
       super.decCounter(methodName);
@@ -84,7 +88,7 @@ public abstract class GRPCMetrics extends RPCMetrics {
     return counterGrpcTotal;
   }
 
-  public static GRPCMetrics getEmptyGRPCMetrics() {
-    return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
+  public static GRPCMetrics getEmptyGRPCMetrics(RssConf rssConf) {
+    return new EmptyGRPCMetrics(rssConf, Constants.SHUFFLE_SERVER_VERSION);
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
index 6ba4fcc03..0879e2075 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
@@ -20,6 +20,8 @@ package org.apache.uniffle.common.metrics;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 
+import org.apache.uniffle.common.config.RssConf;
+
 public abstract class NettyMetrics extends RPCMetrics {
 
   private static final String NETTY_ACTIVE_CONNECTION = 
"netty_active_connection";
@@ -28,8 +30,8 @@ public abstract class NettyMetrics extends RPCMetrics {
   protected Gauge.Child gaugeNettyActiveConn;
   protected Counter.Child counterNettyException;
 
-  public NettyMetrics(String tags) {
-    super(tags);
+  public NettyMetrics(RssConf rssConf, String tags) {
+    super(rssConf, tags);
   }
 
   @Override
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
index 819ebdee7..fbfb8123b 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
@@ -18,27 +18,63 @@
 package org.apache.uniffle.common.metrics;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Summary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public abstract class RPCMetrics {
+  private static final Logger LOG = LoggerFactory.getLogger(RPCMetrics.class);
+
   protected boolean isRegistered = false;
   protected Map<String, Counter.Child> counterMap = 
JavaUtils.newConcurrentMap();
   protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
   protected Map<String, Summary.Child> transportTimeSummaryMap = 
JavaUtils.newConcurrentMap();
   protected Map<String, Summary.Child> processTimeSummaryMap = 
JavaUtils.newConcurrentMap();
+  private static final String THREAD_POOL_CORE_SIZE =
+      "rss.server.summary.metric.thread.pool.core.size";
+  private static final int THREAD_POOL_CORE_SIZE_DEFAULT_VALUE = 2;
+  private static final String THREAD_POOL_MAX_SIZE =
+      "rss.server.summary.metric.thread.pool.max.size";
+  private static final int THREAD_POOL_MAX_SIZE_DEFAULT_VALUE = 20;
+  private static final String KEEP_ALIVE_TIME =
+      "rss.server.summary.metric.thread.pool.keep.alive.time";
+  private static final int KEEP_ALIVE_TIME_DEFAULT_VALUE = 60;
+  private final ExecutorService summaryObservePool;
   protected MetricsManager metricsManager;
   protected String tags;
 
-  public RPCMetrics(String tags) {
+  public RPCMetrics(RssConf rssConf, String tags) {
     this.tags = tags;
+    int coreSize = rssConf.getInteger(THREAD_POOL_CORE_SIZE, 
THREAD_POOL_CORE_SIZE_DEFAULT_VALUE);
+    int maxSize = rssConf.getInteger(THREAD_POOL_MAX_SIZE, 
THREAD_POOL_MAX_SIZE_DEFAULT_VALUE);
+    int keepAliveTime = rssConf.getInteger(KEEP_ALIVE_TIME, 
KEEP_ALIVE_TIME_DEFAULT_VALUE);
+    this.summaryObservePool =
+        new ThreadPoolExecutor(
+            coreSize,
+            maxSize,
+            keepAliveTime,
+            TimeUnit.SECONDS,
+            Queues.newLinkedBlockingQueue(),
+            ThreadUtils.getThreadFactory("SummaryObserveThreadPool"));
+    LOG.info(
+        "Init summary observe thread pool, core size:{}, max size:{}, keep 
alive time:{}",
+        coreSize,
+        maxSize,
+        keepAliveTime);
   }
 
   public abstract void registerMetrics();
@@ -116,14 +152,17 @@ public abstract class RPCMetrics {
   public void recordTransportTime(String methodName, long 
transportTimeInMillionSecond) {
     Summary.Child summary = transportTimeSummaryMap.get(methodName);
     if (summary != null) {
-      summary.observe(transportTimeInMillionSecond / 
Constants.MILLION_SECONDS_PER_SECOND);
+      summaryObservePool.execute(
+          () ->
+              summary.observe(transportTimeInMillionSecond / 
Constants.MILLION_SECONDS_PER_SECOND));
     }
   }
 
   public void recordProcessTime(String methodName, long 
processTimeInMillionSecond) {
     Summary.Child summary = processTimeSummaryMap.get(methodName);
     if (summary != null) {
-      summary.observe(processTimeInMillionSecond / 
Constants.MILLION_SECONDS_PER_SECOND);
+      summaryObservePool.execute(
+          () -> summary.observe(processTimeInMillionSecond / 
Constants.MILLION_SECONDS_PER_SECOND));
     }
   }
 
diff --git 
a/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java 
b/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
index 2142bfcf6..9b4a11e8b 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/GrpcServerTest.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.proto.ShuffleManagerGrpc;
@@ -43,8 +44,9 @@ public class GrpcServerTest {
   @Test
   public void testGrpcExecutorPool() throws Exception {
     // Explicitly setting the synchronizing variable as false at the beginning 
of test run
+    RssConf rssConf = new RssConf();
     GrpcServer.reset();
-    GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
+    GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics(rssConf);
     grpcMetrics.register(new CollectorRegistry(true));
     GrpcServer.GrpcThreadPoolExecutor executor =
         new GrpcServer.GrpcThreadPoolExecutor(
@@ -96,7 +98,8 @@ public class GrpcServerTest {
 
   @Test
   public void testRandomPort() throws Exception {
-    GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics();
+    RssConf rssConf = new RssConf();
+    GRPCMetrics grpcMetrics = GRPCMetrics.getEmptyGRPCMetrics(rssConf);
     grpcMetrics.register(new CollectorRegistry(true));
     RssBaseConf conf = new RssBaseConf();
     conf.set(RPC_SERVER_PORT, 0);
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 30373500b..2c22d7cf6 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -212,7 +212,7 @@ public class CoordinatorServer extends ReconfigurableBase {
     LOG.info("Register metrics");
     CollectorRegistry coordinatorCollectorRegistry = new 
CollectorRegistry(true);
     CoordinatorMetrics.register(coordinatorCollectorRegistry);
-    grpcMetrics = new CoordinatorGrpcMetrics();
+    grpcMetrics = new CoordinatorGrpcMetrics(coordinatorConf);
     grpcMetrics.register(new CollectorRegistry(true));
     boolean verbose = 
coordinatorConf.getBoolean(CoordinatorConf.RSS_JVM_METRICS_VERBOSE_ENABLE);
     CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
index e028ab216..43f7f2e11 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorGrpcMetrics.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.coordinator.metric;
 
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.util.Constants;
 
@@ -31,8 +32,8 @@ public class CoordinatorGrpcMetrics extends GRPCMetrics {
       "grpc_get_shuffle_assignments_total";
   private static final String GRPC_HEARTBEAT_TOTAL = "grpc_heartbeat_total";
 
-  public CoordinatorGrpcMetrics() {
-    super(Constants.COORDINATOR_TAG);
+  public CoordinatorGrpcMetrics(RssConf rssConf) {
+    super(rssConf, Constants.COORDINATOR_TAG);
   }
 
   @Override
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index 5a83d72c4..242004553 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -61,7 +61,7 @@ public class CoordinatorGrpcServerTest {
     baseConf.set(RssBaseConf.RPC_SERVER_PORT, 20001);
     baseConf.set(RssBaseConf.RPC_EXECUTOR_SIZE, 2);
 
-    GRPCMetrics grpcMetrics = new CoordinatorGrpcMetrics();
+    GRPCMetrics grpcMetrics = new CoordinatorGrpcMetrics(baseConf);
     grpcMetrics.register(new CollectorRegistry(true));
     GrpcServer grpcServer =
         GrpcServer.Builder.newBuilder()
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 4049ce639..eecfc0e06 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -296,9 +296,9 @@ public class ShuffleServer {
     CollectorRegistry shuffleServerCollectorRegistry = new 
CollectorRegistry(true);
     String tags = coverToString();
     ShuffleServerMetrics.register(shuffleServerCollectorRegistry, tags);
-    grpcMetrics = new ShuffleServerGrpcMetrics(tags);
+    grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf, tags);
     grpcMetrics.register(new CollectorRegistry(true));
-    nettyMetrics = new ShuffleServerNettyMetrics(tags);
+    nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, tags);
     nettyMetrics.register(new CollectorRegistry(true));
     CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true);
     boolean verbose =
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 0a2028478..5237da481 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -514,6 +514,32 @@ public class ShuffleServerConf extends RssBaseConf {
                   + "network_bandwidth = 10Gbps, buffer size should be ~ 
1.25MB."
                   + "Default is 0, OS will dynamically adjust the buf size.");
 
+  public static final ConfigOption<Integer> SUMMARY_METRIC_WAIT_QUEUE_SIZE =
+      ConfigOptions.key("rss.server.summary.metric.wait.queue.size")
+          .intType()
+          .defaultValue(1000)
+          .withDescription(
+              "size of waiting queue for thread pool that used for calc 
summary metric.");
+
+  public static final ConfigOption<Integer> 
SUMMARY_METRIC_THREAD_POOL_CORE_SIZE =
+      ConfigOptions.key("rss.server.summary.metric.thread.pool.core.size")
+          .intType()
+          .defaultValue(2)
+          .withDescription("core thread number of thread pool that used for 
calc summary metric.");
+
+  public static final ConfigOption<Integer> 
SUMMARY_METRIC_THREAD_POOL_MAX_SIZE =
+      ConfigOptions.key("rss.server.summary.metric.thread.pool.max.size")
+          .intType()
+          .defaultValue(20)
+          .withDescription("max thread number of thread pool that used for 
calc summary metric.");
+
+  public static final ConfigOption<Integer> 
SUMMARY_METRIC_THREAD_POOL_KEEP_ALIVE_TIME =
+      
ConfigOptions.key("rss.server.summary.metric.thread.pool.keep.alive.time")
+          .intType()
+          .defaultValue(60)
+          .withDescription(
+              "keep alive time of thread pool that used for calc summary 
metric, in SECONDS.");
+
   public ShuffleServerConf() {}
 
   public ShuffleServerConf(String fileName) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
index f0f69c6d7..f4c53ad76 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java
@@ -72,8 +72,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
   private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY =
       "grpc_get_memory_shuffle_data_process_latency";
 
-  public ShuffleServerGrpcMetrics(String tags) {
-    super(tags);
+  public ShuffleServerGrpcMetrics(ShuffleServerConf shuffleServerConf, String 
tags) {
+    super(shuffleServerConf, tags);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
index afd242dcc..b8a3aed24 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerNettyMetrics.java
@@ -36,8 +36,8 @@ public class ShuffleServerNettyMetrics extends NettyMetrics {
   private static final String NETTY_GET_MEMORY_SHUFFLE_DATA_REQUEST =
       "netty_get_memory_shuffle_data_request";
 
-  public ShuffleServerNettyMetrics(String tags) {
-    super(tags);
+  public ShuffleServerNettyMetrics(ShuffleServerConf shuffleServerConf, String 
tags) {
+    super(shuffleServerConf, tags);
   }
 
   @Override
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
index a9f56d4a2..27ef4807b 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java
@@ -29,9 +29,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ShuffleServerGrpcMetricsTest {
   @Test
-  public void testLatencyMetrics() {
+  public void testLatencyMetrics() throws InterruptedException {
+    ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
     ShuffleServerGrpcMetrics metrics =
-        new ShuffleServerGrpcMetrics(Constants.SHUFFLE_SERVER_VERSION);
+        new ShuffleServerGrpcMetrics(shuffleServerConf, 
Constants.SHUFFLE_SERVER_VERSION);
     metrics.register(new CollectorRegistry(true));
     
metrics.recordTransportTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, 
1000);
     
metrics.recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, 
500);
@@ -44,6 +45,7 @@ public class ShuffleServerGrpcMetricsTest {
     assertEquals(3, sendTimeSummaryTime.size());
     assertEquals(3, processTimeSummaryTime.size());
 
+    Thread.sleep(1000L);
     assertEquals(
         1D, 
sendTimeSummaryTime.get(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get().sum);
     assertEquals(

Reply via email to