9uapaw commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r842824343
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -1081,6 +1200,9 @@ public ChannelFuture sendMap(ReduceContext reduceContext)
return null;
}
}
+ if (nextMap == null) {
+ LOG.trace("Returning nextMap: null");
+ }
Review Comment:
I think it's superfluous: the try clause already performs a null check, and
the exception clause already returns null.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -294,33 +941,31 @@ protected ChannelFuture
sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String mapId, int reduce,
MapOutputInfo info)
throws IOException {
- // send a shuffle header and a lot of data down the channel
- // to trigger a broken pipe
ShuffleHeader header =
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0,
dob.getLength()));
}
@Override
protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
if (failures.size() == 0) {
Review Comment:
This `failures` is not a closure over the local variable; it's a reference
to the member variable of ShuffleHandlerForTests. Hence the local variable
will always be an empty list.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,11 +991,11 @@ public void setPort(int port) {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
- super.channelOpen(ctx, evt);
-
- if ((maxShuffleConnections > 0) && (accepted.size() >=
maxShuffleConnections)) {
+ NettyChannelHelper.channelActive(ctx.channel());
+ int numConnections = acceptedConnections.incrementAndGet();
Review Comment:
I think incrementing here is not right. Suppose maxShuffleConnections is 5
and there are 4 accepted connections. If you invoke this method,
acceptedConnections becomes 5 and the error-handling branch kicks in, even
though this connection should be allowed.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -785,37 +846,54 @@ private void removeJobShuffleInfo(JobID jobId) throws
IOException {
}
}
- static class TimeoutHandler extends IdleStateAwareChannelHandler {
+ @VisibleForTesting
+ public void setUseOutboundExceptionHandler(boolean useHandler) {
+ this.useOutboundExceptionHandler = useHandler;
+ }
+ static class TimeoutHandler extends IdleStateHandler {
+ private final int connectionKeepAliveTimeOut;
private boolean enabledTimeout;
+ public TimeoutHandler(int connectionKeepAliveTimeOut) {
+ //disable reader timeout
+ //set writer timeout to configured timeout value
+ //disable all idle timeout
+ super(0, connectionKeepAliveTimeOut, 0, TimeUnit.SECONDS);
+ this.connectionKeepAliveTimeOut = connectionKeepAliveTimeOut;
+ }
Review Comment:
Previous logic omitted every timeout. What is the default here?
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -348,10 +993,13 @@ protected void sendError(ChannelHandlerContext ctx,
String message,
header.readFields(input);
input.close();
+ assertEquals("sendError called when client closed connection", 0,
failures.size());
+ Assert.assertEquals("Should have no caught exceptions",
+ new ArrayList<>(), failures);
Review Comment:
Use EMPTY_LIST constant here.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -185,12 +187,23 @@
// This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
public static final long FETCH_RETRY_DELAY = 1000L;
public static final String RETRY_AFTER_HEADER = "Retry-After";
+ static final String ENCODER_HANDLER_NAME = "encoder";
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private ServerBootstrap bootstrap;
+ private Channel ch;
+ // FIXME: snemeth: need thread safety. -
https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup
+ private final ChannelGroup accepted =
+ new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ private final AtomicInteger acceptedConnections = new AtomicInteger();
Review Comment:
I think it should be named activeConnections instead.
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -920,49 +1007,67 @@ public void channelOpen(ChannelHandlerContext ctx,
ChannelStateEvent evt)
// fetch failure.
headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
- return;
+ } else {
+ super.channelActive(ctx);
Review Comment:
In the previous logic, this super method call was actually invoked before the
error-handling part. However, I also find the new order better; perhaps the
previous ordering was a bug?
##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -185,12 +187,23 @@
// This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
public static final long FETCH_RETRY_DELAY = 1000L;
public static final String RETRY_AFTER_HEADER = "Retry-After";
+ static final String ENCODER_HANDLER_NAME = "encoder";
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private ServerBootstrap bootstrap;
+ private Channel ch;
+ // FIXME: snemeth: need thread safety. -
https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup
+ private final ChannelGroup accepted =
+ new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
Review Comment:
This is actually thread-safe according to the documentation. However, the
GlobalEventExecutor instance is not a scalable option; a custom executor may
be needed here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]