[ https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17358861#comment-17358861 ]
Szilard Nemeth commented on HADOOP-15327:
-----------------------------------------
Let me list the differences introduced because of the migration from Netty 3.x
to 4.x.
There is a migration guide that mentions most (but not all) of the changes:
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html]
Please note that the below code changes are based on Wei-Chiu's branch:
[https://github.com/jojochuang/hadoop/commits/shuffle_handler_netty4]
h2. CHANGES IN ShuffleHandler
h3. *I will list the changes mostly from ShuffleHandler, as it covers almost all types of changes made in the other classes as well.*
*In TestShuffleHandler, the test code was changed for the same reasons listed below.*
h3. Change category #1: General API changes / non-configuration getters:
Details:
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#general-api-changes]
{quote}Non-configuration getters have no get- prefix anymore. (e.g.
Channel.getRemoteAddress() → Channel.remoteAddress())
Boolean properties are still prefixed with is- to avoid confusion (e.g.
'empty' is both an adjective and a verb, so empty() can have two meanings.)
{quote}
Since these are mostly simple method renames, I'm listing them without additional context (i.e. the method they were changed in), separated by three dots:
{code:java}
- future.getChannel().close();
+ future.channel().closeFuture().awaitUninterruptibly();
...
...
- ChannelPipeline pipeline = future.getChannel().getPipeline();
+ ChannelPipeline pipeline = future.channel().pipeline();
...
...
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ port = ((InetSocketAddress)ch.localAddress()).getPort();
...
...
- if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
- e.getChannel().close();
+ if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
+ ctx.channel().close();
...
...
- accepted.add(evt.getChannel());
+ accepted.add(ctx.channel());
...
...
- new QueryStringDecoder(request.getUri()).getParameters();
+ new QueryStringDecoder(request.getUri()).parameters(); // getUri was not changed, see this later
...
...
- Channel ch = evt.getChannel();
- ChannelPipeline pipeline = ch.getPipeline();
+ Channel ch = ctx.channel();
+ ChannelPipeline pipeline = ch.pipeline();
...
...
- reduceContext.getCtx().getChannel(),
+ reduceContext.getCtx().channel(),
...
...
- if (ch.getPipeline().get(SslHandler.class) == null) {
+ if (ch.pipeline().get(SslHandler.class) == null) {
...
...
- Channel ch = evt.getChannel();
- ChannelPipeline pipeline = ch.getPipeline();
+ Channel ch = ctx.channel();
+ ChannelPipeline pipeline = ch.pipeline();
...
...
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().write(response).addListener(ChannelFutureListener.CLOSE);
...
...
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
+ Channel ch = ctx.channel();
{code}
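The renamed accessors can be tried out without opening a socket. Below is a minimal sketch, assuming Netty 4.1.x on the classpath; InspectingHandler and run() are hypothetical names, not ShuffleHandler code. EmbeddedChannel runs the whole pipeline on the calling thread, which keeps the example deterministic:
{code:java}
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;

public class AccessorRenameDemo {

  /** Records what the Netty 4 accessors return while a message is being read. */
  static class InspectingHandler extends ChannelInboundHandlerAdapter {
    Channel seenChannel;
    ChannelPipeline seenPipeline;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      // Netty 3: evt.getChannel() and ch.getPipeline()
      // Netty 4: ctx.channel() and ch.pipeline()
      seenChannel = ctx.channel();
      seenPipeline = seenChannel.pipeline();
      ctx.fireChannelRead(msg); // pass the message on unchanged
    }
  }

  /** Returns true if the accessors resolved to the channel and its own pipeline. */
  static boolean run() {
    InspectingHandler handler = new InspectingHandler();
    EmbeddedChannel ch = new EmbeddedChannel(handler);
    ch.writeInbound("ping");
    boolean ok = handler.seenChannel == ch && handler.seenPipeline == ch.pipeline();
    ch.finish();
    return ok;
  }

  public static void main(String[] args) {
    System.out.println(run()); // true
  }
}
{code}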
h3. Change category #2: General API changes / Method signature changes.
*2.1: SimpleChannelUpstreamHandler was renamed to ChannelInboundHandlerAdapter.*
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#upstream--inbound-downstream--outbound]
{quote}The terms 'upstream' and 'downstream' were pretty confusing to
beginners. 4.0 uses 'inbound' and 'outbound' wherever possible.
{quote}
{code:java}
- class Shuffle extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ class Shuffle extends ChannelInboundHandlerAdapter {
{code}
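The @ChannelHandler.Sharable annotation matters here because HttpPipelineFactory adds the very same SHUFFLE instance to every connection's pipeline (pipeline.addLast("shuffle", SHUFFLE)). Netty 4 refuses to add a handler instance to a second pipeline unless its class is annotated with @Sharable. A small sketch, assuming Netty 4.1.x; both handler classes are hypothetical:
{code:java}
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipelineException;
import io.netty.channel.embedded.EmbeddedChannel;

public class SharableDemo {

  /** Stateless handler that may sit in many pipelines, like the single SHUFFLE instance. */
  @ChannelHandler.Sharable
  static class SharedHandler extends ChannelInboundHandlerAdapter { }

  /** The same handler without the annotation. */
  static class UnsharedHandler extends ChannelInboundHandlerAdapter { }

  /** Tries to add one handler instance to two channels; returns false if Netty refuses. */
  static boolean canShare(ChannelHandler handler) {
    EmbeddedChannel first = new EmbeddedChannel();
    EmbeddedChannel second = new EmbeddedChannel();
    try {
      first.pipeline().addLast(handler);
      second.pipeline().addLast(handler); // throws for a non-@Sharable instance
      return true;
    } catch (ChannelPipelineException e) {
      return false;
    } finally {
      first.finish();
      second.finish();
    }
  }

  public static void main(String[] args) {
    System.out.println(canShare(new SharedHandler()));   // true
    System.out.println(canShare(new UnsharedHandler())); // false
  }
}
{code}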
*2.2: Simplified channel state model: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#simplified-channel-state-model]*
{quote}channelOpen, channelBound, and channelConnected have been merged to
channelActive. channelDisconnected, channelUnbound, and channelClosed have been
merged to channelInactive. Likewise, Channel.isBound() and isConnected() have
been merged to isActive().
{quote}
*2.2.1 Changes in class: Shuffle*
{code:java}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
- super.channelOpen(ctx, evt);
+ super.channelActive(ctx);
{code}
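A quick way to see the merged lifecycle callbacks is to count them on an EmbeddedChannel. This is a sketch assuming Netty 4.1.x; LifecycleCounter is a hypothetical class:
{code:java}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;

public class ChannelStateDemo {

  /** Counts the merged Netty 4 lifecycle callbacks. */
  static class LifecycleCounter extends ChannelInboundHandlerAdapter {
    int active;
    int inactive;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
      active++;                 // replaces channelOpen/channelBound/channelConnected
      super.channelActive(ctx); // keep propagating, as Shuffle does
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      inactive++;                 // replaces channelDisconnected/channelUnbound/channelClosed
      super.channelInactive(ctx);
    }
  }

  /** Returns {active, inactive} after one open/close cycle. */
  static int[] run() {
    LifecycleCounter counter = new LifecycleCounter();
    EmbeddedChannel ch = new EmbeddedChannel(counter); // registration activates the channel
    ch.close();
    return new int[] {counter.active, counter.inactive};
  }

  public static void main(String[] args) {
    int[] counts = run();
    System.out.println(counts[0] + " " + counts[1]); // 1 1
  }
}
{code}
Registering the EmbeddedChannel makes it active, so channelActive fires once; close() then triggers channelInactive once.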
*2.2.2 Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught:*
Quoting the change again:
{quote}channelOpen, channelBound, and channelConnected have been merged to
channelActive. channelDisconnected, channelUnbound, and channelClosed have been
merged to channelInactive. Likewise, Channel.isBound() and isConnected() have
been merged to isActive().
{quote}
{code:java}
LOG.error("Shuffle error: ", cause);
- if (ch.isConnected()) {
- LOG.error("Shuffle error " + e);
+ if (ch.isOpen()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
{code}
{color:#ff0000}I think we have an issue here. The doc says: "Likewise, Channel.isBound() and isConnected() have been merged to isActive()."{color}
{color:#ff0000}So isConnected() should be replaced with isActive() rather than with isOpen().{color}
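For comparison, here is a sketch of the exceptionCaught variant with isActive(), assuming Netty 4.1.x. ErrorReplyingHandler is hypothetical, and the real sendError(ctx, INTERNAL_SERVER_ERROR) is replaced by writing a plain String so the example needs no HTTP codec. On a Netty 4 socket channel, isOpen() only says that the channel has not been closed yet, while isActive() reflects the connected state that Netty 3's isConnected() used to report:
{code:java}
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import java.io.IOException;

public class ExceptionCaughtDemo {

  /** Hypothetical stand-in for Shuffle#exceptionCaught. */
  static class ErrorReplyingHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      Channel ch = ctx.channel();
      // isActive() is the Netty 4 replacement for Netty 3's isConnected()
      if (ch.isActive()) {
        // stand-in for sendError(ctx, INTERNAL_SERVER_ERROR)
        ch.writeAndFlush("500 " + cause.getMessage())
            .addListener(ChannelFutureListener.CLOSE);
      }
    }
  }

  /** Raises an exception in the pipeline and returns what the handler wrote back. */
  static Object run() {
    EmbeddedChannel ch = new EmbeddedChannel(new ErrorReplyingHandler());
    ch.pipeline().fireExceptionCaught(new IOException("disk error"));
    Object reply = ch.readOutbound();
    ch.finish();
    return reply;
  }

  public static void main(String[] args) {
    System.out.println(run()); // 500 disk error
  }
}
{code}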
*2.3 Method rename: messageReceived() became io.netty.channel.ChannelInboundHandlerAdapter#channelRead*
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#case-study-porting-the-factorial-example]
Changes in class: Shuffle
{code:java}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
+ HttpRequest request = (HttpRequest) msg;
{code}
*2.4 The method parameter of channelRead vs. messageReceived was also changed.*
This is detailed here:
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#channelhandler-with-no-event-object]
Changes in class: Shuffle
{code:java}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
+ HttpRequest request = (HttpRequest) msg;
{code}
Consequently, as no event object is received, the channel is obtained from the context instead of the event:
{code:java}
- Channel ch = evt.getChannel();
- ChannelPipeline pipeline = ch.getPipeline();
+ Channel ch = ctx.channel();
+ ChannelPipeline pipeline = ch.pipeline();
{code}
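One related consequence of the new channelRead(ChannelHandlerContext, Object) signature worth keeping in mind: ChannelInboundHandlerAdapter does not release the messages it receives, and Netty 4 messages such as ByteBuf, or the FullHttpRequest produced by HttpObjectAggregator, are reference-counted. A handler that consumes a message instead of passing it on with ctx.fireChannelRead(msg) is expected to release it (SimpleChannelInboundHandler does this automatically). A sketch, assuming Netty 4.1.x, with a ByteBuf standing in for the HTTP request; ConsumingHandler is hypothetical:
{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class ChannelReadReleaseDemo {

  /** Consumes every message itself, so it is responsible for releasing it. */
  static class ConsumingHandler extends ChannelInboundHandlerAdapter {
    String lastPayload;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      try {
        if (msg instanceof ByteBuf) { // ShuffleHandler casts to HttpRequest here instead
          lastPayload = ((ByteBuf) msg).toString(CharsetUtil.UTF_8);
        }
      } finally {
        ReferenceCountUtil.release(msg); // the adapter does not release for us
      }
    }
  }

  /** Returns true if the payload was read and the buffer was fully released. */
  static boolean run() {
    ConsumingHandler handler = new ConsumingHandler();
    EmbeddedChannel ch = new EmbeddedChannel(handler);
    ByteBuf buf = Unpooled.copiedBuffer("mapId=attempt_1", CharsetUtil.UTF_8);
    ch.writeInbound(buf);
    ch.finish();
    return buf.refCnt() == 0 && "mapId=attempt_1".equals(handler.lastPayload);
  }

  public static void main(String[] args) {
    System.out.println(run()); // true
  }
}
{code}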
*2.5 ChannelHandler method signature changes: exceptionCaught*
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#new-channelhandler-type-hierarchy]
The ExceptionEvent is not passed anymore; instead, the cause of the issue is passed along directly as a Throwable.
Consequently, there is no event object to get the cause from:
Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught:
{code:java}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Throwable cause = e.getCause();
{code}
h3. Change category #3: Behavioral change: write() does not flush automatically:
Details:
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#write-does-not-flush-automatically]
*Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelRead:*
{code:java}
populateHeaders(mapIds, jobId, user, reduceId, request,
response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
- ch.write(response);
+ ch.writeAndFlush(response);
...
...
- ch.write(response);
+ ch.writeAndFlush(response);
//Initialize one ReduceContext object per messageReceived call
boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
{code}
*Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#sendMapOutput:*
{code:java}
@@ -1259,7 +1272,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
final DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
...
...
ChannelFuture writeFuture;
final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
info.startOffset, info.partLength, manageOsCache, readaheadLength,
readaheadPool, spillfile.getAbsolutePath(),
shuffleBufferSize, shuffleTransferToAllowed);
- writeFuture = ch.write(partition);
+ writeFuture = ch.writeAndFlush(partition);
...
...
info.startOffset, info.partLength, sslFileBufferSize,
manageOsCache, readaheadLength, readaheadPool,
spillfile.getAbsolutePath());
- writeFuture = ch.write(chunk);
+ writeFuture = ch.writeAndFlush(chunk);
}
metrics.shuffleConnections.incr();
{code}
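The effect of this behavioral change can be observed directly: a plain write() only queues the message in the channel's outbound buffer, and neither does the message reach the transport nor does the returned future complete until flush() is called. A sketch, assuming Netty 4.1.x, with EmbeddedChannel as a stand-in transport:
{code:java}
import io.netty.channel.ChannelFuture;
import io.netty.channel.embedded.EmbeddedChannel;
import java.util.Arrays;

public class WriteFlushDemo {

  /** Returns {doneAfterWrite, sentAfterWrite, doneAfterFlush, sentAfterFlush}. */
  static boolean[] run() {
    EmbeddedChannel ch = new EmbeddedChannel();

    // write() only appends the message to the channel's outbound buffer
    ChannelFuture pending = ch.write("shuffle-header");
    boolean doneAfterWrite = pending.isDone();
    boolean sentAfterWrite = !ch.outboundMessages().isEmpty();

    // flush() hands the buffered messages to the transport and completes the future;
    // writeAndFlush(msg) does both steps in one call
    ch.flush();
    boolean doneAfterFlush = pending.isDone();
    boolean sentAfterFlush = !ch.outboundMessages().isEmpty();

    ch.finish();
    return new boolean[] {doneAfterWrite, sentAfterWrite, doneAfterFlush, sentAfterFlush};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(run())); // [false, false, true, true]
  }
}
{code}
By the same reasoning, the remaining ctx.channel().write(response).addListener(ChannelFutureListener.CLOSE) call listed in category #1 would probably need writeAndFlush() as well; otherwise the response stays buffered and the CLOSE listener is not invoked until something else flushes the channel.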
h3. Change category #4: Releasing resources of / Deallocating io.netty.channel.DefaultFileRegion:
I couldn't really find any reference to this one in the migration guide.
*I can see this change in ShuffleHandler:*
{code:java}
@@ -1284,7 +1297,7 @@ public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
partition.transferSuccessful();
}
- partition.releaseExternalResources();
+ partition.deallocate();
}
});
} else {
{code}
*This calls FadvisedFileRegion#deallocate (formerly releaseExternalResources):*
{code:java}
@Override
- public void releaseExternalResources() {
+ protected void deallocate() {
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
- super.releaseExternalResources();
+ super.deallocate();
}
{code}
There's also a related change in TestFadvisedFileRegion.
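In Netty 4, DefaultFileRegion is reference-counted (FileRegion extends ReferenceCounted), and deallocate() is the hook that runs when release() drops the count to zero, which would explain why releaseExternalResources() became deallocate(). A sketch, assuming Netty 4.1.x; CountingFileRegion is a hypothetical stand-in for FadvisedFileRegion:
{code:java}
import io.netty.channel.DefaultFileRegion;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileRegionReleaseDemo {

  /** Hypothetical stand-in for FadvisedFileRegion that counts its cleanups. */
  static class CountingFileRegion extends DefaultFileRegion {
    int deallocations;

    CountingFileRegion(FileChannel file, long position, long count) {
      super(file, position, count);
    }

    @Override
    protected void deallocate() {
      deallocations++;    // FadvisedFileRegion cancels its readahead request here
      super.deallocate(); // closes the underlying FileChannel
    }
  }

  /** Releases a region once and returns how many times deallocate() ran. */
  static int run() {
    try {
      Path spill = Files.createTempFile("spill", ".out");
      try {
        Files.write(spill, new byte[] {1, 2, 3, 4});
        FileChannel fc = FileChannel.open(spill, StandardOpenOption.READ);
        CountingFileRegion region = new CountingFileRegion(fc, 0, 4);
        region.release(); // reference count 1 -> 0 triggers deallocate()
        return region.deallocations;
      } finally {
        Files.deleteIfExists(spill);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(run()); // 1
  }
}
{code}
One thing that may be worth double-checking: as far as I can tell, Netty 4's ChannelOutboundBuffer releases each message once its write has completed, so a FileRegion passed to writeAndFlush() is normally released (and therefore deallocated) by Netty itself. If that holds, the explicit partition.deallocate() call in the listener might run the cleanup a second time.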
h3. Change category #5: Closing the channel:
*Code changes in org.apache.hadoop.mapred.ShuffleHandler.ReduceMapFileCount#operationComplete:*
{code:java}
@@ -305,7 +312,7 @@ public ReduceMapFileCount(ReduceContext rc) {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- future.getChannel().close();
+ future.channel().closeFuture().awaitUninterruptibly();
return;
}
int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
@@ -313,12 +320,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
metrics.operationComplete(future);
// Let the idle timer handler close keep-alive connections
if (reduceContext.getKeepAlive()) {
TimeoutHandler timeoutHandler =
(TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
timeoutHandler.setEnabledTimeout(true);
} else {
- future.getChannel().close();
+ future.channel().closeFuture().awaitUninterruptibly();
}
{code}
So, channel.close() has been replaced with channel.closeFuture().awaitUninterruptibly().
I couldn't find anything related to this in the migration guide.
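Going by the Netty 4 Channel javadoc, closeFuture() returns the ChannelFuture that is notified when the channel is closed; it does not initiate closing. So the direct counterpart of Netty 3's future.getChannel().close() would seem to be future.channel().close(). Waiting on the close future from a ChannelFutureListener also looks risky: listeners normally run on the channel's event loop, and Netty's promise implementation rejects blocking waits from that thread with a BlockingOperationException. A sketch, assuming Netty 4.1.x; EmbeddedChannel executes everything on the calling thread as if it were the event loop, which mimics the listener situation:
{code:java}
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.concurrent.BlockingOperationException;
import java.util.Arrays;

public class CloseFutureDemo {

  /** Returns {openAfterCloseFuture, waitRejected, closedAfterClose}. */
  static boolean[] run() {
    EmbeddedChannel ch = new EmbeddedChannel();

    // closeFuture() only returns the future that completes once the channel is closed
    ch.closeFuture();
    boolean openAfterCloseFuture = ch.isOpen();

    // Blocking on it from the channel's event loop is refused instead of deadlocking;
    // the timeout only guards against hanging if that assumption is wrong
    boolean waitRejected;
    try {
      ch.closeFuture().awaitUninterruptibly(100);
      waitRejected = false;
    } catch (BlockingOperationException e) {
      waitRejected = true;
    }

    // close() is what actually closes the channel and completes the close future
    ch.close();
    boolean closedAfterClose = !ch.isOpen() && ch.closeFuture().isDone();
    return new boolean[] {openAfterCloseFuture, waitRejected, closedAfterClose};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(run()));
  }
}
{code}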
h3. Change category #6: Idle state handling on channels:
The old code had an IdleStateHandler + TimeoutHandler in the pipeline:
{code:java}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
if (sslFactory != null) {
pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
}
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("chunking", new ChunkedWriteHandler());
pipeline.addLast("shuffle", SHUFFLE);
pipeline.addLast("idle", idleStateHandler);
pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
return pipeline;
}
{code}
*The idleStateHandler instance was created in the constructor of HttpPipelineFactory:*
{code:java}
public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
SHUFFLE = getShuffle(conf);
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
LOG.info("Encrypted shuffle is enabled.");
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
sslFactory.init();
}
this.idleStateHandler = new IdleStateHandler(timer, 0,
connectionKeepAliveTimeOut, 0);
}
{code}
The parameters are:
{code:java}
Timer timer,
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds
{code}
The writerIdleTimeSeconds parameter gets the value of the field "connectionKeepAliveTimeOut", which is set from Hadoop's configuration.
In other words, whatever value "connectionKeepAliveTimeOut" takes, that is how many seconds the IdleStateHandler waits without any write on the channel before it generates a (writer idle) IdleStateEvent.
*The second part of the handler logic is the TimeoutHandler, which is the last handler of the pipeline.*
The implementation is so concise that I can paste it here for reference:
{code:java}
static class TimeoutHandler extends IdleStateAwareChannelHandler {
private boolean enabledTimeout;
void setEnabledTimeout(boolean enabledTimeout) {
this.enabledTimeout = enabledTimeout;
}
@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
e.getChannel().close();
}
}
}
{code}
The point is, this class handles the IdleStateEvent generated by the IdleStateHandler.
There's an additional flag called "enabledTimeout" that can enable or disable the timeout logic.
Let's see what has changed:
*6.1 IdleStateHandler:*
Here's the javadoc of the IdleStateHandler class:
[https://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html]
In Netty 4.x, IdleStateHandler is part of the new handler hierarchy and schedules its timeouts on the channel's own event loop, so a Timer can't be passed to the constructor anymore.
As a consequence, the org.apache.hadoop.mapred.ShuffleHandler#timer field is not required anymore. Code changes:
{code:java}
@@ -267,7 +275,6 @@
boolean connectionKeepAliveEnabled = false;
private int connectionKeepAliveTimeOut;
private int mapOutputMetaInfoCacheSize;
- private Timer timer;
...
...
// Timer is shared across entire factory and must be released separately
- timer = new HashedWheelTimer();
try {
- pipelineFact = new HttpPipelineFactory(conf, timer);
+ pipelineFact = new HttpPipelineFactory(conf);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
...
...
if (pipelineFact != null) {
pipelineFact.destroy();
}
- if (timer != null) {
- // Release this shared timer resource
- timer.stop();
- }
{code}
*6.2 TimeoutHandler:*
With Netty 4.x, there is no IdleStateAwareChannelHandler class anymore.
The TimeoutHandler now extends IdleStateHandler and overrides its channelIdle method:
[https://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html#channelIdle-io.netty.channel.ChannelHandlerContext-io.netty.handler.timeout.IdleStateEvent-]
The method name and its signature are the same, but the class has changed.
Here's the code change:
{code:java}
- static class TimeoutHandler extends IdleStateAwareChannelHandler {
+ static class TimeoutHandler extends IdleStateHandler {
private boolean enabledTimeout;
+ public TimeoutHandler() {
+ super(1, 1, 1);
+ }
+
void setEnabledTimeout(boolean enabledTimeout) {
this.enabledTimeout = enabledTimeout;
}
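@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
- if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
- e.getChannel().close();
+ if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
+ ctx.channel().close();
}
}
}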
{code}
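Regarding 6.2: with super(1, 1, 1), TimeoutHandler becomes an IdleStateHandler of its own with 1-second reader, writer and all-idle timeouts (the int constructor takes seconds), and its channelIdle() only sees the events it generates itself. The events from the separate "idle" IdleStateHandler (configured with connectionKeepAliveTimeOut) reach the next handler through userEventTriggered(), which IdleStateHandler merely passes along. If I read this correctly, the keep-alive timeout would effectively shrink to 1 second. The IdleStateHandler javadoc instead shows a plain handler reacting to IdleStateEvent in userEventTriggered(). A sketch of that pattern, assuming Netty 4.1.x; the class below is hypothetical and mirrors the old enabledTimeout logic:
{code:java}
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class IdleTimeoutDemo {

  /** Hypothetical TimeoutHandler that consumes events from a separate IdleStateHandler. */
  static class TimeoutHandler extends ChannelDuplexHandler {
    private boolean enabledTimeout;

    void setEnabledTimeout(boolean enabledTimeout) {
      this.enabledTimeout = enabledTimeout;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      if (evt instanceof IdleStateEvent) {
        IdleStateEvent e = (IdleStateEvent) evt;
        // same condition as the old channelIdle(): close idle keep-alive connections
        if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
          ctx.channel().close();
        }
        return;
      }
      super.userEventTriggered(ctx, evt); // forward unrelated user events
    }
  }

  /** Fires a writer-idle event, as the "idle" handler would, and reports whether the channel closed. */
  static boolean closesOnWriterIdle(boolean enabled) {
    TimeoutHandler handler = new TimeoutHandler();
    handler.setEnabledTimeout(enabled);
    EmbeddedChannel ch = new EmbeddedChannel(handler);
    ch.pipeline().fireUserEventTriggered(IdleStateEvent.WRITER_IDLE_STATE_EVENT);
    boolean closed = !ch.isOpen();
    ch.finish();
    return closed;
  }

  public static void main(String[] args) {
    System.out.println(closesOnWriterIdle(true));  // true
    System.out.println(closesOnWriterIdle(false)); // false
  }
}
{code}
In the real pipeline, the existing "idle" IdleStateHandler(0, connectionKeepAliveTimeOut, 0) would stay directly in front of this handler, and the super(1, 1, 1) call would no longer be needed.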
h3. Change category #7: Server bootstrapping:
Netty 4.x has a new API for bootstrapping the server:
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#new-bootstrap-api]
{quote}The bootstrap API has been rewritten from scratch although its purpose
stays same; it performs the typical steps required to make a server or a client
up and running, often found in boilerplate code.
The new bootstrap also employs a fluent interface.
{quote}
Let me list the required changes:
*7.1 Type-safe ChannelOptions: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#type-safe-channeloption]*
The new server bootstrap API utilizes a type-safe way to modify socket
options:
Code changes:
{code:java}
- bootstrap.setOption("backlog", conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
- DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE));
- bootstrap.setOption("child.keepAlive", true);
+ bootstrap.option(ChannelOption.SO_BACKLOG,
+ conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
+ DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE))
+ .option(ChannelOption.SO_KEEPALIVE, true)
{code}
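A note on the last option: in Netty 3, the "child." prefix applied an option to the accepted connections, whereas in Netty 4, option() configures the listening server channel and childOption() configures each accepted child channel. So "child.keepAlive" presumably maps to childOption(ChannelOption.SO_KEEPALIVE, true) rather than to option(...). A sketch, assuming Netty 4.1.x (config() is a 4.1 API); listenQueueSize is a hypothetical parameter standing in for the configured SHUFFLE_LISTEN_QUEUE_SIZE:
{code:java}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

public class BootstrapOptionsDemo {

  /** Applies the two shuffle socket options to the right channel type. */
  static ServerBootstrap configure(ServerBootstrap bootstrap, int listenQueueSize) {
    return bootstrap
        // listening (server) socket: the Netty 3 keys without a prefix
        .option(ChannelOption.SO_BACKLOG, listenQueueSize)
        // every accepted connection: the Netty 3 "child." keys
        .childOption(ChannelOption.SO_KEEPALIVE, true);
  }

  public static void main(String[] args) {
    ServerBootstrap b = configure(new ServerBootstrap(), 128);
    System.out.println(b.config().options().get(ChannelOption.SO_BACKLOG));        // 128
    System.out.println(b.config().childOptions().get(ChannelOption.SO_KEEPALIVE)); // true
  }
}
{code}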
*7.2 Simplified shutdown: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#simplified-shutdown]*
{quote}There's no more releaseExternalResources(). You can close all open
channels immediately and make all I/O threads stop themselves by calling
EventLoopGroup.shutdownGracefully().
{quote}
Code changes:
{code:java}
@@ -577,17 +590,11 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- if (selector != null) {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
- }
if (stateDb != null) {
stateDb.close();
}
{code}
*7.3 NioServerSocketChannelFactory --> NioEventLoopGroup: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#flexible-io-thread-allocation]*
This is a significant change as the old codebase was using
HadoopExecutors.newCachedThreadPool for the boss and worker thread groups.
This can be simplified to:
{code:java}
- selector = new NioServerSocketChannelFactory(
- HadoopExecutors.newCachedThreadPool(bossFactory),
- HadoopExecutors.newCachedThreadPool(workerFactory),
- maxShuffleThreads);
+ bossGroup = new NioEventLoopGroup(0, bossFactory);
+ workerGroup = new NioEventLoopGroup(0, workerFactory);
{code}
With the old Netty codebase, the constructor of NioServerSocketChannelFactory
had this signature:
{code:java}
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
this(bossExecutor, 1, workerExecutor, workerCount);
}
{code}
The last parameter was the workerCount, which was taking the value of
maxShuffleThreads.
The constructor that is being used with Netty 4.x:
{code:java}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
{code}
{color:#ff0000}With Wei-Chiu's code changes, 0 is passed here. I think this is not correct, so I modified the code to pass the value of "maxShuffleThreads" as the first parameter:{color}
{code:java}
+ bossGroup = new NioEventLoopGroup(maxShuffleThreads, bossFactory);
+ workerGroup = new NioEventLoopGroup(maxShuffleThreads, workerFactory);
{code}
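On the 0 argument: in Netty 4, a thread count of 0 does not mean zero threads. MultithreadEventLoopGroup substitutes its default, which is the io.netty.eventLoopThreads system property if set, otherwise twice the number of available processors. So new NioEventLoopGroup(0, ...) works, but ignores maxShuffleThreads. Note also that the Netty 3 constructor quoted above delegates with a boss count of 1, so a single-threaded boss group plus maxShuffleThreads for the worker group may be the closer equivalent; a server bound to a single port only ever uses one boss event loop anyway. A sketch, assuming Netty 4.1.x; DefaultThreadFactory stands in for Hadoop's bossFactory/workerFactory:
{code:java}
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;

public class EventLoopSizingDemo {

  /** Returns how many event loops a group created with nThreads actually has. */
  static int loopsFor(int nThreads) {
    ThreadFactory factory = new DefaultThreadFactory("shuffle-demo");
    NioEventLoopGroup group = new NioEventLoopGroup(nThreads, factory);
    try {
      return group.executorCount();
    } finally {
      group.shutdownGracefully(); // release the event loops again
    }
  }

  public static void main(String[] args) {
    // 0 means "use Netty's default", not zero threads
    System.out.println(loopsFor(0));
    System.out.println(loopsFor(1)); // 1
  }
}
{code}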
*7.4 The rest of the server init code in the serviceStart method is just an adaptation to the new API:*
{code:java}
@@ -540,22 +550,25 @@ protected void serviceStart() throws Exception {
userRsrc = new ConcurrentHashMap<String,String>();
secretManager = new JobTokenSecretManager();
recoverState(conf);
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class);
+
- bootstrap.setPipelineFactory(pipelineFact);
+ .childHandler(pipelineFact);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ ch = bootstrap.bind(new InetSocketAddress(port)).sync().channel();
accepted.add(ch);
conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
pipelineFact.SHUFFLE.setPort(port);
LOG.info(getName() + " listening on port " + port);
@@ -785,29 +792,33 @@ private void removeJobShuffleInfo(JobID jobId) throws IOException {
}
}
{code}
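Putting 7.1 to 7.4 together, including the EventLoopGroup.shutdownGracefully() call that the quoted shutdown guidance asks for but that does not appear in the serviceStop() excerpt above, a stripped-down lifecycle could look like this. It assumes Netty 4.1.x; ShuffleServerLifecycle, start() and stop() are hypothetical, the child handler is left empty, and it binds to an ephemeral loopback port purely for demonstration:
{code:java}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class ShuffleServerLifecycle {
  private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  private EventLoopGroup bossGroup;
  private EventLoopGroup workerGroup;

  /** Roughly serviceStart(): bind, track the server channel, return the bound port. */
  int start(int port, int workerThreads) {
    bossGroup = new NioEventLoopGroup(1); // one boss thread, as in the Netty 3 code
    workerGroup = new NioEventLoopGroup(workerThreads);
    ServerBootstrap bootstrap = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) {
            // HttpPipelineFactory#initChannel would add the HTTP and shuffle handlers here
          }
        });
    // the diff uses sync(); syncUninterruptibly() avoids the checked InterruptedException
    Channel ch = bootstrap.bind(new InetSocketAddress("127.0.0.1", port))
        .syncUninterruptibly().channel();
    accepted.add(ch);
    return ((InetSocketAddress) ch.localAddress()).getPort();
  }

  /** Roughly serviceStop(): close tracked channels, then stop the event loops. */
  void stop() {
    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
    Future<?> boss = bossGroup.shutdownGracefully();
    Future<?> workers = workerGroup.shutdownGracefully();
    boss.syncUninterruptibly();
    workers.syncUninterruptibly();
  }

  public static void main(String[] args) {
    ShuffleServerLifecycle server = new ShuffleServerLifecycle();
    int port = server.start(0, 2); // port 0 lets the OS pick a free port
    System.out.println("listening on port " + port);
    server.stop();
  }
}
{code}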
*7.5 There's one important thing left: Initializing the channel group.*
The old codebase was using this constructor of DefaultChannelGroup:
[https://docs.jboss.org/netty/3.1/api/org/jboss/netty/channel/group/DefaultChannelGroup.html#DefaultChannelGroup(java.lang.String])
Here it had only one parameter: The name of the group.
This is also mentioned in the architectural guide:
[https://netty.io/3.8/guide/#example.discard3.co1]
However, in Netty 4.x every DefaultChannelGroup constructor takes a mandatory EventExecutor parameter; the one closest to the old code takes a name and an EventExecutor:
[https://netty.io/4.0/api/io/netty/channel/group/DefaultChannelGroup.html#DefaultChannelGroup-java.lang.String-io.netty.util.concurrent.EventExecutor-]
There's nothing in the migration guide regarding this.
Looking at the javadoc of ChannelGroup (the interface implemented by DefaultChannelGroup):
[https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html]
The only way channel groups are showcased is this, without any further
explanation of the semantics:
{code:java}
ChannelGroup allChannels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
{code}
Wei-Chiu also felt this is a bit fishy:
{code:java}
+ // FIXME: need thread safety.
+ private final ChannelGroup accepted =
+ new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
{code}
There's a related SO article:
[https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup]
{color:#ff0000}This is the part that I'm the most uncertain of.{color}
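For what it's worth, my reading of the Netty 4 ChannelGroup javadoc and of DefaultChannelGroup: ChannelGroup is described as a thread-safe Set of open channels, and a closed channel is removed from the group automatically. The EventExecutor is used to notify the ChannelGroupFuture returned by group-wide operations such as accepted.close(), and GlobalEventExecutor.INSTANCE is the usual choice for a server-wide group. If that is right, the FIXME about thread safety may not be necessary. A sketch, assuming Netty 4.1.x:
{code:java}
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class ChannelGroupDemo {

  /** Returns the group size after adding one channel and after closing it. */
  static int[] run() {
    // The executor notifies the ChannelGroupFutures of bulk operations like close()
    ChannelGroup accepted = new DefaultChannelGroup("accepted", GlobalEventExecutor.INSTANCE);
    EmbeddedChannel ch = new EmbeddedChannel();
    accepted.add(ch);
    int sizeAfterAdd = accepted.size();
    ch.close(); // the group listens on closeFuture() and drops closed channels itself
    int sizeAfterClose = accepted.size();
    return new int[] {sizeAfterAdd, sizeAfterClose};
  }

  public static void main(String[] args) {
    int[] sizes = run();
    System.out.println(sizes[0] + " " + sizes[1]); // 1 0
  }
}
{code}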
h3. *Change category #8: Channel initialization logic*
Details can be found here:
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#channelpipelinefactory--channelinitializer]
{quote}ChannelPipelineFactory → ChannelInitializer
As you noticed in the example above, there is no ChannelPipelineFactory
anymore. It has been replaced with ChannelInitializer, which gives more control
over Channel and ChannelPipeline configuration.
Please note that you don't create a new ChannelPipeline by yourself. After
observing many use cases reported so far, the Netty project team concluded that
it has no benefit for a user to create his or her own pipeline implementation
or to extend the default implementation. Therefore, ChannelPipeline is not
created by a user anymore. ChannelPipeline is automatically created by a
Channel.
{quote}
Code changes accordingly:
{code:java}
- class HttpPipelineFactory implements ChannelPipelineFactory {
+ class HttpPipelineFactory extends ChannelInitializer<SocketChannel> {
final Shuffle SHUFFLE;
private SSLFactory sslFactory;
- private final ChannelHandler idleStateHandler;
- public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
+ public HttpPipelineFactory(Configuration conf) throws Exception {
SHUFFLE = getShuffle(conf);
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
@@ -815,7 +826,7 @@ public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
sslFactory.init();
}
- this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
}
public Shuffle getSHUFFLE() {
@@ -828,27 +839,29 @@ public void destroy() {
}
}
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
+ @Override protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
if (sslFactory != null) {
pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
}
pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+ pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("chunking", new ChunkedWriteHandler());
pipeline.addLast("shuffle", SHUFFLE);
- pipeline.addLast("idle", idleStateHandler);
+ pipeline.addLast("idle", new IdleStateHandler(
+ 0, connectionKeepAliveTimeOut, 0));
pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
- return pipeline;
}
}
{code}
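ChannelInitializer.initChannel() runs once for every newly registered channel, so per-connection handlers are created there. That is presumably why the diff builds a new IdleStateHandler inside initChannel() instead of sharing the single instance from the constructor: in Netty 4, IdleStateHandler keeps per-channel state and is not @Sharable. ChannelInitializer itself is @Sharable, which is what allows one HttpPipelineFactory instance to be passed to childHandler(). A trimmed-down sketch, assuming Netty 4.1.x with the netty-codec-http and netty-handler modules; DemoInitializer is hypothetical and omits the SSL, shuffle and timeout handlers:
{code:java}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

public class InitializerDemo {

  /** Trimmed-down, hypothetical HttpPipelineFactory. */
  static class DemoInitializer extends ChannelInitializer<EmbeddedChannel> {
    private final int keepAliveTimeoutSeconds; // stands in for connectionKeepAliveTimeOut

    DemoInitializer(int keepAliveTimeoutSeconds) {
      this.keepAliveTimeoutSeconds = keepAliveTimeoutSeconds;
    }

    @Override
    protected void initChannel(EmbeddedChannel ch) {
      ChannelPipeline pipeline = ch.pipeline(); // created by the channel, not by us
      pipeline.addLast("decoder", new HttpRequestDecoder());
      pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
      pipeline.addLast("encoder", new HttpResponseEncoder());
      pipeline.addLast("chunking", new ChunkedWriteHandler());
      // IdleStateHandler is not @Sharable, so every channel gets its own instance
      pipeline.addLast("idle", new IdleStateHandler(0, keepAliveTimeoutSeconds, 0));
    }
  }

  /** Initializes two channels with one initializer and checks the resulting pipelines. */
  static boolean run() {
    DemoInitializer initializer = new DemoInitializer(5);
    EmbeddedChannel a = new EmbeddedChannel(initializer);
    EmbeddedChannel b = new EmbeddedChannel(initializer); // allowed: ChannelInitializer is @Sharable
    boolean ok = a.pipeline().get("aggregator") instanceof HttpObjectAggregator
        && a.pipeline().get("idle") != b.pipeline().get("idle")
        && a.pipeline().get(DemoInitializer.class) == null; // removed after initChannel()
    a.finish();
    b.finish();
    return ok;
  }

  public static void main(String[] args) {
    System.out.println(run()); // true
  }
}
{code}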
h3. *Change category #9: Buffer API changes*
Details:
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#channelbuffer--bytebuf]
{quote}The utility class ChannelBuffers, which creates a new buffer, has been
split into two utility classes, Unpooled and ByteBufUtil. As can be guessed
from its name Unpooled, 4.0 introduced pooled ByteBufs which can be allocated
via ByteBufAllocator implementations.
{quote}
*Essentially, ChannelBuffers.copiedBuffer needed to be replaced with Unpooled.copiedBuffer:*
{code:java}
@@ -1312,7 +1325,7 @@ protected void sendError(ChannelHandlerContext ctx, String message,
protected void sendError(ChannelHandlerContext ctx, String msg,
HttpResponseStatus status, Map<String, String> headers) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Put shuffle version into http header
response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
@@ -1322,18 +1335,17 @@ protected void sendError(ChannelHandlerContext ctx, String msg,
for (Map.Entry<String, String> header : headers.entrySet()) {
response.headers().set(header.getKey(), header.getValue());
}
- response.setContent(
- ChannelBuffers.copiedBuffer(msg, CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent.
}
{code}
Also, DefaultFullHttpResponse became the go-to class (instead of
DefaultHttpResponse) to construct a final HTTP response with a buffer.
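A trimmed-down sketch of the new sendError() flow, assuming Netty 4.1.x (HttpHeaderNames and HttpUtil are 4.1 APIs). The method below is hypothetical: it drops the shuffle headers and takes a Channel instead of a ChannelHandlerContext. Setting Content-Length is an addition of this sketch and not part of the diff:
{code:java}
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;

public class SendErrorDemo {

  /** Simplified, hypothetical sendError: no shuffle headers, writes straight to the channel. */
  static void sendError(Channel ch, String msg, HttpResponseStatus status) {
    // The body is passed to the constructor instead of a later setContent() call
    FullHttpResponse response = new DefaultFullHttpResponse(
        HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
    HttpUtil.setContentLength(response, response.content().readableBytes());
    // writeAndFlush so that the CLOSE listener fires once the response is written
    ch.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
  }

  /** Sends an error on an EmbeddedChannel and checks what was written. */
  static boolean run() {
    EmbeddedChannel ch = new EmbeddedChannel();
    sendError(ch, "Bad request", HttpResponseStatus.BAD_REQUEST);
    FullHttpResponse sent = ch.readOutbound();
    try {
      return sent.status().code() == 400
          && "Bad request".equals(sent.content().toString(CharsetUtil.UTF_8))
          && !ch.isOpen();
    } finally {
      sent.release(); // the response is reference-counted
    }
  }

  public static void main(String[] args) {
    System.out.println(run()); // true
  }
}
{code}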
----
h2. Questions to Wei-Chiu
[~weichiu] Could you please help me with answering these?
*1. Change category #5, closing channels with:*
{code:java}
future.channel().closeFuture().awaitUninterruptibly()
{code}
Could you please tell me how you found out that a channel should be closed like this? As mentioned above, I couldn't find any resource to justify this.
*2. Question for 6.2 TimeoutHandler:*
Could you please tell me the reason for calling the super constructor with 1, 1, 1?
{code:java}
+ public TimeoutHandler() {
+ super(1, 1, 1);
+ }
+
{code}
*3. Question for 7.3: Initialization of EventLoopGroups:*
My updated code is:
{code:java}
+ bossGroup = new NioEventLoopGroup(maxShuffleThreads, bossFactory);
+ workerGroup = new NioEventLoopGroup(maxShuffleThreads, workerFactory);
{code}
Am I missing something, or was this an oversight on your side?
*4. What about point 2.2.2? Do you think what I'm saying there is okay?*
*5. Do you have any idea for point 7.5?*
----
h2. Test failures
Work is still in progress; I need to find out what causes the unit tests of the ShuffleHandler to fail.
After fixing the tests, I will also manually check whether MR behaves well on a cluster.
> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
> Key: HADOOP-15327
> URL: https://issues.apache.org/jira/browse/HADOOP-15327
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Xiaoyu Yao
> Assignee: Szilard Nemeth
> Priority: Major
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)