This is an automated email from the ASF dual-hosted git repository.

fchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 349ee8b1c Revert "[CELEBORN-255] Add counter of outstandingFetches, outstanding…
349ee8b1c is described below

commit 349ee8b1cb7e0db4e26dafbed4ff6d5e8df60fc1
Author: Fu Chen <[email protected]>
AuthorDate: Tue Oct 24 17:18:54 2023 +0800

    Revert "[CELEBORN-255] Add counter of outstandingFetches, outstandingRpcs and outstandingPushes to metrics"
    
    This reverts commit bfa341c32f362b64dd69b84fc54934cf8224200c.
    
    ### What changes were proposed in this pull request?
    
    ### Why are the changes needed?
    
    
    https://github.com/apache/incubator-celeborn/pull/1992#issuecomment-1776760369
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2032 from cfmcgrady/revert-pr-1992.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Fu Chen <[email protected]>
---
 METRICS.md                                         |   7 +-
 assets/grafana/celeborn-dashboard.json             | 289 +--------------------
 .../celeborn/common/network/TransportContext.java  |   2 +-
 .../network/client/TransportResponseHandler.java   |  41 +--
 .../common/metrics/source/AbstractSource.scala     |  33 ---
 .../network/TransportResponseHandlerSuiteJ.java    |  90 +++----
 docs/monitoring.md                                 |   6 -
 7 files changed, 60 insertions(+), 408 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index 42a40fe29..9db43e1c7 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -76,8 +76,8 @@ Here is an example of grafana dashboard importing.
 |             FlushDataTime              |      worker       |                 
                 FlushData means flush a disk buffer to disk.                   
                |
 |             OpenStreamTime             |      worker       |            
OpenStream means read a shuffle file and send client about chunks size and 
stream index.             |
 |             FetchChunkTime             |      worker       |                 
     FetchChunk means read a chunk from a shuffle file and send to client.      
                |
-|          PrimaryPushDataTime           |      worker       |                 
     PrimaryPushData means handle pushdata of primary partition location.       
                |
-|          ReplicaPushDataTime           |      worker       |                 
     ReplicaPushData means handle pushdata of replica partition location.       
                |
+|           PrimaryPushDataTime          |      worker       |                 
      PrimaryPushData means handle pushdata of primary partition location.      
                |
+|           ReplicaPushDataTime          |      worker       |                 
       ReplicaPushData means handle pushdata of replica partition location.     
                |
 |           WriteDataFailCount           |      worker       |                 
   The count of writing PushData or PushMergedData failed in current worker.    
                |
 |         ReplicateDataFailCount         |      worker       |                 
 The count of replicating PushData or PushMergedData failed in current worker.  
                |
 |      ReplicateDataWriteFailCount       |      worker       |       The count 
of replicating PushData or PushMergedData failed caused by write failure in 
peer worker.        |
@@ -97,9 +97,6 @@ Here is an example of grafana dashboard importing.
 |       PausePushDataAndReplicate        |      worker       |    
PausePushDataAndReplicate means the count of worker stopped receiving data from 
client and other workers.    |
 |           ActiveShuffleSize            |      worker       |                 
The active shuffle size of a worker including master replica and slave replica. 
                |
 |         ActiveShuffleFileCount         |      worker       |              
The active shuffle file count of a worker including master replica and slave 
replica.              |
-|         OutstandingFetchCount          |      worker       |                 
        The count of outstanding fetch request received in peer worker.         
                |
-|          OutstandingRpcCount           |      worker       |                 
         The count of outstanding rpc request received in peer worker.          
                |
-|          OutstandingPushCount          |      worker       |                 
        The count of outstanding push request received in peer worker.          
                |
 
 ## Implementation
 
diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 760cfd7fd..1f534335a 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -8140,293 +8140,6 @@
       ],
       "title": "UserConsumption",
       "type": "row"
