This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new cbf4f6f1e [#1596] fix(netty): Use a ChannelFutureListener callback
mechanism to release readMemory (#1605)
cbf4f6f1e is described below
commit cbf4f6f1e2e0d7b0b5e03b169f62c602fac30278
Author: RickyMa <[email protected]>
AuthorDate: Thu Mar 28 14:35:56 2024 +0800
[#1596] fix(netty): Use a ChannelFutureListener callback mechanism to
release readMemory (#1605)
### What changes were proposed in this pull request?
1. Add a `ChannelFutureListener` and use its callback mechanism to release
`readMemory` only after the `writeAndFlush` method is truly completed.
2. Change the descriptions of configurations
`rss.server.buffer.capacity.ratio` and `rss.server.read.buffer.capacity.ratio`.
### Why are the changes needed?
This is actually a bug, which was introduced by PR
https://github.com/apache/incubator-uniffle/pull/879. The issue has been
present since the very beginning when the Netty feature was first integrated.
Fix https://github.com/apache/incubator-uniffle/issues/1596.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I don't think we need new tests. Tested in our env.
The new log will be:
```
[2024-03-26 23:11:51.039] [epollEventLoopGroup-3-158] [INFO]
ShuffleServerNettyHandler.operationComplete - Successfully executed
getLocalShuffleData for appId[application_1703049085550_7359933_1711463990606],
shuffleId[0], partitionId[1328], offset[0], length[14693742]. Took 1457 ms and
retrieved 14693742 bytes of data
[2024-03-26 23:11:51.040] [epollEventLoopGroup-3-130] [INFO]
ShuffleServerNettyHandler.operationComplete - Successfully executed
getMemoryShuffleData for
appId[application_1703049085550_7359933_1711463990606], shuffleId[0],
partitionId[1262]. Took 1 ms and retrieved 0 bytes of data
[2024-03-26 23:11:51.068] [epollEventLoopGroup-3-177] [INFO]
ShuffleServerNettyHandler.operationComplete - Successfully executed
getLocalShuffleIndex for
appId[application_1703049085550_7359933_1711463990606], shuffleId[0],
partitionId[1366]. Took 918 ms and retrieved 1653600 bytes of data
```
---
.../netty/protocol/GetLocalShuffleDataRequest.java | 5 +
.../protocol/GetLocalShuffleIndexRequest.java | 5 +
.../protocol/GetMemoryShuffleDataRequest.java | 5 +
.../common/netty/protocol/RequestMessage.java | 2 +
.../netty/protocol/SendShuffleDataRequest.java | 5 +
docs/server_guide.md | 6 +-
.../apache/uniffle/server/ShuffleServerConf.java | 4 +-
.../server/buffer/ShuffleBufferManager.java | 7 +-
.../server/netty/ShuffleServerNettyHandler.java | 167 ++++++++++++++++-----
9 files changed, 160 insertions(+), 46 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
index d80d0aa6d..b96c028fb 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataRequest.java
@@ -131,4 +131,9 @@ public class GetLocalShuffleDataRequest extends
RequestMessage {
public long getTimestamp() {
return timestamp;
}
+
+ @Override
+ public String getOperationType() {
+ return "getLocalShuffleData";
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
index 1ccdfae10..105fea051 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleIndexRequest.java
@@ -93,4 +93,9 @@ public class GetLocalShuffleIndexRequest extends
RequestMessage {
public int getPartitionNum() {
return partitionNum;
}
+
+ @Override
+ public String getOperationType() {
+ return "getLocalShuffleIndex";
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
index d358cf7cd..13a241241 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetMemoryShuffleDataRequest.java
@@ -148,4 +148,9 @@ public class GetMemoryShuffleDataRequest extends
RequestMessage {
public Roaring64NavigableMap getExpectedTaskIdsBitmap() {
return expectedTaskIdsBitmap;
}
+
+ @Override
+ public String getOperationType() {
+ return "getMemoryShuffleData";
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
index cfa55287c..946f906cc 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/RequestMessage.java
@@ -35,4 +35,6 @@ public abstract class RequestMessage extends Message {
public long getRequestId() {
return requestId;
}
+
+ public abstract String getOperationType();
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
index 492b5b64b..a77b0d3c7 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/protocol/SendShuffleDataRequest.java
@@ -145,4 +145,9 @@ public class SendShuffleDataRequest extends RequestMessage {
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
+
+ @Override
+ public String getOperationType() {
+ return "sendShuffleData";
+ }
}
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 25224d6d6..efe0856a9 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -83,11 +83,11 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| rss.server.netty.receive.buf | 0
| Receive buffer size
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth =
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system
automatically estimates the receive buffer size based on default settings.
[...]
| rss.server.netty.send.buf | 0
| Send buffer size
(SO_SNDBUF).
[...]
| rss.server.buffer.capacity | -1
| Max memory of buffer
manager for shuffle server. If negative, JVM heap size * buffer.ratio is used
[...]
-| rss.server.buffer.capacity.ratio | 0.8
| when
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size *
ratio
[...]
+| rss.server.buffer.capacity.ratio | 0.8
| when
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or
off-heap size(when enabling Netty) * ratio
[...]
| rss.server.memory.shuffle.highWaterMark.percentage | 75.0
| Threshold of spill data
to storage, percentage of rss.server.buffer.capacity
[...]
| rss.server.memory.shuffle.lowWaterMark.percentage | 25.0
| Threshold of keep data in
memory, percentage of rss.server.buffer.capacity
[...]
| rss.server.read.buffer.capacity | -1
| Max size of buffer for
reading data. If negative, JVM heap size * read.buffer.ratio is used
[...]
-| rss.server.read.buffer.capacity.ratio | 0.4
| when
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap
size * ratio
[...]
+| rss.server.read.buffer.capacity.ratio | 0.4
| when
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap
size or off-heap size(when enabling Netty) * ratio
[...]
| rss.server.heartbeat.interval | 10000
| Heartbeat interval to
Coordinator (ms)
[...]
| rss.server.flush.localfile.threadPool.size | 10
| Thread pool for flush
data to local file
[...]
| rss.server.flush.hadoop.threadPool.size | 60
| Thread pool for flush
data to hadoop storage
[...]
@@ -104,7 +104,7 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| rss.server.max.concurrency.of.per-partition.write | 30
| The max concurrency of
single partition writer, the data partition file number is equal to this value.
Default value is 1. This config could improve the writing speed, especially for
huge partition.
[...]
| rss.server.max.concurrency.limit.of.per-partition.write | -
| The limit for max
concurrency per-partition write specified by client, this won't be enabled by
default.
[...]
| rss.metrics.reporter.class | -
| The class of metrics
reporter.
[...]
-| rss.server.hybrid.storage.manager.selector.class |
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is
`DefaultStorageManagerSelector`, and another
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's
data to cold storage.
[...]
+| rss.server.hybrid.storage.manager.selector.class |
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is
`DefaultStorageManagerSelector`, and another
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's
data to cold storage.
[...]
| rss.server.disk-capacity.watermark.check.enabled | false
| If it is co-located with
other services, the high-low watermark check based on the uniffle used is not
correct. Due to this, the whole disk capacity watermark check is necessary,
which will reuse the current watermark value. It will be disabled by default.
[...]
### Advanced Configurations
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index e72510eae..9ea2e84f2 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -42,7 +42,7 @@ public class ShuffleServerConf extends RssBaseConf {
.doubleType()
.defaultValue(0.6)
.withDescription(
- "JVM heap size * ratio for the maximum memory of buffer manager
for shuffle server, this "
+ "JVM heap size or off-heap size(when enabling Netty) * ratio for
the maximum memory of buffer manager for shuffle server, this "
+ "is only effective when `rss.server.buffer.capacity` is
not explicitly set");
public static final ConfigOption<Long> SERVER_READ_BUFFER_CAPACITY =
@@ -56,7 +56,7 @@ public class ShuffleServerConf extends RssBaseConf {
.doubleType()
.defaultValue(0.2)
.withDescription(
- "JVM heap size * ratio for read buffer size, this is only
effective when "
+ "JVM heap size or off-heap size(when enabling Netty) * ratio for
read buffer size, this is only effective when "
+ "`rss.server.reader.buffer.capacity.ratio` is not
explicitly set");
public static final ConfigOption<Long> SERVER_HEARTBEAT_DELAY =
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index ceca59211..4d42b0576 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -98,7 +98,12 @@ public class ShuffleBufferManager {
this.readCapacity =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
if (this.readCapacity < 0) {
this.readCapacity =
- (long) (heapSize *
conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
+ nettyServerEnabled
+ ? (long)
+ (NettyUtils.getMaxDirectMemory()
+ *
conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO))
+ : (long)
+ (heapSize *
conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
}
LOG.info(
"Init shuffle buffer manager with capacity: {}, read buffer capacity:
{}.",
diff --git
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ac8973ecc..2e0c070e9 100644
---
a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -24,6 +24,8 @@ import java.util.Map;
import com.google.common.collect.Lists;
import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -253,7 +255,7 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
.recordTransportTime(GetMemoryShuffleDataRequest.class.getName(),
transportTime);
}
}
- long start = System.currentTimeMillis();
+ final long start = System.currentTimeMillis();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetMemoryShuffleDataResponse response;
@@ -262,8 +264,9 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
// todo: if can get the exact memory size?
if
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize))
{
+ ShuffleDataResult shuffleDataResult = null;
try {
- ShuffleDataResult shuffleDataResult =
+ shuffleDataResult =
shuffleServer
.getShuffleTaskManager()
.getInMemoryShuffleData(
@@ -281,19 +284,18 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size());
}
- long costTime = System.currentTimeMillis() - start;
- shuffleServer
- .getNettyMetrics()
- .recordProcessTime(GetMemoryShuffleDataRequest.class.getName(),
costTime);
- LOG.info(
- "Successfully getInMemoryShuffleData cost {} ms with {} bytes
shuffle" + " data for {}",
- costTime,
- data.size(),
- requestInfo);
-
response =
new GetMemoryShuffleDataResponse(req.getRequestId(), status, msg,
bufferSegments, data);
+ ReleaseMemoryAndRecordReadTimeListener listener =
+ new ReleaseMemoryAndRecordReadTimeListener(
+ start, readBufferSize, data.size(), requestInfo, req, client);
+ client.getChannel().writeAndFlush(response).addListener(listener);
+ return;
} catch (Exception e) {
+
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
+ if (shuffleDataResult != null) {
+ shuffleDataResult.release();
+ }
status = StatusCode.INTERNAL_ERROR;
msg =
"Error happened when get in memory shuffle data for "
@@ -304,8 +306,6 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
response =
new GetMemoryShuffleDataResponse(
req.getRequestId(), status, msg, Lists.newArrayList(),
Unpooled.EMPTY_BUFFER);
- } finally {
-
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
}
} else {
status = StatusCode.INTERNAL_ERROR;
@@ -348,9 +348,10 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
.getShuffleServerConf()
.getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);
if
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(assumedFileSize))
{
+ ShuffleIndexResult shuffleIndexResult = null;
try {
final long start = System.currentTimeMillis();
- ShuffleIndexResult shuffleIndexResult =
+ shuffleIndexResult =
shuffleServer
.getShuffleTaskManager()
.getShuffleIndex(appId, shuffleId, partitionId,
partitionNumPerRange, partitionNum);
@@ -361,13 +362,16 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
response =
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, data,
shuffleIndexResult.getDataFileLen());
- long readTime = System.currentTimeMillis() - start;
- LOG.info(
- "Successfully getShuffleIndex cost {} ms for {}" + " bytes with
{}",
- readTime,
- data.size(),
- requestInfo);
+ ReleaseMemoryAndRecordReadTimeListener listener =
+ new ReleaseMemoryAndRecordReadTimeListener(
+ start, assumedFileSize, data.size(), requestInfo, req, client);
+ client.getChannel().writeAndFlush(response).addListener(listener);
+ return;
} catch (FileNotFoundException indexFileNotFoundException) {
+
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
+ if (shuffleIndexResult != null) {
+ shuffleIndexResult.release();
+ }
LOG.warn(
"Index file for {} is not found, maybe the data has been flushed
to cold storage.",
requestInfo,
@@ -376,14 +380,16 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
} catch (Exception e) {
+
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
+ if (shuffleIndexResult != null) {
+ shuffleIndexResult.release();
+ }
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle index for " + requestInfo + ",
" + e.getMessage();
LOG.error(msg, e);
response =
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
- } finally {
-
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
}
} else {
status = StatusCode.INTERNAL_ERROR;
@@ -418,7 +424,6 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetLocalShuffleDataResponse response;
- ShuffleDataResult sdr;
String requestInfo =
"appId["
+ appId
@@ -426,11 +431,9 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
+ shuffleId
+ "], partitionId["
+ partitionId
- + "]"
- + "offset["
+ + "], offset["
+ offset
- + "]"
- + "length["
+ + "], length["
+ length
+ "]";
@@ -445,8 +448,9 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
if
(shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) {
+ ShuffleDataResult sdr = null;
try {
- long start = System.currentTimeMillis();
+ final long start = System.currentTimeMillis();
sdr =
shuffleServer
.getShuffleTaskManager()
@@ -459,29 +463,27 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
storageType,
offset,
length);
- long readTime = System.currentTimeMillis() - start;
- ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
- shuffleServer
- .getNettyMetrics()
- .recordProcessTime(GetLocalShuffleDataRequest.class.getName(),
readTime);
- LOG.info(
- "Successfully getShuffleData cost {} ms for shuffle" + " data with
{}",
- readTime,
- requestInfo);
response =
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, sdr.getManagedBuffer());
+ ReleaseMemoryAndRecordReadTimeListener listener =
+ new ReleaseMemoryAndRecordReadTimeListener(
+ start, length, sdr.getDataLength(), requestInfo, req, client);
+ client.getChannel().writeAndFlush(response).addListener(listener);
+ return;
} catch (Exception e) {
+ shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
+ if (sdr != null) {
+ sdr.release();
+ }
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle data for " + requestInfo + ", "
+ e.getMessage();
LOG.error(msg, e);
response =
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, new
NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
- } finally {
- shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
}
} else {
status = StatusCode.INTERNAL_ERROR;
@@ -522,4 +524,89 @@ public class ShuffleServerNettyHandler implements
BaseMessageHandler {
}
return ret;
}
+
+ class ReleaseMemoryAndRecordReadTimeListener implements
ChannelFutureListener {
+ private final long readStartedTime;
+ private final long readBufferSize;
+ private final long dataSize;
+ private final String requestInfo;
+ private final RequestMessage request;
+ private final TransportClient client;
+
+ ReleaseMemoryAndRecordReadTimeListener(
+ long readStartedTime,
+ long readBufferSize,
+ long dataSize,
+ String requestInfo,
+ RequestMessage request,
+ TransportClient client) {
+ this.readStartedTime = readStartedTime;
+ this.readBufferSize = readBufferSize;
+ this.dataSize = dataSize;
+ this.requestInfo = requestInfo;
+ this.request = request;
+ this.client = client;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) {
+
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
+ long readTime = System.currentTimeMillis() - readStartedTime;
+ ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
+
shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(),
readTime);
+ if (!future.isSuccess()) {
+ Throwable cause = future.cause();
+ String errorMsg =
+ "Error happened when executing "
+ + request.getOperationType()
+ + " for "
+ + requestInfo
+ + ", "
+ + cause.getMessage();
+ LOG.error(errorMsg, future.cause());
+ RpcResponse errorResponse;
+ if (request instanceof GetLocalShuffleDataRequest) {
+ errorResponse =
+ new GetLocalShuffleDataResponse(
+ request.getRequestId(),
+ StatusCode.INTERNAL_ERROR,
+ errorMsg,
+ new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
+ } else if (request instanceof GetLocalShuffleIndexRequest) {
+ errorResponse =
+ new GetLocalShuffleIndexResponse(
+ request.getRequestId(),
+ StatusCode.INTERNAL_ERROR,
+ errorMsg,
+ Unpooled.EMPTY_BUFFER,
+ 0L);
+ } else if (request instanceof GetMemoryShuffleDataRequest) {
+ errorResponse =
+ new GetMemoryShuffleDataResponse(
+ request.getRequestId(),
+ StatusCode.INTERNAL_ERROR,
+ errorMsg,
+ Lists.newArrayList(),
+ Unpooled.EMPTY_BUFFER);
+ } else {
+ LOG.error("Cannot handle request {}", request.type());
+ return;
+ }
+ client.getChannel().writeAndFlush(errorResponse);
+ LOG.error(
+ "Failed to execute {} for {}. Took {} ms and could not retrieve {}
bytes of data",
+ request.getOperationType(),
+ requestInfo,
+ readTime,
+ dataSize);
+ } else {
+ LOG.info(
+ "Successfully executed {} for {}. Took {} ms and retrieved {}
bytes of data",
+ request.getOperationType(),
+ requestInfo,
+ readTime,
+ dataSize);
+ }
+ }
+ }
}