[ 
https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636076#comment-17636076
 ] 

ASF GitHub Bot commented on HADOOP-15327:
-----------------------------------------

jojochuang commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1026831834


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -18,19 +18,20 @@
 
 package org.apache.hadoop.mapred;
 
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.mapred.ShuffleHandler.NettyChannelHelper.*;

Review Comment:
   Let's not use a wildcard import; please import the required members explicitly.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java:
##########
@@ -315,9 +318,8 @@ protected void copyFromHost(MapHost host) throws IOException {
       return;
     }
     
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
-        + maps);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " + maps);

Review Comment:
   slf4j logger messages can be rewritten using the parameterized logging format,
but let's not worry about that now. This PR is already too big.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -1322,48 +1438,45 @@ protected void sendError(ChannelHandlerContext ctx, String msg,
       for (Map.Entry<String, String> header : headers.entrySet()) {
         response.headers().set(header.getKey(), header.getValue());
       }
-      response.setContent(
-          ChannelBuffers.copiedBuffer(msg, CharsetUtil.UTF_8));
 
       // Close the connection as soon as the error message is sent.
-      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+      writeToChannelAndClose(ctx.channel(), response);
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
+      Channel ch = ctx.channel();
       if (cause instanceof TooLongFrameException) {
+        LOG.trace("TooLongFrameException, channel id: {}", ch.id());
         sendError(ctx, BAD_REQUEST);
         return;
       } else if (cause instanceof IOException) {
         if (cause instanceof ClosedChannelException) {
-          LOG.debug("Ignoring closed channel error", cause);
+          LOG.debug("Ignoring closed channel error, channel id: " + ch.id(), cause);

Review Comment:
   Use parameterized logging, otherwise wrap it in a LOG.isDebugEnabled() check.
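
To illustrate why the parameterized form matters here, the following is a minimal, self-contained sketch. It deliberately does not use slf4j: `MiniLogger` and `CountingId` are hypothetical stand-ins that only mimic the `{}` placeholder behaviour, so the example runs with the JDK alone. It counts how often the argument's `toString()` is invoked while debug logging is disabled.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: MiniLogger is NOT the slf4j API, just a tiny stand-in
// that mimics its "{}" placeholder handling.
class ParameterizedLoggingSketch {

  /** Hypothetical minimal logger that records the lines it would emit. */
  static final class MiniLogger {
    private final boolean debugEnabled;
    final List<String> lines = new ArrayList<>();

    MiniLogger(boolean debugEnabled) {
      this.debugEnabled = debugEnabled;
    }

    boolean isDebugEnabled() {
      return debugEnabled;
    }

    /** Pre-built message: the caller has already paid for concatenation. */
    void debug(String message) {
      if (debugEnabled) {
        lines.add(message);
      }
    }

    /** Parameterized form: arguments are stringified only when enabled. */
    void debug(String format, Object... args) {
      if (!debugEnabled) {
        return;
      }
      StringBuilder sb = new StringBuilder();
      int from = 0;
      for (Object arg : args) {
        int at = format.indexOf("{}", from);
        if (at < 0) {
          break; // more arguments than placeholders: ignore the rest
        }
        sb.append(format, from, at).append(arg);
        from = at + 2;
      }
      sb.append(format.substring(from));
      lines.add(sb.toString());
    }
  }

  /** Hypothetical channel id that counts toString() invocations. */
  static final class CountingId {
    private final String value;
    int toStringCalls = 0;

    CountingId(String value) {
      this.value = value;
    }

    @Override
    public String toString() {
      toStringCalls++;
      return value;
    }
  }

  public static void main(String[] args) {
    MiniLogger log = new MiniLogger(false); // debug disabled
    CountingId id = new CountingId("0xabc");

    // Concatenation: toString() runs even though nothing is logged.
    log.debug("Ignoring closed channel error, channel id: " + id);

    // Parameterized: the argument is never stringified while disabled.
    log.debug("Ignoring closed channel error, channel id: {}", id);

    // Guarded concatenation: also skipped while disabled.
    if (log.isDebugEnabled()) {
      log.debug("Ignoring closed channel error, channel id: " + id);
    }

    System.out.println("toString() calls with debug off: " + id.toStringCalls);
  }
}
```

With slf4j itself, the flagged call would presumably become `LOG.debug("Ignoring closed channel error, channel id: {}", ch.id(), cause);` since slf4j treats a trailing Throwable without a matching placeholder as the exception to log.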



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
-      super.channelOpen(ctx, evt);
-
-      if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+      NettyChannelHelper.channelActive(ctx.channel());
+      int numConnections = activeConnections.incrementAndGet();
+      if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) {

Review Comment:
   Perhaps this channel bookkeeping is no longer needed, given that the channel
size is limited.
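
For context, the counter-based check in the hunk above amounts to roughly the following. This is a standalone sketch without Netty: the class and method names (`ConnectionLimitSketch`, `onChannelActive`, `onChannelInactive`) are hypothetical, and it assumes that a rejected connection is closed by the caller and still triggers the decrement, which the hunk shown here does not confirm.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, Netty-free sketch of counter-based connection limiting.
class ConnectionLimitSketch {

  // A value of 0 or less is treated as "unlimited", mirroring the
  // (maxShuffleConnections > 0) guard in the diff.
  private final int maxShuffleConnections;
  private final AtomicInteger activeConnections = new AtomicInteger();

  ConnectionLimitSketch(int maxShuffleConnections) {
    this.maxShuffleConnections = maxShuffleConnections;
  }

  /**
   * Called when a connection becomes active.
   * Returns true if it is within the limit, false if it should be rejected.
   */
  boolean onChannelActive() {
    int numConnections = activeConnections.incrementAndGet();
    return !(maxShuffleConnections > 0
        && numConnections > maxShuffleConnections);
  }

  /**
   * Called when a connection goes away. Assumption: this also fires for
   * connections that were rejected and then closed.
   */
  void onChannelInactive() {
    activeConnections.decrementAndGet();
  }

  int active() {
    return activeConnections.get();
  }
}
```

Whether this counter is still needed depends on whether something else already caps the number of channels, which is the question the comment raises.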



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -182,19 +184,29 @@ public class ShuffleHandler extends AuxiliaryService {
 
   public static final HttpResponseStatus TOO_MANY_REQ_STATUS =
       new HttpResponseStatus(429, "TOO MANY REQUESTS");
-  // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
+  // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
   public static final long FETCH_RETRY_DELAY = 1000L;
   public static final String RETRY_AFTER_HEADER = "Retry-After";
+  static final String ENCODER_HANDLER_NAME = "encoder";
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private ServerBootstrap bootstrap;
+  private Channel ch;
+  private final ChannelGroup accepted =
+      new DefaultChannelGroup(new DefaultEventExecutorGroup(5).next());

Review Comment:
   So, if I understand it correctly from the context, the size of the channel
group should be maxShuffleConnections, which is unlimited by default
(configurable via mapreduce.shuffle.max.connections).





> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
>                 Key: HADOOP-15327
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15327
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Xiaoyu Yao
>            Assignee: Szilard Nemeth
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>         Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, 
> HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, 
> HADOOP-15327.005.patch, 
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, 
> hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip, 
> testfailure-testReduceFromPartialMem.zip
>
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
