This is an automated email from the ASF dual-hosted git repository.
fchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 349ee8b1c Revert "[CELEBORN-255] Add counter of outstandingFetches, outstandingRpcs and outstandingPushes to metrics"
349ee8b1c is described below
commit 349ee8b1cb7e0db4e26dafbed4ff6d5e8df60fc1
Author: Fu Chen <[email protected]>
AuthorDate: Tue Oct 24 17:18:54 2023 +0800
Revert "[CELEBORN-255] Add counter of outstandingFetches, outstandingRpcs and outstandingPushes to metrics"
This reverts commit bfa341c32f362b64dd69b84fc54934cf8224200c.
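### What changes were proposed in this pull request?
Revert the OutstandingFetchCount, OutstandingRpcCount and OutstandingPushCount gauges added in bfa341c32f362b64dd69b84fc54934cf8224200c. The revert also removes their Grafana panels, their documentation, and the test-only `AbstractSource.getGauge` helper.
### Why are the changes needed?
https://github.com/apache/incubator-celeborn/pull/1992#issuecomment-1776760369
### Does this PR introduce _any_ user-facing change?
Yes. The three gauges are no longer reported, and the "OutstandingRequest" row is removed from the Grafana dashboard.
### How was this patch tested?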
Closes #2032 from cfmcgrady/revert-pr-1992.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
---
METRICS.md | 7 +-
assets/grafana/celeborn-dashboard.json | 289 +--------------------
.../celeborn/common/network/TransportContext.java | 2 +-
.../network/client/TransportResponseHandler.java | 41 +--
.../common/metrics/source/AbstractSource.scala | 33 ---
.../network/TransportResponseHandlerSuiteJ.java | 90 +++----
docs/monitoring.md | 6 -
7 files changed, 60 insertions(+), 408 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index 42a40fe29..9db43e1c7 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -76,8 +76,8 @@ Here is an example of grafana dashboard importing.
| FlushDataTime | worker |
FlushData means flush a disk buffer to disk.
|
| OpenStreamTime | worker |
OpenStream means read a shuffle file and send client about chunks size and
stream index. |
| FetchChunkTime | worker |
FetchChunk means read a chunk from a shuffle file and send to client.
|
-| PrimaryPushDataTime | worker |
PrimaryPushData means handle pushdata of primary partition location.
|
-| ReplicaPushDataTime | worker |
ReplicaPushData means handle pushdata of replica partition location.
|
+| PrimaryPushDataTime | worker |
PrimaryPushData means handle pushdata of primary partition location.
|
+| ReplicaPushDataTime | worker |
ReplicaPushData means handle pushdata of replica partition location.
|
| WriteDataFailCount | worker |
The count of writing PushData or PushMergedData failed in current worker.
|
| ReplicateDataFailCount | worker |
The count of replicating PushData or PushMergedData failed in current worker.
|
| ReplicateDataWriteFailCount | worker | The count
of replicating PushData or PushMergedData failed caused by write failure in
peer worker. |
@@ -97,9 +97,6 @@ Here is an example of grafana dashboard importing.
| PausePushDataAndReplicate | worker |
PausePushDataAndReplicate means the count of worker stopped receiving data from
client and other workers. |
| ActiveShuffleSize | worker |
The active shuffle size of a worker including master replica and slave replica.
|
| ActiveShuffleFileCount | worker |
The active shuffle file count of a worker including master replica and slave
replica. |
-| OutstandingFetchCount | worker |
The count of outstanding fetch request received in peer worker.
|
-| OutstandingRpcCount | worker |
The count of outstanding rpc request received in peer worker.
|
-| OutstandingPushCount | worker |
The count of outstanding push request received in peer worker.
|
## Implementation
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 760cfd7fd..1f534335a 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -8140,293 +8140,6 @@
],
"title": "UserConsumption",
"type": "row"
- },
- {
- "collapsed": true,
- "gridPos": {
- "h": 1,
- "w": 24,
- "x": 0,
- "y": 12
- },
- "id": 183,
- "panels": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "palette-classic"
- },
- "custom": {
- "axisCenteredZero": false,
- "axisColorMode": "text",
- "axisLabel": "",
- "axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
- "gradientMode": "none",
- "hideFrom": {
- "legend": false,
- "tooltip": false,
- "viz": false
- },
- "lineInterpolation": "linear",
- "lineWidth": 1,
- "pointSize": 5,
- "scaleDistribution": {
- "type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
- }
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green"
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- }
- },
- "overrides": []
- },
- "gridPos": {
- "h": 8,
- "w": 12,
- "x": 0,
- "y": 81
- },
- "id": 184,
- "options": {
- "legend": {
- "calcs": [],
- "displayMode": "list",
- "placement": "bottom",
- "showLegend": true
- },
- "tooltip": {
- "mode": "single",
- "sort": "none"
- }
- },
- "targets": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
- "editorMode": "code",
- "expr": "metrics_OutstandingFetchCount_Count",
- "legendFormat": "$baseLegend",
- "range": true,
- "refId": "A"
- }
- ],
- "title": "metrics_OutstandingFetchCount_Count",
- "type": "timeseries"
- },
- {
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "palette-classic"
- },
- "custom": {
- "axisCenteredZero": false,
- "axisColorMode": "text",
- "axisLabel": "",
- "axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
- "gradientMode": "none",
- "hideFrom": {
- "legend": false,
- "tooltip": false,
- "viz": false
- },
- "lineInterpolation": "linear",
- "lineWidth": 1,
- "pointSize": 5,
- "scaleDistribution": {
- "type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
- }
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green"
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- }
- },
- "overrides": []
- },
- "gridPos": {
- "h": 8,
- "w": 12,
- "x": 0,
- "y": 81
- },
- "id": 185,
- "options": {
- "legend": {
- "calcs": [],
- "displayMode": "list",
- "placement": "bottom",
- "showLegend": true
- },
- "tooltip": {
- "mode": "single",
- "sort": "none"
- }
- },
- "targets": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
- "editorMode": "code",
- "expr": "metrics_OutstandingRpcCount_Count",
- "legendFormat": "$baseLegend",
- "range": true,
- "refId": "A"
- }
- ],
- "title": "metrics_OutstandingRpcCount_Count",
- "type": "timeseries"
- },
- {
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "palette-classic"
- },
- "custom": {
- "axisCenteredZero": false,
- "axisColorMode": "text",
- "axisLabel": "",
- "axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
- "gradientMode": "none",
- "hideFrom": {
- "legend": false,
- "tooltip": false,
- "viz": false
- },
- "lineInterpolation": "linear",
- "lineWidth": 1,
- "pointSize": 5,
- "scaleDistribution": {
- "type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
- }
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green"
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- }
- },
- "overrides": []
- },
- "gridPos": {
- "h": 8,
- "w": 12,
- "x": 0,
- "y": 81
- },
- "id": 186,
- "options": {
- "legend": {
- "calcs": [],
- "displayMode": "list",
- "placement": "bottom",
- "showLegend": true
- },
- "tooltip": {
- "mode": "single",
- "sort": "none"
- }
- },
- "targets": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "${DS_PROMETHEUS}"
- },
- "editorMode": "code",
- "expr": "metrics_OutstandingPushCount_Count",
- "legendFormat": "$baseLegend",
- "range": true,
- "refId": "A"
- }
- ],
- "title": "metrics_OutstandingPushCount_Count",
- "type": "timeseries"
- }
- ],
- "title": "OutstandingRequest",
- "type": "row"
}
],
"refresh": "5s",
@@ -8472,4 +8185,4 @@
"uid": "U_qgru_7z",
"version": 1,
"weekStart": ""
-}
\ No newline at end of file
+}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
index 52c68367a..50bb5cac1 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/TransportContext.java
@@ -141,7 +141,7 @@ public class TransportContext {
private TransportChannelHandler createChannelHandler(
Channel channel, BaseMessageHandler msgHandler) {
- TransportResponseHandler responseHandler = new
TransportResponseHandler(conf, channel, source);
+ TransportResponseHandler responseHandler = new
TransportResponseHandler(conf, channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler =
new TransportRequestHandler(channel, client, msgHandler);
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
index aa57b4e0b..ddace8d87 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java
@@ -26,15 +26,11 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.protocol.*;
import org.apache.celeborn.common.network.server.MessageHandler;
import org.apache.celeborn.common.network.util.NettyUtils;
@@ -66,25 +62,23 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
/** Records the time (in system nanoseconds) that the last fetch or RPC
request was sent. */
private final AtomicLong timeOfLastRequestNs;
- private final AbstractSource source;
-
+ private final long pushTimeoutCheckerInterval;
private static ScheduledExecutorService pushTimeoutChecker = null;
- private ScheduledFuture<?> pushCheckerScheduleFuture;
+ private ScheduledFuture pushCheckerScheduleFuture;
+ private final long fetchTimeoutCheckerInterval;
private static ScheduledExecutorService fetchTimeoutChecker = null;
- private ScheduledFuture<?> fetchCheckerScheduleFuture;
+ private ScheduledFuture fetchCheckerScheduleFuture;
- public TransportResponseHandler(
- TransportConf conf, Channel channel, @Nullable AbstractSource source) {
+ public TransportResponseHandler(TransportConf conf, Channel channel) {
this.conf = conf;
this.channel = channel;
this.outstandingFetches = JavaUtils.newConcurrentHashMap();
this.outstandingRpcs = JavaUtils.newConcurrentHashMap();
this.outstandingPushes = JavaUtils.newConcurrentHashMap();
this.timeOfLastRequestNs = new AtomicLong(0);
- this.source = source;
- long pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
- long fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
+ this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
+ this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
String module = conf.getModuleName();
boolean checkPushTimeout = false;
@@ -116,7 +110,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
if (checkPushTimeout) {
pushCheckerScheduleFuture =
pushTimeoutChecker.scheduleWithFixedDelay(
- this::failExpiredPushRequest,
+ () -> failExpiredPushRequest(),
pushTimeoutCheckerInterval,
pushTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
@@ -125,13 +119,11 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
if (checkFetchTimeout) {
fetchCheckerScheduleFuture =
fetchTimeoutChecker.scheduleWithFixedDelay(
- this::failExpiredFetchRequest,
+ () -> failExpiredFetchRequest(),
fetchTimeoutCheckerInterval,
fetchTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
}
-
- registerMetrics();
}
public void failExpiredPushRequest() {
@@ -186,14 +178,6 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
}
}
- private void registerMetrics() {
- if (source != null) {
- source.addGauge("OutstandingFetchCount", outstandingFetches::size);
- source.addGauge("OutstandingRpcCount", outstandingRpcs::size);
- source.addGauge("OutstandingPushCount", outstandingPushes::size);
- }
- }
-
public void addFetchRequest(StreamChunkSlice streamChunkSlice,
FetchRequestInfo info) {
updateTimeOfLastRequest();
if (outstandingFetches.containsKey(streamChunkSlice)) {
@@ -283,7 +267,7 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
if (numOutstandingRequests() > 0) {
// show the details of outstanding Fetches
if (logger.isDebugEnabled()) {
- if (!outstandingFetches.isEmpty()) {
+ if (outstandingFetches.size() > 0) {
for (Map.Entry<StreamChunkSlice, FetchRequestInfo> e :
outstandingFetches.entrySet()) {
StreamChunkSlice key = e.getKey();
logger.debug("The channel is closed, but there is still
outstanding Fetch {}", key);
@@ -471,9 +455,4 @@ public class TransportResponseHandler extends
MessageHandler<ResponseMessage> {
streamChunkSlice);
}
}
-
- @VisibleForTesting
- public AbstractSource source() {
- return source;
- }
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index bfa01b428..c4409c050 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -146,39 +146,6 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
namedGauges.asScala.toList
}
- /**
- * Gets the named gauge of the metric given the metric name.
- *
- * Note: This method is exposed to test the value of the gauge for metric.
- *
- * @param name The metric name.
- * @return The corresponding named gauge.
- */
- def getGauge(name: String): NamedGauge[_] = {
- getGauge(name, Map.empty)
- }
-
- /**
- * Gets the named gauge of the metric given the metric name and labels.
- *
- * Note: This method is exposed to test the value of the gauge for metric.
- *
- * @param name The metric name.
- * @param labels The metric labels.
- * @return The corresponding named gauge.
- */
- def getGauge(name: String, labels: Map[String, String] = Map.empty):
NamedGauge[_] = {
- val labelString = MetricLabels.labelString(labels ++ staticLabels)
- val iter = namedGauges.iterator()
- while (iter.hasNext) {
- val namedGauge = iter.next()
- if (namedGauge.name.equals(name) &&
namedGauge.labelString.equals(labelString)) {
- return namedGauge
- }
- }
- null
- }
-
protected def histograms(): List[NamedHistogram] = {
List.empty[NamedHistogram]
}
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
index cc781c1da..3dd32334f 100644
---
a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java
@@ -27,7 +27,6 @@ import io.netty.channel.local.LocalChannel;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
-import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
import org.apache.celeborn.common.network.client.ChunkReceivedCallback;
import org.apache.celeborn.common.network.client.RpcResponseCallback;
@@ -42,40 +41,50 @@ public class TransportResponseHandlerSuiteJ {
@Test
public void handleSuccessfulFetch() throws Exception {
StreamChunkSlice streamChunkSlice = new StreamChunkSlice(1, 0);
- TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.FETCH_MODULE);
+
+ TransportResponseHandler handler =
+ new TransportResponseHandler(
+ Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.FETCH_MODULE, 8),
+ new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
handler.addFetchRequest(streamChunkSlice, info);
- assertOutstandingRequests(handler, "OutstandingFetchCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
handler.handle(new ChunkFetchSuccess(streamChunkSlice, new
TestManagedBuffer(123)));
verify(callback, times(1)).onSuccess(eq(0), any());
- assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
+ assertEquals(0, handler.numOutstandingRequests());
}
@Test
public void handleFailedFetch() throws Exception {
StreamChunkSlice streamChunkSlice = new StreamChunkSlice(1, 0);
- TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.FETCH_MODULE);
+ TransportResponseHandler handler =
+ new TransportResponseHandler(
+ Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.FETCH_MODULE, 8),
+ new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
handler.addFetchRequest(streamChunkSlice, info);
- assertOutstandingRequests(handler, "OutstandingFetchCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
handler.handle(new ChunkFetchFailure(streamChunkSlice, "some error msg"));
verify(callback, times(1)).onFailure(eq(0), any());
- assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
+ assertEquals(0, handler.numOutstandingRequests());
}
@Test
public void clearAllOutstandingRequests() throws Exception {
- TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.DATA_MODULE);
+ TransportResponseHandler handler =
+ new TransportResponseHandler(
+ Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.DATA_MODULE, 8),
+ new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() +
30000, callback);
handler.addFetchRequest(new StreamChunkSlice(1, 0), info);
handler.addFetchRequest(new StreamChunkSlice(1, 1), info);
handler.addFetchRequest(new StreamChunkSlice(1, 2), info);
- assertOutstandingRequests(handler, "OutstandingFetchCount", 3);
+ assertEquals(3, handler.numOutstandingRequests());
handler.handle(new ChunkFetchSuccess(new StreamChunkSlice(1, 0), new
TestManagedBuffer(12)));
handler.exceptionCaught(new Exception("duh duh duhhhh"));
@@ -84,49 +93,58 @@ public class TransportResponseHandlerSuiteJ {
verify(callback, times(1)).onSuccess(eq(0), any());
verify(callback, times(1)).onFailure(eq(1), any());
verify(callback, times(1)).onFailure(eq(2), any());
- assertOutstandingRequests(handler, "OutstandingFetchCount", 0);
+ assertEquals(0, handler.numOutstandingRequests());
}
@Test
public void handleSuccessfulRPC() throws Exception {
- TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.RPC_MODULE);
+ TransportResponseHandler handler =
+ new TransportResponseHandler(
+ Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.RPC_MODULE, 8),
+ new LocalChannel());
RpcResponseCallback callback = mock(RpcResponseCallback.class);
handler.addRpcRequest(12345, callback);
- assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
// This response should be ignored.
handler.handle(new RpcResponse(54321, new
NioManagedBuffer(ByteBuffer.allocate(7))));
- assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
ByteBuffer resp = ByteBuffer.allocate(10);
handler.handle(new RpcResponse(12345, new NioManagedBuffer(resp)));
verify(callback, times(1)).onSuccess(eq(ByteBuffer.allocate(10)));
- assertOutstandingRequests(handler, "OutstandingRpcCount", 0);
+ assertEquals(0, handler.numOutstandingRequests());
}
@Test
public void handleFailedRPC() throws Exception {
- TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.RPC_MODULE);
+ TransportResponseHandler handler =
+ new TransportResponseHandler(
+ Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.RPC_MODULE, 8),
+ new LocalChannel());
RpcResponseCallback callback = mock(RpcResponseCallback.class);
handler.addRpcRequest(12345, callback);
- assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
handler.handle(new RpcFailure(54321, "uh-oh!")); // should be ignored
- assertOutstandingRequests(handler, "OutstandingRpcCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
handler.handle(new RpcFailure(12345, "oh no"));
verify(callback, times(1)).onFailure(any());
- assertOutstandingRequests(handler, "OutstandingRpcCount", 0);
+ assertEquals(0, handler.numOutstandingRequests());
}
@Test
public void handleSuccessfulPush() throws Exception {
- TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.DATA_MODULE);
+ TransportResponseHandler handler =
+ new TransportResponseHandler(
+ Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.DATA_MODULE, 8),
+ new LocalChannel());
RpcResponseCallback callback = mock(RpcResponseCallback.class);
PushRequestInfo info = new PushRequestInfo(System.currentTimeMillis() +
30000, callback);
info.setChannelFuture(mock(ChannelFuture.class));
handler.addPushRequest(12345, info);
- assertOutstandingRequests(handler, "OutstandingPushCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
// This response should be ignored.
handler.handle(new RpcResponse(54321, new
NioManagedBuffer(ByteBuffer.allocate(7))));
@@ -135,42 +153,26 @@ public class TransportResponseHandlerSuiteJ {
ByteBuffer resp = ByteBuffer.allocate(10);
handler.handle(new RpcResponse(12345, new NioManagedBuffer(resp)));
verify(callback, times(1)).onSuccess(eq(ByteBuffer.allocate(10)));
- assertOutstandingRequests(handler, "OutstandingPushCount", 0);
+ assertEquals(0, handler.numOutstandingRequests());
}
@Test
public void handleFailedPush() throws Exception {
- TransportResponseHandler handler =
createResponseHandler(TransportModuleConstants.DATA_MODULE);
+ TransportResponseHandler handler =
+ new TransportResponseHandler(
+ Utils.fromCelebornConf(new CelebornConf(),
TransportModuleConstants.DATA_MODULE, 8),
+ new LocalChannel());
RpcResponseCallback callback = mock(RpcResponseCallback.class);
PushRequestInfo info = new PushRequestInfo(System.currentTimeMillis() +
30000L, callback);
info.setChannelFuture(mock(ChannelFuture.class));
handler.addPushRequest(12345, info);
- assertOutstandingRequests(handler, "OutstandingPushCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
handler.handle(new RpcFailure(54321, "uh-oh!")); // should be ignored
- assertOutstandingRequests(handler, "OutstandingPushCount", 1);
+ assertEquals(1, handler.numOutstandingRequests());
handler.handle(new RpcFailure(12345, "oh no"));
verify(callback, times(1)).onFailure(any());
- assertOutstandingRequests(handler, "OutstandingPushCount", 0);
- }
-
- private TransportResponseHandler createResponseHandler(String module) {
- CelebornConf celebornConf = new CelebornConf();
- return new TransportResponseHandler(
- Utils.fromCelebornConf(celebornConf, module, 8),
- new LocalChannel(),
- new AbstractSource(celebornConf, "Worker") {
- @Override
- public String sourceName() {
- return "worker";
- }
- });
- }
-
- private void assertOutstandingRequests(
- TransportResponseHandler handler, String name, int expected) {
- assertEquals(expected, handler.numOutstandingRequests());
- assertEquals(expected, handler.source().getGauge(name).gauge().getValue());
+ assertEquals(0, handler.numOutstandingRequests());
}
}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 228d47a3d..e5745ee56 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -198,12 +198,6 @@ These metrics are exposed by Celeborn worker.
- The active shuffle size of a worker including master replica and
slave replica.
- ActiveShuffleFileCount
- The active shuffle file count of a worker including master replica
and slave replica.
- - OutstandingFetchCount
- - The count of outstanding fetch request.
- - OutstandingRpcCount
- - The count of outstanding rpc request.
- - OutstandingPushCount
- - The count of outstanding push request.
- push_server_usedHeapMemory
- push_server_usedDirectMemory
- push_server_numAllocations