-    },
-    {
-      "collapsed": true,
-      "gridPos": {
-        "h": 1,
-        "w": 24,
-        "x": 0,
-        "y": 12
-      },
-      "id": 183,
-      "panels": [
-        {
-          "datasource": {
-            "type": "prometheus",
-            "uid": "${DS_PROMETHEUS}"
-          },
-          "fieldConfig": {
-            "defaults": {
-              "color": {
-                "mode": "palette-classic"
-              },
-              "custom": {
-                "axisCenteredZero": false,
-                "axisColorMode": "text",
-                "axisLabel": "",
-                "axisPlacement": "auto",
-                "barAlignment": 0,
-                "drawStyle": "line",
-                "fillOpacity": 0,
-                "gradientMode": "none",
-                "hideFrom": {
-                  "legend": false,
-                  "tooltip": false,
-                  "viz": false
-                },
-                "lineInterpolation": "linear",
-                "lineWidth": 1,
-                "pointSize": 5,
-                "scaleDistribution": {
-                  "type": "linear"
-                },
-                "showPoints": "auto",
-                "spanNulls": false,
-                "stacking": {
-                  "group": "A",
-                  "mode": "none"
-                },
-                "thresholdsStyle": {
-                  "mode": "off"
-                }
-              },
-              "mappings": [],
-              "thresholds": {
-                "mode": "absolute",
-                "steps": [
-                  {
-                    "color": "green"
-                  },
-                  {
-                    "color": "red",
-                    "value": 80
-                  }
-                ]
-              }
-            },
-            "overrides": []
-          },
-          "gridPos": {
-            "h": 8,
-            "w": 12,
-            "x": 0,
-            "y": 81
-          },
-          "id": 184,
-          "options": {
-            "legend": {
-              "calcs": [],
-              "displayMode": "list",
-              "placement": "bottom",
-              "showLegend": true
-            },
-            "tooltip": {
-              "mode": "single",
-              "sort": "none"
-            }
-          },
-          "targets": [
-            {
-              "datasource": {
-                "type": "prometheus",
-                "uid": "${DS_PROMETHEUS}"
-              },
-              "editorMode": "code",
-              "expr": "metrics_OutstandingFetchCount_Count",
-              "legendFormat": "$baseLegend",
-              "range": true,
-              "refId": "A"
-            }
-          ],
-          "title": "metrics_OutstandingFetchCount_Count",
-          "type": "timeseries"
-        },
-        {
-          "datasource": {
-            "type": "prometheus",
-            "uid": "${DS_PROMETHEUS}"
-          },
-          "fieldConfig": {
-            "defaults": {
-              "color": {
-                "mode": "palette-classic"
-              },
-              "custom": {
-                "axisCenteredZero": false,
-                "axisColorMode": "text",
-                "axisLabel": "",
-                "axisPlacement": "auto",
-                "barAlignment": 0,
-                "drawStyle": "line",
-                "fillOpacity": 0,
-                "gradientMode": "none",
-                "hideFrom": {
-                  "legend": false,
-                  "tooltip": false,
-                  "viz": false
-                },
-                "lineInterpolation": "linear",
-                "lineWidth": 1,
-                "pointSize": 5,
-                "scaleDistribution": {
-                  "type": "linear"
-                },
-                "showPoints": "auto",
-                "spanNulls": false,
-                "stacking": {
-                  "group": "A",
-                  "mode": "none"
-                },
-                "thresholdsStyle": {
-                  "mode": "off"
-                }
-              },
-              "mappings": [],
-              "thresholds": {
-                "mode": "absolute",
-                "steps": [
-                  {
-                    "color": "green"
-                  },
-                  {
-                    "color": "red",
-                    "value": 80
-                  }
-                ]
-              }
-            },
-            "overrides": []
-          },
-          "gridPos": {
-            "h": 8,
-            "w": 12,
-            "x": 0,
-            "y": 81
-          },
-          "id": 185,
-          "options": {
-            "legend": {
-              "calcs": [],
-              "displayMode": "list",
-              "placement": "bottom",
-              "showLegend": true
-            },
-            "tooltip": {
-              "mode": "single",
-              "sort": "none"
-            }
-          },
-          "targets": [
-            {
-              "datasource": {
-                "type": "prometheus",
-                "uid": "${DS_PROMETHEUS}"
-              },
-              "editorMode": "code",
-              "expr": "metrics_OutstandingRpcCount_Count",
-              "legendFormat": "$baseLegend",
-              "range": true,
-              "refId": "A"
-            }
-          ],
-          "title": "metrics_OutstandingRpcCount_Count",
-          "type": "timeseries"
-        },
-        {
-          "datasource": {
-            "type": "prometheus",
-            "uid": "${DS_PROMETHEUS}"
-          },
-          "fieldConfig": {
-            "defaults": {
-              "color": {
-                "mode": "palette-classic"
-              },
-              "custom": {
-                "axisCenteredZero": false,
-                "axisColorMode": "text",
-                "axisLabel": "",
-                "axisPlacement": "auto",
-                "barAlignment": 0,
-                "drawStyle": "line",
-                "fillOpacity": 0,
-                "gradientMode": "none",
-                "hideFrom": {
-                  "legend": false,
-                  "tooltip": false,
-                  "viz": false
-                },
-                "lineInterpolation": "linear",
-                "lineWidth": 1,
-                "pointSize": 5,
-                "scaleDistribution": {
-                  "type": "linear"
-                },
-                "showPoints": "auto",
-                "spanNulls": false,
-                "stacking": {
-                  "group": "A",
-                  "mode": "none"
-                },
-                "thresholdsStyle": {
-                  "mode": "off"
-                }
-              },
-              "mappings": [],
-              "thresholds": {
-                "mode": "absolute",
-                "steps": [
-                  {
-                    "color": "green"
-                  },
-                  {
-                    "color": "red",
-                    "value": 80
-                  }
-                ]
-              }
-            },
-            "overrides": []
-          },
-          "gridPos": {
-            "h": 8,
-            "w": 12,
-            "x": 0,
-            "y": 81
-          },
-          "id": 186,
-          "options": {
-            "legend": {
-              "calcs": [],
-              "displayMode": "list",
-              "placement": "bottom",
-              "showLegend": true
-            },
-            "tooltip": {
-              "mode": "single",
-              "sort": "none"
-            }
-          },
-          "targets": [
-            {
-              "datasource": {
-                "type": "prometheus",
-                "uid": "${DS_PROMETHEUS}"
-              },
-              "editorMode": "code",
-              "expr": "metrics_OutstandingPushCount_Count",
-              "legendFormat": "$baseLegend",
-              "range": true,
-              "refId": "A"
-            }
-          ],
-          "title": "metrics_OutstandingPushCount_Count",
-          "type": "timeseries"
-        }
-      ],
-      "title": "OutstandingRequest",
-      "type": "row"
     }
   ],
   "refresh": "5s",
