[ 
https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362367#comment-17362367
 ] 

Szilard Nemeth commented on HADOOP-15327:
-----------------------------------------

The latest patch contains commits from this branch: 
[https://github.com/szilard-nemeth/hadoop/commits/HADOOP-15327-snemeth]
 There are a number of commits, so I will explain the reasoning behind each 
change, commit by commit.
 Not all commits are listed; I left out a few trivial ones.
 Unfortunately, this task was tricky: every time I touched something in the 
test, I found another bug or odd behaviour, so it took a great deal of time to 
discover and solve everything.

*1. ShuffleHandler: ch.isOpen() --> ch.isActive(): 
[https://github.com/szilard-nemeth/hadoop/commit/e703adb57f66da8579baa26257ca9aaed2bf1db5]*
 This was already mentioned in my previous, lengthier comment.

*2. TestShuffleHandler: Fix mocking in testSendMapCount + replace ch.write() 
with ch.writeAndFlush(): 
[https://github.com/szilard-nemeth/hadoop/commit/07fbfee5cae85e8e374b53c303e794c19c620efc]*
 This is about 2 things:
 - Replacing channel.write calls with channel.writeAndFlush
 - Fixing bad mocking in 
org.apache.hadoop.mapred.TestShuffleHandler#testSendMapCount

*3. TestShuffleHandler.testMaxConnections: Rewrite test + production code: 
accepted connection handling: 
[https://github.com/szilard-nemeth/hadoop/commit/def0059982ef8f0e2f19d385b1a1fcdca8639f9d]*
 *Changes in production code:*
 - ShuffleHandler#channelActive added the channel to the channel group (field 
called 'accepted') before the if statement that enforces the maximum number of 
open connections. This was the old, wrong piece of code:
{code:java}
      super.channelActive(ctx);
      LOG.debug("accepted connections={}", accepted.size());

      if ((maxShuffleConnections > 0) &&
          (accepted.size() >= maxShuffleConnections)) {
{code}

 - Also, counting the number of open channels with the channel group was 
unreliable so I introduced a new AtomicInteger field called 
'acceptedConnections' to track the open channels / connections.
 - There was another issue: When the channels were accepted, the counter of 
open channels was increased but when channels were inactivated I could not see 
any code that would have maintained (decremented) the value.
 This was mitigated by adding 
org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelInactive that logs the 
channel inactivated event and decreases the open connections counter:
{code:java}
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      super.channelInactive(ctx);
      acceptedConnections.decrementAndGet();
      LOG.debug("New value of Accepted number of connections={}",
          acceptedConnections.get());
    }
{code}
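To illustrate the counting logic in isolation, here is a minimal JDK-only sketch with no Netty types. The class ConnectionLimiter and its method names are hypothetical and are not part of the patch; only the idea of an AtomicInteger that is incremented on accept and decremented when a channel goes inactive comes from the change described above. As a simplifying assumption, rejected connections are not counted at all in this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the accept/inactive bookkeeping; not the patch code.
class ConnectionLimiter {
  private final int maxConnections; // 0 or negative means "no limit"
  private final AtomicInteger acceptedConnections = new AtomicInteger();

  ConnectionLimiter(int maxConnections) {
    this.maxConnections = maxConnections;
  }

  // Called when a channel becomes active. Returns false if it must be rejected.
  boolean tryAccept() {
    int current = acceptedConnections.incrementAndGet();
    if (maxConnections > 0 && current > maxConnections) {
      // Over the limit: undo the increment so rejected channels are not counted.
      acceptedConnections.decrementAndGet();
      return false;
    }
    return true;
  }

  // Called when a previously accepted channel becomes inactive.
  void onInactive() {
    acceptedConnections.decrementAndGet();
  }

  int current() {
    return acceptedConnections.get();
  }
}
```

The point of the sketch is that the limit check and the counter update happen together, and that every accepted connection is eventually matched by exactly one decrement.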

*Changes in test code:*
 - org.apache.hadoop.mapred.TestShuffleHandler#testMaxConnections: Fixed the 
testcase. The issue was pointed out correctly by [~weichiu]: the connections 
are accepted in parallel, so the test should not rely on their order. I rewrote 
the test to use a map that groups HttpURLConnection objects by their HTTP 
response code.
 Then I check that only 200 OK and 429 TOO MANY REQUESTS responses occurred, 
that exactly 2 connections got 200 OK, and that exactly one connection was 
rejected.
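The order-independent check can be sketched with plain JDK collections. This is only an illustration: the class ResponseCodeGrouping and the method countByCode are hypothetical, and the sketch groups bare status codes instead of the HttpURLConnection objects used in the real test.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper: groups HTTP status codes without relying on arrival order.
class ResponseCodeGrouping {
  // Counts how many connections ended with each HTTP status code.
  static Map<Integer, Long> countByCode(List<Integer> responseCodes) {
    return responseCodes.stream()
        .collect(Collectors.groupingBy(code -> code, TreeMap::new,
            Collectors.counting()));
  }

  public static void main(String[] args) {
    // The rejected connection may show up at any position in the list.
    Map<Integer, Long> counts = countByCode(Arrays.asList(429, 200, 200));
    System.out.println(counts);
  }
}
```

Asserting on the grouped counts (two entries, 2 for status 200 and 1 for status 429) gives the same result whichever connection the server happened to reject.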

*4. increase netty version to 4.1.65.Final: 
[https://github.com/szilard-nemeth/hadoop/commit/4f4589063b579a93389b1e188c29bd895ae507fc]*
 This is a simple commit to increase the Netty version to the latest stable 4.x 
version.
 See this page: [https://netty.io/downloads.html]
 It states: "netty-4.1.65.Final.tar.gz ‐ 19-May-2021 (Stable, Recommended)"

*5. ShuffleHandler: Fix keepalive test + writing HTTP response properly to 
channel: 
[https://github.com/szilard-nemeth/hadoop/commit/1aad4eaace28cfff4a9a9152f7535d70cc6e3734]*
 This is where things get more interesting. There was a testcase called 
org.apache.hadoop.mapred.TestShuffleHandler#testKeepAlive that caught an issue 
that came up because Netty 4.x handles HTTP responses written to the same 
channel differently than Netty 3.x.
 See details below.

Production code changes:
 - Added some logs to be able to track what happened when utilizing HTTP 
Connection Keep-alive.
 - Added a ChannelOutboundHandlerAdapter that handles exceptions that happen 
during outbound message construction. These are not logged by Netty by default, 
and this trick is the only way I found to catch these events:
{code:java}
      pipeline.addLast("outboundExcHandler",
          new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg,
            ChannelPromise promise) throws Exception {
          promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
          super.write(ctx, msg, promise);
        }
      });
{code}
This solution is described here: 
[https://stackoverflow.com/questions/50612403/catch-all-exception-handling-for-outbound-channelhandler]

 - *Fixed the root cause of the keep alive test issue in 
org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelRead.*
 *Let me give a detailed explanation here as this was quite tricky to catch.*
 With Netty 4.x, after the DefaultHttpResponse is written to the channel, the 
HTTP body is constructed by further channel write calls. These calls originate 
from sendMap and then sendMapOutput. A significant difference between Netty 3.x 
and 4.x is the set of HTTP response objects that 4.x introduced.
 These are: DefaultFullHttpResponse, DefaultHttpResponse.
 DefaultFullHttpResponse is for constructing a complete response that 
encapsulates both the HTTP header and the body.
 DefaultHttpResponse is for constructing only the HTTP header: it is written to 
the channel first, and the data pushed to the channel afterwards becomes the 
HTTP body.
 When HTTP connection Keep-Alive is used, a LastHttpContent.EMPTY_LAST_CONTENT 
message object should be written to the channel after the message body data has 
been sent.
 Doing so enables the next HTTP response to be sent on the same channel.
 If we did not write a LastHttpContent, the channel would fail to handle 
subsequent HTTP responses.
 The root cause is that all outbound messages go through HttpResponseEncoder, 
which is stateful: it refuses to send another HTTP response if there was no 
clear boundary marking the end of the previous one. Marking that boundary is 
the main purpose of LastHttpContent.
 When no LastHttpContent is written to the channel, HttpObjectEncoder.encode 
throws an IllegalStateException on the second HTTP message.
 By default, exceptions thrown while handling outbound messages are not printed 
in any way, so the trick above is required to print them.
 Unfortunately, all of the above behaviour is largely undocumented.
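The stateful behaviour described above can be modelled with a toy encoder. This is a deliberately simplified, JDK-only sketch and not Netty's HttpObjectEncoder: the class ToyResponseEncoder, its methods and its error messages are all hypothetical, and the real encoder tracks more states than a single boolean.

```java
// Toy model of a stateful response encoder. This is NOT Netty's HttpObjectEncoder.
class ToyResponseEncoder {
  private boolean responseInProgress = false;

  // Models writing a header-only response (the role of DefaultHttpResponse).
  void writeResponseHeader(String statusLine) {
    if (responseInProgress) {
      throw new IllegalStateException(
          "unexpected response '" + statusLine + "': previous response not ended");
    }
    responseInProgress = true;
  }

  // Models writing a piece of the response body.
  void writeContent(String data) {
    if (!responseInProgress) {
      throw new IllegalStateException("content written without a response header");
    }
  }

  // Models writing the end-of-response marker (the role of LastHttpContent).
  void writeLastContent() {
    if (!responseInProgress) {
      throw new IllegalStateException("no response in progress");
    }
    responseInProgress = false;
  }

  public static void main(String[] args) {
    ToyResponseEncoder encoder = new ToyResponseEncoder();
    encoder.writeResponseHeader("200 OK");
    encoder.writeContent("map output");
    encoder.writeLastContent();            // ends the first response
    encoder.writeResponseHeader("200 OK"); // accepted: the channel can be reused
    try {
      // No end marker was written for the second response, so this is refused.
      encoder.writeResponseHeader("500 Internal Server Error");
    } catch (IllegalStateException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

In this model, forgetting writeLastContent() is the only thing that makes the next response fail, which mirrors why the keep-alive test broke until the end marker was written.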

I found some results in GitHub issues; these are the most relevant ones that 
led me to the final solution:
 - [https://github.com/netty/netty/issues/1725#issuecomment-22624967]
 - [https://github.com/netty/netty/issues/11155#issue-857141001]

There are other related issues as well:
 * [https://github.com/netty/netty/issues/1008]
 * [https://github.com/netty/netty/issues/7993]
 * [https://github.com/netty/netty/issues/1700]
 * [https://github.com/netty/netty/issues/2466]
 * [https://github.com/netty/netty/issues/1359]

Some of the Netty issues above mention that sane use of write / writeAndFlush 
could also mitigate this kind of issue, but it never worked for me so I stuck 
to using LastHttpContent.

*Test code changes:* 
 In general, I don't like adding unrelated refactoring and other unrelated 
changes to a patch; I try to focus on the core change.
 However, as I wanted to add a new KeepAlive test as well, I didn't want to 
duplicate a huge chunk of test code. The testcase itself already contained too 
much code duplication.
 - Introduced 
org.apache.hadoop.mapred.TestShuffleHandler.ShuffleHandlerForKeepAliveTests 
that was extracted out of testKeepAlive.
 - Introduced 
org.apache.hadoop.mapred.TestShuffleHandler.LoggingHttpResponseEncoder: This is 
to add more logging for outbound messages in the test channel pipeline.
 - Introduced org.apache.hadoop.mapred.TestShuffleHandler.MapOutputSender: This 
has the same functionality as before, but with much better and more 
straightforward code, at least I think so :) This is primarily called from 
org.apache.hadoop.mapred.ShuffleHandler.Shuffle#sendMapOutput when 
ShuffleHandler decides to send mapper output.
 - Similarly to MapOutputSender I also introduced 
org.apache.hadoop.mapred.TestShuffleHandler.ShuffleHeaderProvider: Same 
functionality, more straightforward code.
 Primarily called from 
org.apache.hadoop.mapred.TestShuffleHandler.HeaderPopulator#populateHeaders.
 - Introduced new test helper class: 
org.apache.hadoop.mapred.TestShuffleHandler.HttpConnectionHelper.
 This connects to the provided array of URLs and stores the connection data for 
each one in HttpConnectionData objects.
 - Introduced new test helper class: 
org.apache.hadoop.mapred.TestShuffleHandler.HttpConnectionData.
 This class is a simple data class to store connection data: the 
HttpURLConnection itself, the length of the payload (response), the 
SocketAddress, the HTTP response code and the HTTP headers. This class makes it 
easier to perform assertions on these HTTP connections.
 - Finally, I introduced another new test helper class: 
org.apache.hadoop.mapred.TestShuffleHandler.HttpConnectionAssert.
 This receives an instance of HttpConnectionData and can perform certain 
assertions. All kinds of assertions are added as convenience methods.
 There's one special method: 
org.apache.hadoop.mapred.TestShuffleHandler.HttpConnectionAssert#assertKeepAliveConnectionsAreSame:
 This receives an HttpConnectionHelper and checks whether the first 2 
connection sockets have the same address.
 - Given all these helper classes and refactors, I simplified the keep alive 
test and added a new testcase as well: testKeepAliveInitiallyDisabled

*6. channel.closeFuture().awaitUninterruptibly() --> channel.close(): 
[https://github.com/szilard-nemeth/hadoop/commit/3eb1bf244a7a50f12a5f43058384d9904ab95825]*
 This is in relation to my previous longer comment and the question I asked.
 This is quite a concise commit, it replaces
{code:java}
future.channel().closeFuture().awaitUninterruptibly();
{code}
with
{code:java}
future.channel().close();
{code}
As Wei-Chiu mentioned, it's recommended by the javadoc to not call 
awaitUninterruptibly() in Netty I/O threads or channel handlers: 
[https://netty.io/4.0/api/io/netty/channel/ChannelFuture.html]
 See this attachment: 
[^getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log] for a 
stacktrace that shows that io.netty.util.concurrent.BlockingOperationException 
is thrown if await is called instead of simply calling close().
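Why a blocking wait is refused on an I/O thread can be sketched with a single-threaded executor standing in for the event loop. This is a JDK-only model under stated assumptions: the class EventLoopGuard and its methods are hypothetical, and IllegalStateException stands in for Netty's BlockingOperationException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy "event loop" that refuses blocking waits issued from its own thread.
class EventLoopGuard {
  private volatile Thread loopThread;
  private final ExecutorService loop;

  EventLoopGuard() {
    // Single daemon thread that plays the role of the I/O thread.
    loop = Executors.newSingleThreadExecutor(runnable -> {
      Thread thread = new Thread(runnable, "toy-event-loop");
      thread.setDaemon(true);
      loopThread = thread;
      return thread;
    });
  }

  // Blocks until the future completes, unless called from the loop thread.
  <T> T await(CompletableFuture<T> future) {
    if (Thread.currentThread() == loopThread) {
      // Waiting here could block the only thread able to complete the future.
      throw new IllegalStateException("blocking wait on the event loop thread");
    }
    return future.join();
  }

  // Runs await() on the loop thread and reports whether it was refused there.
  boolean awaitIsRefusedOnLoopThread() {
    CompletableFuture<Void> pending = new CompletableFuture<>();
    return CompletableFuture.supplyAsync(() -> {
      try {
        await(pending);
        return false;
      } catch (IllegalStateException expected) {
        return true;
      }
    }, loop).join();
  }

  void shutdown() {
    loop.shutdown();
  }
}
```

Calling await() from any other thread works normally; only the call made on the loop thread itself is rejected, which is the situation the channel handler was in.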

*7. TestShuffleHandler: Add error handling + assertion logic: 
[https://github.com/szilard-nemeth/hadoop/commit/ec1d7a9249576a88e228fac1497c87309b4c6ac6]*
 During testing, I discovered a potential bug. I saw the same 
IllegalStateException thrown from HttpResponseEncoder.
 This will be a bit lengthy, but I spent enough time finding the root cause 
that I thought it better to write it down.
 In org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelRead, when 
populateHeaders is called (only 1 occurrence), and an IOException is caught, 
this happens:
{code:java}
try {
        populateHeaders(mapIds, jobId, user, reduceId, request,
          response, keepAliveParam, mapOutputInfoMap);
      } catch(IOException e) {
        //TODO snemeth (HADOOP-15327)
        // This seems like a bug combined with bad expectations in the tests.
        // See details in jira
        ch.writeAndFlush(response);
        LOG.error("Shuffle error in populating headers :", e);
        String errorMessage = getErrorMessage(e);
        sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
        return;
      }
{code}
Here, the response is written to the channel, the response object is defined as:
{code:java}
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
{code}
The problem is that Shuffle#sendError also writes a response, but it is an 
error response:
{code:java}
protected void sendError(ChannelHandlerContext ctx, String msg,
        HttpResponseStatus status, Map<String, String> headers) {
      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status,
              Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
      response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
      // Put shuffle version into http header
      response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
      response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
      for (Map.Entry<String, String> header : headers.entrySet()) {
        response.headers().set(header.getKey(), header.getValue());
      }

      // Close the connection as soon as the error message is sent.
      ctx.channel().writeAndFlush(response)
          .addListener(ChannelFutureListener.CLOSE);
    }
{code}
It doesn't seem correct to write a successful response and then an error 
response.
 Moreover, the tests are passing, which is strange.
 When running either org.apache.hadoop.mapred.TestShuffleHandler#testRecovery 
or org.apache.hadoop.mapred.TestShuffleHandler#testRecoveryFromOtherVersions, 
we can observe a great many occurrences of this exception in the logs:
{code:java}
2021-06-12 18:41:54,732 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:exceptionCaught(1412)) - Shuffle error: 
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 1
      at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
      at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
      at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
      at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
      at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
      at io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:302)
      at io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:131)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
      at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
      at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
      at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
      at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
      at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
      at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
      at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
      at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
      at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306)
      at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendError(ShuffleHandler.java:1390)
      at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendError(ShuffleHandler.java:1372)
      at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.channelRead(ShuffleHandler.java:1069)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
      at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 1
      at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:86)
      at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
      ... 44 more
{code}
*This is the same issue as described in point 5*: Once the outbound message 
encoder has written an HTTP response, it can't write another one on the same 
channel unless a LastHttpContent is written before the second message.
 So after the successful response object has been written, Shuffle.sendError 
tries to write the error HTTP response and fails. The reason for the many 
occurrences of this exception is that 
org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught catches the 
outbound exception and tries to send the error to the client again (note the 
last sendError call):
{code:java}
@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
        throws Exception {
      Channel ch = ctx.channel();
      if (cause instanceof TooLongFrameException) {
        sendError(ctx, BAD_REQUEST);
        return;
      } else if (cause instanceof IOException) {
        if (cause instanceof ClosedChannelException) {
          LOG.debug("Ignoring closed channel error", cause);
          return;
        }
        String message = String.valueOf(cause.getMessage());
        if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
          LOG.debug("Ignoring client socket close", cause);
          return;
        }
      }

      LOG.error("Shuffle error: ", cause);
      if (ch.isActive()) {
        sendError(ctx, INTERNAL_SERVER_ERROR);
      }
    }
{code}
This goes on and on until the test finishes, so this is effectively an infinite 
loop of events.
 The client only gets the successful HTTP response and the tests pass. However, 
I think in these cases the test should expect an error HTTP response and fail 
otherwise.
 Let's look at 
org.apache.hadoop.mapred.TestShuffleHandler#testRecoveryFromOtherVersions, for 
example: 
 There must be something wrong with the test setup, as populateHeaders errors 
out with:
{code:java}
2021-06-12 18:41:54,703 DEBUG [ShuffleHandler Netty Worker #0] fs.FileSystem (DurationInfo.java:close(101)) - Creating FS file:///: duration 0:00.097s
2021-06-12 18:41:54,731 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1067)) - Shuffle error in populating headers :
java.io.IOException: Error Reading IndexFile
      at org.apache.hadoop.mapred.IndexCache.readIndexFileToCache(IndexCache.java:123)
      at org.apache.hadoop.mapred.IndexCache.getIndexInformation(IndexCache.java:68)
      at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.getMapOutputInfo(ShuffleHandler.java:1191)
      at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.populateHeaders(ShuffleHandler.java:1210)
      at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.channelRead(ShuffleHandler.java:1060)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
      at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
      at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File /Users/snemeth/development/apache/hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/target/test-dir/TestShuffleHandlerLocDir/usercache/someuser/appcache/application_12345_0001/output/attempt_12345_1_m_1_0/file.out.index does not exist
      at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
      at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
      at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
      at org.apache.hadoop.fs.RawLocalFileSystem.open(RawLocalFileSystem.java:274)
      at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:975)
      at org.apache.hadoop.io.SecureIOUtils.openFSDataInputStream(SecureIOUtils.java:152)
      at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:71)
      at org.apache.hadoop.mapred.SpillRecord.<init>(SpillRecord.java:62)
      at org.apache.hadoop.mapred.IndexCache.readIndexFileToCache(IndexCache.java:119)
      ... 28 more
{code}
This triggers the infinite loop of exceptions described above. Still, the 
client only receives the successful response while the server is stuck in that 
loop.
 The infinite loop can be prevented by removing the magic 
ChannelOutboundHandlerAdapter from the pipeline in 
org.apache.hadoop.mapred.ShuffleHandler.HttpPipelineFactory#initChannel:
{code:java}
      pipeline.addLast("outboundExcHandler",
          new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg,
            ChannelPromise promise) throws Exception {
          promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
          super.write(ctx, msg, promise);
        }
      });
{code}
*Let me sum this up real quickly:*
 - *ShuffleHandler.Shuffle#channelRead writes an HTTP 200 OK response even if 
the call to populateHeaders failed.*
 - *However, sendError later writes an HTTP 500 Internal Server Error response 
to the channel.*
 - *The tests expect a successful HTTP connection, which is wrong.*
 - *The successful HTTP connection is just a side effect of the fact that the 
unsuccessful HTTP response can't be written to the channel because of an 
exception thrown from the HttpResponseEncoder: 
"java.lang.IllegalStateException: unexpected message type: 
DefaultFullHttpResponse, state: 1"*
 - *With Netty 3.x, this was probably another side effect, so the second, 
unsuccessful HTTP response was not written to the channel either.*
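The feedback loop summarized above can be reproduced in a toy model. This is a JDK-only sketch under stated assumptions: the class ErrorLoopModel and its members are hypothetical, the real loop is driven asynchronously by Netty rather than by direct recursion, and maxRounds exists only so that the demonstration terminates.

```java
// Toy model of the sendError / exceptionCaught feedback loop; not ShuffleHandler code.
class ErrorLoopModel {
  private final boolean forwardWriteFailures; // models FIRE_EXCEPTION_ON_FAILURE
  private final int maxRounds;                // cap so that the demo terminates
  private boolean responseInProgress = false;
  private int failedErrorWrites = 0;

  ErrorLoopModel(boolean forwardWriteFailures, int maxRounds) {
    this.forwardWriteFailures = forwardWriteFailures;
    this.maxRounds = maxRounds;
  }

  // Models channelRead when populateHeaders throws an IOException.
  void handleRequestWithFailingHeaders() {
    responseInProgress = true; // the 200 OK header has already been written
    sendError();
  }

  private void sendError() {
    if (responseInProgress) {
      // The encoder refuses a second response before the first one has ended.
      failedErrorWrites++;
      if (forwardWriteFailures && failedErrorWrites < maxRounds) {
        exceptionCaught();
      }
    }
  }

  private void exceptionCaught() {
    sendError(); // tries to report the failure to the client again
  }

  int failedErrorWrites() {
    return failedErrorWrites;
  }
}
```

With failure forwarding enabled the model keeps failing until the cap is reached; with forwarding disabled there is a single failed write and no loop, which matches the effect of removing the outbound handler adapter.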

 

*All in all, I propose to fix this in a follow-up jira and temporarily disable 
this outbound handler adapter as this is a separate bug that was discovered.*
 [~weichiu] *Could you share your thoughts, please?*

*Test code changes:*

This commit shows well how rigid the current design is: as Shuffle is an inner 
class of ShuffleHandler, every time a test wants to override something from the 
Shuffle class, it also needs an enclosing instance of ShuffleHandler.
 This could be fixed in another follow-up jira, as the tests are very hard to 
maintain like this.
 I had to add org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught 
to each and every subclass of Shuffle, which is pure code duplication.

*8. Fix idle state handling + add test: 
[https://github.com/szilard-nemeth/hadoop/commit/3e74ac1913d3280a84b31189d3d8a116aae3e0e5]*
 This mainly addresses my previous question regarding the constructor call of 
TimeoutHandler.super with 1, 1, 1 as parameters.
 It turned out that the code was wrong there; let me explain.
 The constructor of the superclass (IdleStateHandler) looks like this:
{code:java}
  public IdleStateHandler(int readerIdleTimeSeconds,
      int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds,
        (long)allIdleTimeSeconds, TimeUnit.SECONDS);
  }
{code}
It turns out Wei-Chiu's commit had [two idle state 
handlers|https://github.com/jojochuang/hadoop/commit/14761633c95a38291e825169f3b9ed6459586f7f#diff-bb1f8f80a30d30861c2152d70c46b5cc709f3b8016836cf77cf5e83a7baefe54R853-R855]
 The first, named "idle", is a simple IdleStateHandler. The parameterization of 
its constructor was okay: it used the value of "connectionKeepAliveTimeOut" for 
the channel write timeout.
 The second handler is the TIMEOUT_HANDLER, which is an instance of 
TimeoutHandler and also extends IdleStateHandler: 
[LINK|https://github.com/jojochuang/hadoop/commit/14761633c95a38291e825169f3b9ed6459586f7f#diff-bb1f8f80a30d30861c2152d70c46b5cc709f3b8016836cf77cf5e83a7baefe54R795-R813]
 So we had two IdleStateHandlers. The second handler had 1 second as 
readerIdleTimeSeconds, writerIdleTimeSeconds and allIdleTimeSeconds, which 
would effectively mean that the channel is closed after 1 second of read or 
write inactivity. However, channelIdle narrowed this down by reacting only to 
the WRITER_IDLE event: 
[LINK|https://github.com/jojochuang/hadoop/commit/14761633c95a38291e825169f3b9ed6459586f7f#diff-bb1f8f80a30d30861c2152d70c46b5cc709f3b8016836cf77cf5e83a7baefe54R808-R813]
 Ultimately, the second handler "won" since it had 1 second set as the write 
timeout, so the first handler had no effect in this respect.
 I resolved all this by keeping TimeoutHandler, removing the other handler 
called "idle", and fixing the constructor parameters to use 
connectionKeepAliveTimeOut as the write timeout value.
 I also added two unit testcases to cover this functionality.
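The interaction of the two handlers can be summarized in a small JDK-only model. It rests on a simplifying assumption that is not taken from the patch: every configured handler closes the channel once its writer timeout expires, so the smallest timeout decides. The class IdleTimeoutModel, its enum and its methods are hypothetical.

```java
import java.util.Arrays;

// Toy model of competing idle handlers; not the ShuffleHandler or Netty code.
class IdleTimeoutModel {
  enum IdleState { READER_IDLE, WRITER_IDLE, ALL_IDLE }

  // Assumption: each handler closes the channel on writer idleness, so the
  // smallest writer timeout determines when the channel is actually closed.
  static int effectiveWriterTimeoutSeconds(int... writerIdleSeconds) {
    return Arrays.stream(writerIdleSeconds)
        .min()
        .orElseThrow(() -> new IllegalArgumentException("no handlers"));
  }

  // Models the channelIdle filter: only WRITER_IDLE leads to closing the channel.
  static boolean shouldClose(IdleState state) {
    return state == IdleState.WRITER_IDLE;
  }
}
```

For example, with a keep-alive timeout of 5 seconds on one handler and 1 second on the other, effectiveWriterTimeoutSeconds(5, 1) yields 1, which is the sense in which the second handler "won" before the fix.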

> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
>                 Key: HADOOP-15327
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15327
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Xiaoyu Yao
>            Assignee: Szilard Nemeth
>            Priority: Major
>         Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, 
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log
>
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
