9uapaw commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r842824343


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -1081,6 +1200,9 @@ public ChannelFuture sendMap(ReduceContext reduceContext)
           return null;
         }
       }
+      if (nextMap == null) {
+        LOG.trace("Returning nextMap: null");
+      }

Review Comment:
   I think it's superfluous, because the try clause already has a null check, 
and the exception clause already returns null.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -294,33 +941,31 @@ protected ChannelFuture 
sendMapOutput(ChannelHandlerContext ctx,
               Channel ch, String user, String mapId, int reduce,
               MapOutputInfo info)
                   throws IOException {
-            // send a shuffle header and a lot of data down the channel
-            // to trigger a broken pipe
             ShuffleHeader header =
                 new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
             DataOutputBuffer dob = new DataOutputBuffer();
             header.write(dob);
-            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
             dob = new DataOutputBuffer();
             for (int i = 0; i < 100000; ++i) {
               header.write(dob);
             }
-            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, 
dob.getLength()));
           }
           @Override
           protected void sendError(ChannelHandlerContext ctx,
               HttpResponseStatus status) {
             if (failures.size() == 0) {

Review Comment:
   This `failures` is not a closure over the local variable; it's a reference 
to the member variable of ShuffleHandlerForTests, hence the local variable 
will always be an empty list.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,11 +991,11 @@ public void setPort(int port) {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
-      super.channelOpen(ctx, evt);
-
-      if ((maxShuffleConnections > 0) && (accepted.size() >= 
maxShuffleConnections)) {
+      NettyChannelHelper.channelActive(ctx.channel());
+      int numConnections = acceptedConnections.incrementAndGet();

Review Comment:
   I think an increment action is not right here. Suppose you have 
maxShuffleConnections as 5, and there are 4 accepted connections. If you invoke 
this method, acceptedConnections will be 5, and the error handling part will 
kick in, regardless of the fact that it should be allowed.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -785,37 +846,54 @@ private void removeJobShuffleInfo(JobID jobId) throws 
IOException {
     }
   }
 
-  static class TimeoutHandler extends IdleStateAwareChannelHandler {
+  @VisibleForTesting
+  public void setUseOutboundExceptionHandler(boolean useHandler) {
+    this.useOutboundExceptionHandler = useHandler;
+  }
 
+  static class TimeoutHandler extends IdleStateHandler {
+    private final int connectionKeepAliveTimeOut;
     private boolean enabledTimeout;
 
+    public TimeoutHandler(int connectionKeepAliveTimeOut) {
+      //disable reader timeout
+      //set writer timeout to configured timeout value
+      //disable all idle timeout
+      super(0, connectionKeepAliveTimeOut, 0, TimeUnit.SECONDS);
+      this.connectionKeepAliveTimeOut = connectionKeepAliveTimeOut;
+    }

Review Comment:
   Previous logic omitted every timeout. What is the default here?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -348,10 +993,13 @@ protected void sendError(ChannelHandlerContext ctx, 
String message,
     header.readFields(input);
     input.close();
 
+    assertEquals("sendError called when client closed connection", 0, 
failures.size());
+    Assert.assertEquals("Should have no caught exceptions",
+        new ArrayList<>(), failures);

Review Comment:
   Use the EMPTY_LIST constant here.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -185,12 +187,23 @@
   // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
   public static final long FETCH_RETRY_DELAY = 1000L;
   public static final String RETRY_AFTER_HEADER = "Retry-After";
+  static final String ENCODER_HANDLER_NAME = "encoder";
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private ServerBootstrap bootstrap;
+  private Channel ch;
+  // FIXME: snemeth: need thread safety. - 
https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup
+  private final ChannelGroup accepted =
+      new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+  private final AtomicInteger acceptedConnections = new AtomicInteger();

Review Comment:
   I think it should be named activeConnections instead.



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -920,49 +1007,67 @@ public void channelOpen(ChannelHandlerContext ctx, 
ChannelStateEvent evt)
         // fetch failure.
         headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
         sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
-        return;
+      } else {
+        super.channelActive(ctx);

Review Comment:
   In the previous logic, this super method call was actually invoked before 
the error handling part. However, I also find the new order better; perhaps it 
was a bug in the previous logic?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -185,12 +187,23 @@
   // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
   public static final long FETCH_RETRY_DELAY = 1000L;
   public static final String RETRY_AFTER_HEADER = "Retry-After";
+  static final String ENCODER_HANDLER_NAME = "encoder";
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private ServerBootstrap bootstrap;
+  private Channel ch;
+  // FIXME: snemeth: need thread safety. - 
https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup
+  private final ChannelGroup accepted =
+      new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

Review Comment:
   This is actually thread-safe according to the documentation. However, the 
GlobalEventExecutor instance is a non-scalable option; perhaps a custom 
executor will be needed here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to