@@ -8472,4 +8185,4 @@
   "uid": "U_qgru_7z",
   "version": 1,
   "weekStart": ""
-}
\ No newline at end of file
+}
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index 52c68367a..50bb5cac1 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -141,7 +141,7 @@ public class TransportContext {
 
   private TransportChannelHandler createChannelHandler(
       Channel channel, BaseMessageHandler msgHandler) {
-    TransportResponseHandler responseHandler = new 
TransportResponseHandler(conf, channel, source);
+    TransportResponseHandler responseHandler = new 
TransportResponseHandler(conf, channel);
     TransportClient client = new TransportClient(channel, responseHandler);
     TransportRequestHandler requestHandler =
         new TransportRequestHandler(channel, client, msgHandler);
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index aa57b4e0b..ddace8d87 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -26,15 +26,11 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.network.protocol.*;
 import org.apache.celeborn.common.network.server.MessageHandler;
 import org.apache.celeborn.common.network.util.NettyUtils;
@@ -66,25 +62,23 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
   /** Records the time (in system nanoseconds) that the last fetch or RPC 
request was sent. */
   private final AtomicLong timeOfLastRequestNs;
 
-  private final AbstractSource source;
-
+  private final long pushTimeoutCheckerInterval;
   private static ScheduledExecutorService pushTimeoutChecker = null;
-  private ScheduledFuture<?> pushCheckerScheduleFuture;
+  private ScheduledFuture pushCheckerScheduleFuture;
 
+  private final long fetchTimeoutCheckerInterval;
   private static ScheduledExecutorService fetchTimeoutChecker = null;
-  private ScheduledFuture<?> fetchCheckerScheduleFuture;
+  private ScheduledFuture fetchCheckerScheduleFuture;
 
-  public TransportResponseHandler(
-      TransportConf conf, Channel channel, @Nullable AbstractSource source) {
+  public TransportResponseHandler(TransportConf conf, Channel channel) {
     this.conf = conf;
     this.channel = channel;
     this.outstandingFetches = JavaUtils.newConcurrentHashMap();
     this.outstandingRpcs = JavaUtils.newConcurrentHashMap();
     this.outstandingPushes = JavaUtils.newConcurrentHashMap();
     this.timeOfLastRequestNs = new AtomicLong(0);
-    this.source = source;
-    long pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
-    long fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
+    this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+    this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
 
     String module = conf.getModuleName();
     boolean checkPushTimeout = false;
@@ -116,7 +110,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     if (checkPushTimeout) {
       pushCheckerScheduleFuture =
           pushTimeoutChecker.scheduleWithFixedDelay(
-              this::failExpiredPushRequest,
+              () -> failExpiredPushRequest(),
               pushTimeoutCheckerInterval,
               pushTimeoutCheckerInterval,
               TimeUnit.MILLISECONDS);
@@ -125,13 +119,11 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     if (checkFetchTimeout) {
       fetchCheckerScheduleFuture =
           fetchTimeoutChecker.scheduleWithFixedDelay(
-              this::failExpiredFetchRequest,
+              () -> failExpiredFetchRequest(),
               fetchTimeoutCheckerInterval,
               fetchTimeoutCheckerInterval,
               TimeUnit.MILLISECONDS);
     }
-
-    registerMetrics();
   }
 
   public void failExpiredPushRequest() {
@@ -186,14 +178,6 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     }
   }
 
-  private void registerMetrics() {
-    if (source != null) {
-      source.addGauge("OutstandingFetchCount", outstandingFetches::size);
-      source.addGauge("OutstandingRpcCount", outstandingRpcs::size);
-      source.addGauge("OutstandingPushCount", outstandingPushes::size);
-    }
-  }
-
   public void addFetchRequest(StreamChunkSlice streamChunkSlice, 
FetchRequestInfo info) {
     updateTimeOfLastRequest();
     if (outstandingFetches.containsKey(streamChunkSlice)) {
@@ -283,7 +267,7 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
     if (numOutstandingRequests() > 0) {
       // show the details of outstanding Fetches
       if (logger.isDebugEnabled()) {
-        if (!outstandingFetches.isEmpty()) {
+        if (outstandingFetches.size() > 0) {
           for (Map.Entry<StreamChunkSlice, FetchRequestInfo> e : 
outstandingFetches.entrySet()) {
             StreamChunkSlice key = e.getKey();
             logger.debug("The channel is closed, but there is still 
outstanding Fetch {}", key);
@@ -471,9 +455,4 @@ public class TransportResponseHandler extends 
MessageHandler<ResponseMessage> {
           streamChunkSlice);
     }
   }
-
-  @VisibleForTesting
-  public AbstractSource source() {
-    return source;
-  }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index bfa01b428..c4409c050 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -146,39 +146,6 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     namedGauges.asScala.toList
   }
 
-  /**
-   * Gets the named gauge of the metric given the metric name.
-   *
-   * Note: This method is exposed to test the value of the gauge for metric.
-   *
-   * @param name The metric name.
-   * @return The corresponding named gauge.
-   */
-  def getGauge(name: String): NamedGauge[_] = {
-    getGauge(name, Map.empty)
-  }
-
-  /**
-   * Gets the named gauge of the metric given the metric name and labels.
-   *
-   * Note: This method is exposed to test the value of the gauge for metric.
-   *
-   * @param name The metric name.
-   * @param labels The metric labels.
-   * @return The corresponding named gauge.
-   */
-  def getGauge(name: String, labels: Map[String, String] = Map.empty): 
NamedGauge[_] = {
-    val labelString = MetricLabels.labelString(labels ++ staticLabels)
-    val iter = namedGauges.iterator()
-    while (iter.hasNext) {
-      val namedGauge = iter.next()
-      if (namedGauge.name.equals(name) && 
namedGauge.labelString.equals(labelString)) {
-        return namedGauge
-      }
-    }
-    null
-  }
-
   protected def histograms(): List[NamedHistogram] = {
     List.empty[NamedHistogram]
   }
diff --git 
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
index cc781c1da..3dd32334f 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
@@ -27,7 +27,6 @@ import io.netty.channel.local.LocalChannel;
 import org.junit.Test;
 
 import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
 import org.apache.celeborn.common.network.client.ChunkReceivedCallback;
 import org.apache.celeborn.common.network.client.RpcResponseCallback;
@@ -42,40 +41,50 @@ public class TransportResponseHandlerSuiteJ {
   @Test
   public void handleSuccessfulFetch() throws Exception {
     StreamChunkSlice streamChunkSlice = new StreamChunkSlice(1, 0);
-    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.FETCH_MODULE);
+
+    TransportResponseHandler handler =
+        new TransportResponseHandler(
+            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.FETCH_MODULE, 8),
+            new LocalChannel());
     ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
     FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 
30000, callback);
     handler.addFetchRequest(streamChunkSlice, info);
-    assertOutstandingRequests(handler, "OutstandingFetchCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new ChunkFetchSuccess(streamChunkSlice, new 
TestManagedBuffer(123)));
     verify(callback, times(1)).onSuccess(eq(0), any());
-    assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
+    assertEquals(0, handler.numOutstandingRequests());
   }
 
   @Test
   public void handleFailedFetch() throws Exception {
     StreamChunkSlice streamChunkSlice = new StreamChunkSlice(1, 0);
-    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.FETCH_MODULE);
+    TransportResponseHandler handler =
+        new TransportResponseHandler(
+            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.FETCH_MODULE, 8),
+            new LocalChannel());
     ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
     FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 
30000, callback);
     handler.addFetchRequest(streamChunkSlice, info);
-    assertOutstandingRequests(handler, "OutstandingFetchCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new ChunkFetchFailure(streamChunkSlice, "some error msg"));
     verify(callback, times(1)).onFailure(eq(0), any());
-    assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
+    assertEquals(0, handler.numOutstandingRequests());
   }
 
   @Test
   public void clearAllOutstandingRequests() throws Exception {
-    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.DATA_MODULE);
+    TransportResponseHandler handler =
+        new TransportResponseHandler(
+            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.DATA_MODULE, 8),
+            new LocalChannel());
     ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
     FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 
30000, callback);
     handler.addFetchRequest(new StreamChunkSlice(1, 0), info);
     handler.addFetchRequest(new StreamChunkSlice(1, 1), info);
     handler.addFetchRequest(new StreamChunkSlice(1, 2), info);
-    assertOutstandingRequests(handler, "OutstandingFetchCount", 3);
+    assertEquals(3, handler.numOutstandingRequests());
 
     handler.handle(new ChunkFetchSuccess(new StreamChunkSlice(1, 0), new 
TestManagedBuffer(12)));
     handler.exceptionCaught(new Exception("duh duh duhhhh"));
@@ -84,49 +93,58 @@ public class TransportResponseHandlerSuiteJ {
     verify(callback, times(1)).onSuccess(eq(0), any());
     verify(callback, times(1)).onFailure(eq(1), any());
     verify(callback, times(1)).onFailure(eq(2), any());
-    assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
+    assertEquals(0, handler.numOutstandingRequests());
   }
 
   @Test
   public void handleSuccessfulRPC() throws Exception {
-    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.RPC_MODULE);
+    TransportResponseHandler handler =
+        new TransportResponseHandler(
+            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.RPC_MODULE, 8),
+            new LocalChannel());
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
     handler.addRpcRequest(12345, callback);
