[
https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362367#comment-17362367
]
Szilard Nemeth commented on HADOOP-15327:
-----------------------------------------
The latest patch contains commits from this branch:
[https://github.com/szilard-nemeth/hadoop/commits/HADOOP-15327-snemeth]
There are a couple of commits, so I will approach this by explaining the
reasons behind each change, commit by commit.
Not all commits are listed; I left out a few trivial ones.
Unfortunately, this task was a bit tricky: every time I touched something in
the tests, I found another bug or weird behaviour, so it took a great deal of
time to discover and fix everything.
*1. ShuffleHandler: ch.isOpen() --> ch.isActive():
[https://github.com/szilard-nemeth/hadoop/commit/e703adb57f66da8579baa26257ca9aaed2bf1db5]*
This was already mentioned in my previous, lengthier comment.
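As a minimal, hypothetical illustration (not the patch code; the method name is made up): in Netty 4, isOpen() only means the channel has not been closed yet, while isActive() means the channel is actually connected to its remote peer, which is the check needed before writing to an established connection.
{code:java}
import io.netty.channel.Channel;

// Illustration only: isActive() is the liveness check to use before writing.
final class ChannelLivenessExample {
  static boolean canWriteTo(Channel ch) {
    return ch.isActive(); // previously the code checked ch.isOpen()
  }
}
{code}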
*2. TestShuffleHandler: Fix mocking in testSendMapCount + replace ch.write()
with ch.writeAndFlush():
[https://github.com/szilard-nemeth/hadoop/commit/07fbfee5cae85e8e374b53c303e794c19c620efc]*
This commit covers two things:
- Replacing channel.write calls with channel.writeAndFlush (see the short
sketch below)
- Fixing bad mocking in
org.apache.hadoop.mapred.TestShuffleHandler#testSendMapCount
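A short, hypothetical sketch of the Netty 4 semantics behind the write → writeAndFlush change (not the patch code; class and method names are made up): Channel#write only queues a message in the outbound buffer, so nothing reaches the socket until a flush happens; writeAndFlush queues and flushes in one call, which matches what the old Netty 3 style write effectively did.
{code:java}
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;

// Illustration only: with plain write(), the message would sit in the outbound
// buffer until an explicit flush(); writeAndFlush() queues and flushes in one step.
final class WriteVsWriteAndFlushExample {
  static ChannelFuture send(Channel ch, Object msg) {
    return ch.writeAndFlush(msg);
  }
}
{code}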
*3. TestShuffleHandler.testMaxConnections: Rewrite test + production code:
accepted connection handling:
[https://github.com/szilard-nemeth/hadoop/commit/def0059982ef8f0e2f19d385b1a1fcdca8639f9d]*
*Changes in production code:*
- ShuffleHandler#channelActive added the channel to the channel group (a field
called 'accepted') before the if statement that enforces the maximum number of
open connections. This was the old, wrong piece of code:
{code:java}
super.channelActive(ctx);
LOG.debug("accepted connections={}", accepted.size());
if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
{code}
- Also, counting the number of open channels with the channel group was
unreliable, so I introduced a new AtomicInteger field called
'acceptedConnections' to track the open channels / connections. A sketch of the
corrected channelActive is included after the channelInactive snippet below.
- There was another issue: when channels were accepted, the counter of open
channels was increased, but when channels became inactive I could not find any
code that maintained (decremented) the value.
This was mitigated by adding
org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelInactive, which logs the
channel-inactive event and decreases the open connections counter:
{code:java}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  super.channelInactive(ctx);
  acceptedConnections.decrementAndGet();
  LOG.debug("New value of Accepted number of connections={}",
      acceptedConnections.get());
}
{code}
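To make this easier to follow, here is a rough sketch of how the corrected channelActive pairs with the channelInactive above; the class name and the rejection behaviour are simplified assumptions, not the exact patch code.
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Sketch only: the AtomicInteger is incremented when a channel becomes active and
// decremented when it becomes inactive, so the open-connection count stays correct.
public class ConnectionLimitingHandlerSketch extends ChannelInboundHandlerAdapter {
  private final AtomicInteger acceptedConnections = new AtomicInteger();
  private final int maxShuffleConnections;

  public ConnectionLimitingHandlerSketch(int maxShuffleConnections) {
    this.maxShuffleConnections = maxShuffleConnections;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    int accepted = acceptedConnections.incrementAndGet();
    if (maxShuffleConnections > 0 && accepted > maxShuffleConnections) {
      // The real code sends an HTTP error to the client before closing.
      ctx.channel().close();
    }
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);
    acceptedConnections.decrementAndGet();
  }
}
{code}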
*Changes in test code:*
- org.apache.hadoop.mapred.TestShuffleHandler#testMaxConnections: Fixed the
testcase. The issue was pointed out correctly by [~weichiu]: the connections
are accepted in parallel, so the test should not rely on their order. The way I
rewrote this is that I introduced a map that groups the HttpURLConnection
objects by their HTTP response code.
Then I check that we only got 200 OK and 429 TOO MANY REQUESTS responses, that
the number of 200 OK connections is 2, and that there is only one rejected
connection.
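A rough sketch of the grouping idea (not the exact test code; the helper name is made up): the connections are bucketed by HTTP response code, so the assertions no longer depend on the order in which the server accepted them.
{code:java}
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: group connections by their HTTP response code.
final class ResponseCodeGroupingSketch {
  static Map<Integer, List<HttpURLConnection>> groupByResponseCode(
      List<HttpURLConnection> connections) throws IOException {
    Map<Integer, List<HttpURLConnection>> byCode = new HashMap<>();
    for (HttpURLConnection conn : connections) {
      byCode.computeIfAbsent(conn.getResponseCode(), k -> new ArrayList<>()).add(conn);
    }
    return byCode;
  }
}
{code}
The test can then assert that the map contains exactly the keys 200 and 429, with two connections mapped to 200 and one to 429.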
*4. increase netty version to 4.1.65.Final:
[https://github.com/szilard-nemeth/hadoop/commit/4f4589063b579a93389b1e188c29bd895ae507fc]*
This is a simple commit to increase the Netty version to the latest stable 4.x
version.
See this page: [https://netty.io/downloads.html]
It states: "netty-4.1.65.Final.tar.gz ‐ 19-May-2021 (Stable, Recommended)"
*5. ShuffleHandler: Fix keepalive test + writing HTTP response properly to
channel:
[https://github.com/szilard-nemeth/hadoop/commit/1aad4eaace28cfff4a9a9152f7535d70cc6e3734]*
This is where things get more interesting. There was a testcase called
org.apache.hadoop.mapred.TestShuffleHandler#testKeepAlive that caught an issue
that came up because Netty 4.x handles HTTP responses written to the same
channel differently than Netty 3.x.
See details below.
Production code changes:
- Added some logs to be able to track what happens when HTTP Connection
Keep-Alive is used.
- Added a ChannelOutboundHandlerAdapter that handles exceptions that happen
during outbound message construction. These exceptions are not logged by Netty
by default, and this trick was the only way I found to catch these events:
{code:java}
pipeline.addLast("outboundExcHandler", new
ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
super.write(ctx, msg, promise);
}
});
{code}
This solution is described here:
[https://stackoverflow.com/questions/50612403/catch-all-exception-handling-for-outbound-channelhandler]
- *Fixed the root cause of the keep alive test issue in
org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelRead.*
*Let me give a detailed explanation here as this was quite tricky to catch.*
With Netty 4.x, after the DefaultHttpResponse has been written to the channel,
the HTTP body is constructed by further channel write calls. These calls
originate from sendMap and then sendMapOutput. A significant difference between
Netty 3.x and 4.x is the newly introduced HTTP response objects:
DefaultFullHttpResponse and DefaultHttpResponse.
DefaultFullHttpResponse is for constructing a complete response that
encapsulates both the HTTP header and the body.
DefaultHttpResponse is for constructing an HTTP header only: it is written to
the channel first, and the buffered data pushed to the channel afterwards
becomes the HTTP body.
When HTTP Keep-Alive is used, a LastHttpContent.EMPTY_LAST_CONTENT message
should be written to the channel after the message body data has been sent.
Doing so enables the next HTTP response to be sent on the same channel.
If we didn't write a LastHttpContent, the channel would fail to handle
subsequent HTTP responses.
The root cause is that all outbound messages go through HttpResponseEncoder,
which is stateful: it prevents sending further HTTP responses if there is no
clear boundary marking the end of the previous response. Providing that
boundary is the main purpose of LastHttpContent.
When no LastHttpContent is written to the channel, HttpObjectEncoder.encode
throws an IllegalStateException on the second HTTP message.
By default, exceptions thrown while handling outbound messages are not printed
in any way, so the trick above is required to surface them.
All of this behaviour is, unfortunately, largely undocumented.
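To make the described write sequence concrete, here is a minimal sketch (not the ShuffleHandler code; the class and method names are made up, and Content-Length / keep-alive headers are omitted for brevity):
{code:java}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.CharsetUtil;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

// Sketch only: header first, body data next, LastHttpContent last so that
// HttpResponseEncoder accepts the next response on the same keep-alive channel.
final class KeepAliveWriteSequenceSketch {
  static void writeResponse(ChannelHandlerContext ctx, String body) {
    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); // HTTP header only
    ctx.write(response);
    ctx.write(Unpooled.copiedBuffer(body, CharsetUtil.UTF_8));     // HTTP body chunk(s)
    ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);         // end-of-response marker
  }
}
{code}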
I found some results in GitHub issues; these are the most relevant ones that
led me to the final solution:
- [https://github.com/netty/netty/issues/1725#issuecomment-22624967]
- [https://github.com/netty/netty/issues/11155#issue-857141001]
There are other related issues as well:
* [https://github.com/netty/netty/issues/1008]
* [https://github.com/netty/netty/issues/7993]
* [https://github.com/netty/netty/issues/1700]
* [https://github.com/netty/netty/issues/2466]
* [https://github.com/netty/netty/issues/1359]
Some of the Netty issues above mention that careful use of write /
writeAndFlush could also mitigate this kind of issue, but that never worked for
me, so I stuck with using LastHttpContent.
*Test code changes:*
In general, I don't like adding unrelated refactoring and other non-related
changes to a patch; I try to focus on the core change. However, as I wanted to
add a new keep-alive test as well, I didn't want to duplicate a huge chunk of
test code. The testcase itself already contained too much code duplication.
- Introduced
org.apache.hadoop.mapred.TestShuffleHandler.ShuffleHandlerForKeepAliveTests
that was extracted out of testKeepAlive.
- Introduced
org.apache.hadoop.mapred.TestShuffleHandler.LoggingHttpResponseEncoder: This is
to add more logging for outbound messages in the test channel pipeline.
- Introduced org.apache.hadoop.mapred.TestShuffleHandler.MapOutputSender: This
has the same functionality as before, but with much better and more
straightforward code, at least I think so :) It is primarily called from
org.apache.hadoop.mapred.ShuffleHandler.Shuffle#sendMapOutput when
ShuffleHandler decides to send mapper output.
- Similarly to MapOutputSender, I also introduced
org.apache.hadoop.mapred.TestShuffleHandler.ShuffleHeaderProvider: same
functionality, more straightforward code.
It is primarily called from
org.apache.hadoop.mapred.TestShuffleHandler.HeaderPopulator#populateHeaders.
- Introduced new test helper class:
org.apache.hadoop.mapred.TestShuffleHandler.HttpConnectionHelper.
It is capable of connecting to a provided array of URLs and storing connection
data for each one in HttpConnectionData objects.
- Introduced new test helper class:
org.apache.hadoop.mapred.TestShuffleHandler.HttpConnectionData.
This class is a simple data class to store connection data: the
HttpURLConnection itself, the length of the payload (response), the
SocketAddress, the HTTP response code and the HTTP headers. This class makes it
easier to perform assertions on these HTTP connections.
- Finally, I introduced another new test helper class:
org.apache.hadoop.mapred.TestShuffleHandler.HttpConnectionAssert.
This receives an instance of HttpConnectionData and can perform certain
assertions. All kinds of assertions are added as convenience methods.
There's one special method:
org.apache.hadoop.mapred.TestShuffleHandler.HttpConnectionAssert#assertKeepAliveConnectionsAreSame:
This receives an HttpConnectionHelper and checks whether the sockets of the
first two connections have the same address.
- Given all these helper classes and refactors, I simplified the keep-alive
test and added a new testcase as well: testKeepAliveInitiallyDisabled.
*6. channel.closeFuture().awaitUninterruptibly() --> channel.close():
[https://github.com/szilard-nemeth/hadoop/commit/3eb1bf244a7a50f12a5f43058384d9904ab95825]*
This relates to my previous, longer comment and the question I asked there.
This is quite a concise commit; it replaces
{code:java}
future.channel().closeFuture().awaitUninterruptibly();
{code}
with
{code:java}
future.channel().close();
{code}
As Wei-Chiu mentioned, the javadoc recommends not calling
awaitUninterruptibly() from Netty I/O threads or channel handlers:
[https://netty.io/4.0/api/io/netty/channel/ChannelFuture.html]
See the attached
[^getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log] for a
stacktrace showing that io.netty.util.concurrent.BlockingOperationException is
thrown when await is called instead of simply calling close().
*7. TestShuffleHandler: Add error handling + assertion logic:
[https://github.com/szilard-nemeth/hadoop/commit/ec1d7a9249576a88e228fac1497c87309b4c6ac6]*
During testing, I discovered a potential bug: I saw the same
IllegalStateException being thrown from HttpResponseEncoder.
This will be a bit lengthy, but I spent enough time uncovering the root cause
that I think it's worth writing down.
In org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelRead, when
populateHeaders is called (there is only one occurrence) and an IOException is
caught, this happens:
{code:java}
try {
  populateHeaders(mapIds, jobId, user, reduceId, request,
      response, keepAliveParam, mapOutputInfoMap);
} catch (IOException e) {
  //TODO snemeth (HADOOP-15327)
  // This seems like a bug combined with bad expectations in the tests.
  // See details in jira
  ch.writeAndFlush(response);
  LOG.error("Shuffle error in populating headers :", e);
  String errorMessage = getErrorMessage(e);
  sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
  return;
}
{code}
Here, the response is written to the channel; the response object is defined as:
{code:java}
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
{code}
The problem is that Shuffle#sendError also writes a response, but it is an
error response:
{code:java}
protected void sendError(ChannelHandlerContext ctx, String msg,
    HttpResponseStatus status, Map<String, String> headers) {
  FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status,
      Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
  response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
  // Put shuffle version into http header
  response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
      ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
  response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
      ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
  for (Map.Entry<String, String> header : headers.entrySet()) {
    response.headers().set(header.getKey(), header.getValue());
  }
  // Close the connection as soon as the error message is sent.
  ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
{code}
It doesn't seem correct to write a successful response and then an error
response. Moreover, the tests are passing, which is strange.
When running either org.apache.hadoop.mapred.TestShuffleHandler#testRecovery
or org.apache.hadoop.mapred.TestShuffleHandler#testRecoveryFromOtherVersions,
we can observe many occurrences of this exception in the logs:
{code:java}
2021-06-12 18:41:54,732 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:exceptionCaught(1412)) - Shuffle error:
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 1
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    at io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:302)
    at io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306)
    at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendError(ShuffleHandler.java:1390)
    at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendError(ShuffleHandler.java:1372)
    at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.channelRead(ShuffleHandler.java:1069)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 1
    at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:86)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
    ... 44 more
{code}
*This is the same issue as described in point 5*: once the outbound message
encoder has written an HTTP response, it can't write another one on the same
channel unless a LastHttpContent is written before the second message.
So after the successful response object has been written, Shuffle.sendError
tries to write the error HTTP response and fails. The reason for the many
occurrences of this exception is that
org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught catches the
outbound exception and tries to send the error to the client again (note the
last sendError call):
{code:java}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    throws Exception {
  Channel ch = ctx.channel();
  if (cause instanceof TooLongFrameException) {
    sendError(ctx, BAD_REQUEST);
    return;
  } else if (cause instanceof IOException) {
    if (cause instanceof ClosedChannelException) {
      LOG.debug("Ignoring closed channel error", cause);
      return;
    }
    String message = String.valueOf(cause.getMessage());
    if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
      LOG.debug("Ignoring client socket close", cause);
      return;
    }
  }
  LOG.error("Shuffle error: ", cause);
  if (ch.isActive()) {
    sendError(ctx, INTERNAL_SERVER_ERROR);
  }
}
{code}
This goes on and on until the test finishes, so it is effectively an infinite
loop of events.
The client only gets the successful HTTP response and the tests pass. However,
I think in these cases the tests should fail, i.e. they should expect an error
HTTP response instead.
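As a purely hypothetical illustration (not part of the patch; the helper name is made up) of what such a test should arguably assert once channelRead stops writing a 200 OK before sendError:
{code:java}
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper: expect an HTTP 500 instead of a successful response.
final class ErrorResponseAssertSketch {
  static void assertInternalServerError(URL url) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.connect();
    assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, conn.getResponseCode());
  }
}
{code}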
Let's look at
org.apache.hadoop.mapred.TestShuffleHandler#testRecoveryFromOtherVersions, for
example:
There must be something wrong with the test setup, as populateHeaders errors
out with:
{code:java}
2021-06-12 18:41:54,703 DEBUG [ShuffleHandler Netty Worker #0] fs.FileSystem (DurationInfo.java:close(101)) - Creating FS file:///: duration 0:00.097s
2021-06-12 18:41:54,731 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1067)) - Shuffle error in populating headers :
java.io.IOException: Error Reading IndexFile
    at org.apache.hadoop.mapred.IndexCache.readIndexFileToCache(IndexCache.java:123)
    at org.apache.hadoop.mapred.IndexCache.getIndexInformation(IndexCache.java:68)
    at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.getMapOutputInfo(ShuffleHandler.java:1191)
    at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.populateHeaders(ShuffleHandler.java:1210)
    at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.channelRead(ShuffleHandler.java:1060)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File /Users/snemeth/development/apache/hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/target/test-dir/TestShuffleHandlerLocDir/usercache/someuser/appcache/application_12345_0001/output/attempt_12345_1_m_1_0/file.out.index does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
    at org.apache.hadoop.fs.RawLocalFileSystem.open(RawLocalFileSystem.java:274)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:975)
    at org.apache.hadoop.io.SecureIOUtils.openFSDataInputStream(SecureIOUtils.java:152)
    at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:71)
    at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:62)
    at org.apache.hadoop.mapred.IndexCache.readIndexFileToCache(IndexCache.java:119)
    ... 28 more
{code}
This induces the infinite loop of exceptions described above. Still, the
client only receives the successful response, and the server is stuck in that
loop.
The infinite loop can be prevented by removing the magic
ChannelOutboundHandlerAdapter from the pipeline in
org.apache.hadoop.mapred.ShuffleHandler.HttpPipelineFactory#initChannel:
{code:java}
pipeline.addLast("outboundExcHandler", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
super.write(ctx, msg, promise);
}
});
{code}
*Let me sum this up quickly:*
- *ShuffleHandler.Shuffle#channelRead writes an HTTP 200 OK response even if
the call to populateHeaders failed.*
- *However, sendError writes an HTTP 500 Internal Server Error response to the
channel later.*
- *The tests expect a successful HTTP connection, which is wrong.*
- *The successful HTTP connection is just a side-effect of the fact that the
unsuccessful HTTP response can't be written to the channel because of an
exception thrown from the HttpResponseEncoder:
"java.lang.IllegalStateException: unexpected message type:
DefaultFullHttpResponse, state: 1"*
- *With Netty 3.x, this was probably a similar side-effect, so the second,
unsuccessful HTTP response was not written to the channel either.*
*All in all, I propose to fix this in a follow-up jira and temporarily disable
this outbound handler adapter as this is a separate bug that was discovered.*
[~weichiu] *Could you share your thoughts, please?*
*Test code changes:*
This commit illustrates well how rigid the current design is: as Shuffle is an
inner class of ShuffleHandler, every time the test wants to override something
from the Shuffle class, it also needs an enclosing instance of ShuffleHandler.
This could be fixed in another follow-up jira, as the tests are very hard to
maintain like this.
I had to add org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught
to each and every Shuffle subclass in the tests, which is pure code
duplication.
*8. Fix idle state handling + add test:
[https://github.com/szilard-nemeth/hadoop/commit/3e74ac1913d3280a84b31189d3d8a116aae3e0e5]*
This mainly addresses my previous question regarding the TimeoutHandler
constructor calling super with 1, 1, 1 as parameters.
It turned out that the code was wrong there; let me explain.
The constructor of the superclass (IdleStateHandler) looks like this:
{code:java}
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds,
    int allIdleTimeSeconds) {
  this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds,
      (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
{code}
It turns out Wei-Chiu's commit had [two idle state
handlers|https://github.com/jojochuang/hadoop/commit/14761633c95a38291e825169f3b9ed6459586f7f#diff-bb1f8f80a30d30861c2152d70c46b5cc709f3b8016836cf77cf5e83a7baefe54R853-R855].
The first one, named "idle", is a plain IdleStateHandler. The parameterization
of its constructor was okay: it used the value of "connectionKeepAliveTimeOut"
for the channel write timeout.
The second handler is TIMEOUT_HANDLER, an instance of TimeoutHandler, which
also extends IdleStateHandler:
[LINK|https://github.com/jojochuang/hadoop/commit/14761633c95a38291e825169f3b9ed6459586f7f#diff-bb1f8f80a30d30861c2152d70c46b5cc709f3b8016836cf77cf5e83a7baefe54R795-R813]
So we had two IdleStateHandlers; the second one had 1 second as
readerIdleTimeSeconds, writerIdleTimeSeconds and allIdleTimeSeconds, which
effectively means the channel is closed after 1 second of read or write
inactivity. However, channelIdle limited this by filtering for the WRITER_IDLE
event type:
[LINK|https://github.com/jojochuang/hadoop/commit/14761633c95a38291e825169f3b9ed6459586f7f#diff-bb1f8f80a30d30861c2152d70c46b5cc709f3b8016836cf77cf5e83a7baefe54R808-R813]
Ultimately, the second handler "won", since it had 1 second set as the write
timeout, so the first handler had no effect in this sense.
I resolved all this by keeping TimeoutHandler, removing the other handler
called "idle", and fixing the constructor parameters to use
connectionKeepAliveTimeOut as the write timeout value.
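A rough sketch of the resulting handler (not the exact patch code; whether the reader and all-idle timeouts are disabled, and how the close is performed, are my assumptions):
{code:java}
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

// Sketch only: a single IdleStateHandler subclass that uses the configured keep-alive
// timeout as the writer idle timeout and reacts only to WRITER_IDLE events.
class TimeoutHandlerSketch extends IdleStateHandler {
  TimeoutHandlerSketch(int connectionKeepAliveTimeOut) {
    // reader/all-idle disabled (0), writer idle timeout = keep-alive timeout
    super(0, connectionKeepAliveTimeOut, 0);
  }

  @Override
  protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
    if (e.state() == IdleState.WRITER_IDLE) {
      ctx.channel().close(); // close the channel after write inactivity
    }
  }
}
{code}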
I also added two unit testcases to cover this functionality.
> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
> Key: HADOOP-15327
> URL: https://issues.apache.org/jira/browse/HADOOP-15327
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Xiaoyu Yao
> Assignee: Szilard Nemeth
> Priority: Major
> Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch,
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log
>
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)