[https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636076#comment-17636076]
ASF GitHub Bot commented on HADOOP-15327:
-----------------------------------------
jojochuang commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1026831834
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -18,19 +18,20 @@
package org.apache.hadoop.mapred;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.mapred.ShuffleHandler.NettyChannelHelper.*;
Review Comment:
Let's not use a wildcard import.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java:
##########
@@ -315,9 +318,8 @@ protected void copyFromHost(MapHost host) throws IOException {
return;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
- + maps);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " + maps);
Review Comment:
SLF4J logger messages could be rewritten using the parameterized logging format.
But let's not worry about that now; this PR is already too big.
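For reference, the parameterized form of the statement in this hunk would be `LOG.debug("Fetcher {} going to fetch from {} for: {}", id, host, maps);`. With SLF4J, the message string is only assembled when DEBUG is enabled, so the explicit `isDebugEnabled()` guard becomes unnecessary for cheap arguments like these. The argument expressions themselves are still evaluated at the call site. The sketch below does not use SLF4J. It is a hypothetical, self-contained `MiniLogger` (the name and behavior are assumptions for illustration) that mimics simple `{}` substitution and counts how many messages it actually builds.

```java
/**
 * Hypothetical mini logger that mimics SLF4J-style "{}" placeholders.
 * This is NOT the SLF4J implementation; it only illustrates deferred formatting.
 */
class MiniLogger {
  private final boolean debugEnabled;
  private int messagesBuilt = 0; // how many times a message string was actually formatted

  MiniLogger(boolean debugEnabled) {
    this.debugEnabled = debugEnabled;
  }

  boolean isDebugEnabled() {
    return debugEnabled;
  }

  int messagesBuilt() {
    return messagesBuilt;
  }

  /** Formats the message only when DEBUG is enabled; returns null when it is disabled. */
  String debug(String pattern, Object... args) {
    if (!debugEnabled) {
      return null; // no string building and no toString() calls on the arguments
    }
    messagesBuilt++;
    return format(pattern, args);
  }

  /** Replaces each "{}" in order with String.valueOf(arg); leftover "{}" stay as-is. */
  static String format(String pattern, Object... args) {
    StringBuilder sb = new StringBuilder();
    int argIndex = 0;
    int from = 0;
    int at;
    while ((at = pattern.indexOf("{}", from)) >= 0 && argIndex < args.length) {
      sb.append(pattern, from, at).append(String.valueOf(args[argIndex++]));
      from = at + 2;
    }
    sb.append(pattern.substring(from));
    return sb.toString();
  }
}
```

With DEBUG disabled, `debug(...)` returns without building anything. With DEBUG enabled and `id = 1`, `host = "host1"`, `maps = [m1, m2]`, it produces `Fetcher 1 going to fetch from host1 for: [m1, m2]`.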
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -1322,48 +1438,45 @@ protected void sendError(ChannelHandlerContext ctx, String msg,
for (Map.Entry<String, String> header : headers.entrySet()) {
response.headers().set(header.getKey(), header.getValue());
}
- response.setContent(
- ChannelBuffers.copiedBuffer(msg, CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ writeToChannelAndClose(ctx.channel(), response);
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
+ Channel ch = ctx.channel();
if (cause instanceof TooLongFrameException) {
+ LOG.trace("TooLongFrameException, channel id: {}", ch.id());
sendError(ctx, BAD_REQUEST);
return;
} else if (cause instanceof IOException) {
if (cause instanceof ClosedChannelException) {
- LOG.debug("Ignoring closed channel error", cause);
+ LOG.debug("Ignoring closed channel error, channel id: " + ch.id(), cause);
Review Comment:
Use parameterized logging; otherwise, wrap it in a LOG.isDebugEnabled() check.
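Applied to the statement in this hunk, the parameterized form would be `LOG.debug("Ignoring closed channel error, channel id: {}", ch.id(), cause);`. Since SLF4J 1.6.0, a Throwable passed as the last argument is treated as the exception to log, with its stack trace, rather than as a plain message parameter. This means `cause` is not lost when the concatenation is replaced. The hypothetical `TrailingThrowable` helper below is an illustration, not SLF4J code. It models a simplified version of that convention: a trailing Throwable that is left over after every `{}` placeholder has been filled is reported as the exception.

```java
/**
 * Hypothetical helper modelling (in simplified form) SLF4J's convention that a
 * trailing Throwable not consumed by a "{}" placeholder is the exception to log.
 * Illustration only; this is not SLF4J code.
 */
class TrailingThrowable {

  /** Counts non-overlapping "{}" placeholders in the format string. */
  static int countPlaceholders(String format) {
    int count = 0;
    int from = 0;
    int at;
    while ((at = format.indexOf("{}", from)) >= 0) {
      count++;
      from = at + 2;
    }
    return count;
  }

  /** Returns the last argument if it is a Throwable left over after filling placeholders, else null. */
  static Throwable extractThrowable(String format, Object... args) {
    if (args.length == 0) {
      return null;
    }
    Object last = args[args.length - 1];
    boolean unconsumed = args.length > countPlaceholders(format);
    return (unconsumed && last instanceof Throwable) ? (Throwable) last : null;
  }
}
```

For the statement above, one placeholder is filled by the channel id and `cause` remains as the trailing argument, so it is logged as the exception.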
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
- super.channelOpen(ctx, evt);
-
- if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+ NettyChannelHelper.channelActive(ctx.channel());
+ int numConnections = activeConnections.incrementAndGet();
+ if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) {
Review Comment:
Perhaps this channel bookkeeping is no longer needed, given that the number of
channels is limited.
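To make the question concrete, here is a hypothetical sketch of the counter-based bookkeeping from the `channelActive` hunk. The class and method names are assumptions, not Hadoop code. An `AtomicInteger` is incremented when a channel becomes active and decremented when it becomes inactive. A new channel is rejected once the count exceeds the limit. A limit of 0 or less means unlimited, mirroring the `maxShuffleConnections > 0` check. The sketch also assumes that every channel counted on activation later fires an inactive event, including rejected channels that the caller closes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of connection-limit bookkeeping: count on channelActive,
 * uncount on channelInactive, reject when the count exceeds the limit.
 * A limit of 0 or less means "unlimited".
 */
class ConnectionLimiter {
  private final int maxConnections;
  private final AtomicInteger activeConnections = new AtomicInteger();

  ConnectionLimiter(int maxConnections) {
    this.maxConnections = maxConnections;
  }

  /** Call when a channel becomes active; false means the caller should reject and close it. */
  boolean onActive() {
    int numConnections = activeConnections.incrementAndGet();
    return maxConnections <= 0 || numConnections <= maxConnections;
  }

  /** Call when a channel becomes inactive; assumed to run once per channel counted in onActive. */
  void onInactive() {
    activeConnections.decrementAndGet();
  }

  int active() {
    return activeConnections.get();
  }
}
```

In this sketch the counter alone enforces the limit. A `ChannelGroup` would then only matter for other purposes, such as closing all accepted channels at shutdown. Whether `ShuffleHandler` still needs it for that is not visible in this excerpt.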
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -182,19 +184,29 @@ public class ShuffleHandler extends AuxiliaryService {
public static final HttpResponseStatus TOO_MANY_REQ_STATUS =
new HttpResponseStatus(429, "TOO MANY REQUESTS");
- // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
+ // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
public static final long FETCH_RETRY_DELAY = 1000L;
public static final String RETRY_AFTER_HEADER = "Retry-After";
+ static final String ENCODER_HANDLER_NAME = "encoder";
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private ServerBootstrap bootstrap;
+ private Channel ch;
+ private final ChannelGroup accepted =
+ new DefaultChannelGroup(new DefaultEventExecutorGroup(5).next());
Review Comment:
So, if I understand it correctly from the context, the size of the channel
group should be maxShuffleConnections, which is unlimited by default
(configurable via mapreduce.shuffle.max.connections).
> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
> Key: HADOOP-15327
> URL: https://issues.apache.org/jira/browse/HADOOP-15327
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Xiaoyu Yao
> Assignee: Szilard Nemeth
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.0
>
> Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch,
> HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch,
> HADOOP-15327.005.patch,
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log,
> hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip,
> testfailure-testReduceFromPartialMem.zip
>
> Time Spent: 11.5h
> Remaining Estimate: 0h
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)