-    assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     // This response should be ignored.
     handler.handle(new RpcResponse(54321, new 
NioManagedBuffer(ByteBuffer.allocate(7))));
-    assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     ByteBuffer resp = ByteBuffer.allocate(10);
     handler.handle(new RpcResponse(12345, new NioManagedBuffer(resp)));
     verify(callback, times(1)).onSuccess(eq(ByteBuffer.allocate(10)));
-    assertOutstandingRequests(handler, "OutstandingRpcCount", 0);
+    assertEquals(0, handler.numOutstandingRequests());
   }
 
   @Test
   public void handleFailedRPC() throws Exception {
-    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.RPC_MODULE);
+    TransportResponseHandler handler =
+        new TransportResponseHandler(
+            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.RPC_MODULE, 8),
+            new LocalChannel());
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
     handler.addRpcRequest(12345, callback);
-    assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new RpcFailure(54321, "uh-oh!")); // should be ignored
-    assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new RpcFailure(12345, "oh no"));
     verify(callback, times(1)).onFailure(any());
-    assertOutstandingRequests(handler, "OutstandingRpcCount", 0);
+    assertEquals(0, handler.numOutstandingRequests());
   }
 
   @Test
   public void handleSuccessfulPush() throws Exception {
-    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.DATA_MODULE);
+    TransportResponseHandler handler =
+        new TransportResponseHandler(
+            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.DATA_MODULE, 8),
+            new LocalChannel());
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
     PushRequestInfo info = new PushRequestInfo(System.currentTimeMillis() + 
30000, callback);
     info.setChannelFuture(mock(ChannelFuture.class));
     handler.addPushRequest(12345, info);
-    assertOutstandingRequests(handler, "OutstandingPushCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     // This response should be ignored.
     handler.handle(new RpcResponse(54321, new 
NioManagedBuffer(ByteBuffer.allocate(7))));
@@ -135,42 +153,26 @@ public class TransportResponseHandlerSuiteJ {
     ByteBuffer resp = ByteBuffer.allocate(10);
     handler.handle(new RpcResponse(12345, new NioManagedBuffer(resp)));
     verify(callback, times(1)).onSuccess(eq(ByteBuffer.allocate(10)));
-    assertOutstandingRequests(handler, "OutstandingPushCount", 0);
+    assertEquals(0, handler.numOutstandingRequests());
   }
 
   @Test
   public void handleFailedPush() throws Exception {
-    TransportResponseHandler handler = 
createResponseHandler(TransportModuleConstants.DATA_MODULE);
+    TransportResponseHandler handler =
+        new TransportResponseHandler(
+            Utils.fromCelebornConf(new CelebornConf(), 
TransportModuleConstants.DATA_MODULE, 8),
+            new LocalChannel());
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
     PushRequestInfo info = new PushRequestInfo(System.currentTimeMillis() + 
30000L, callback);
     info.setChannelFuture(mock(ChannelFuture.class));
     handler.addPushRequest(12345, info);
-    assertOutstandingRequests(handler, "OutstandingPushCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new RpcFailure(54321, "uh-oh!")); // should be ignored
-    assertOutstandingRequests(handler, "OutstandingPushCount", 1);
+    assertEquals(1, handler.numOutstandingRequests());
 
     handler.handle(new RpcFailure(12345, "oh no"));
     verify(callback, times(1)).onFailure(any());
-    assertOutstandingRequests(handler, "OutstandingPushCount", 0);
-  }
-
-  private TransportResponseHandler createResponseHandler(String module) {
-    CelebornConf celebornConf = new CelebornConf();
-    return new TransportResponseHandler(
-        Utils.fromCelebornConf(celebornConf, module, 8),
-        new LocalChannel(),
-        new AbstractSource(celebornConf, "Worker") {
-          @Override
-          public String sourceName() {
-            return "worker";
-          }
-        });
-  }
-
-  private void assertOutstandingRequests(
-      TransportResponseHandler handler, String name, int expected) {
-    assertEquals(expected, handler.numOutstandingRequests());
-    assertEquals(expected, handler.source().getGauge(name).gauge().getValue());
+    assertEquals(0, handler.numOutstandingRequests());
   }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 228d47a3d..e5745ee56 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -198,12 +198,6 @@ These metrics are exposed by Celeborn worker.
         - The active shuffle size of a worker including master replica and 
slave replica.
     - ActiveShuffleFileCount
         - The active shuffle file count of a worker including master replica 
and slave replica.
-    - OutstandingFetchCount
-        - The count of outstanding fetch request.
-    - OutstandingRpcCount
-        - The count of outstanding rpc request.
-    - OutstandingPushCount
-        - The count of outstanding push request.
     - push_server_usedHeapMemory 
     - push_server_usedDirectMemory
     - push_server_numAllocations 

Reply via email to