[https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371468#comment-17371468]

Szilard Nemeth edited comment on HADOOP-15327 at 6/29/21, 3:09 PM:
-------------------------------------------------------------------

Just uploaded a new patch: [^HADOOP-15327.005.patch]

I have been (almost) exclusively working on this since my last comment and 
there are a couple of things to add again.
 The last commit that was discussed is this: 
[https://github.com/szilard-nemeth/hadoop/commit/f149be8de28baafc64eed1c47e788f5beb215e62]
 Let me explain what has changed, commit by commit. I will skip a bunch of 
trivial ones like code cleanup, added comments and the like.
 *I will cover the test failures surfaced by Jenkins build / unit test results:*
 - [Build 
#1|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17362456&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17362456]
 - [Build 
#2|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17363928&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17363928]

*1. TestShuffleHandler: Introduced InputStreamReadResult that stores response 
as string + total bytes read: 
[https://github.com/szilard-nemeth/hadoop/commit/a57de573c97fe12c9071dd3450df8f450bf075ea]*
 Here, I added a new class called 'InputStreamReadResult' that stores the bytes 
read (byte[]) and the number of bytes read from a response InputStream.
 This improves the way testcases can assert on this data.
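A minimal sketch of such a holder (field names are illustrative, not 
necessarily the ones in the patch):
{code:java}
static class InputStreamReadResult {
  final byte[] data;          // bytes read from the response InputStream
  final int totalBytesRead;   // number of bytes actually read

  InputStreamReadResult(byte[] data, int totalBytesRead) {
    this.data = data;
    this.totalBytesRead = totalBytesRead;
  }
}
{code}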

*2. TestShuffleHandler: Use DEFAULT_PORT for all shuffle handler port configs: 
[https://github.com/szilard-nemeth/hadoop/commit/78b1166866c85cab6860407f8fe4a4ddc3168fae]*
 It was a common pitfall while debugging that the tests had to be modified to 
use a certain fixed port. Here, I added a constant to store the port number so 
when I had to debug I only needed to change it in a single place.
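Something like this (the name matches the commit title; the value is 
illustrative):
{code:java}
// Single place to change when a test needs a fixed port for debugging.
private static final int DEFAULT_PORT = 13562; // illustrative value
{code}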

*3. Create class: TestExecution: Configure proxy, keep alive connection 
timeout: 
[https://github.com/szilard-nemeth/hadoop/commit/fa5bb32ae4eb737077a165b3b1fba5069c982243]*
 In order to debug the HTTP responses, I found it convenient to add a helper 
class that is responsible for the following:
 - Configuring the HTTP connections, using a proxy when required
 - Increasing the keep-alive timeout when using DEBUG mode

TEST_EXECUTION is a static instance of TestExecution, initialized in a JUnit 
test setup method.
 There are 2 flags that control the behaviour of this object:
{code:java}
//Control test execution properties with these flags
private static final boolean DEBUG_MODE = true;
//If this is set to true and proxy server is not running, tests will fail!
private static final boolean USE_PROXY = false; 
{code}
On top of these flags, the only change in the testcases themselves is that they 
create all HTTP connections with:
{code:java}
TEST_EXECUTION.openConnection(url)
{code}
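A minimal sketch of what openConnection could look like internally (the proxy 
host/port and the field name are assumptions, mirroring the mitmproxy setup 
described in point 7.3 below):
{code:java}
// Sketch only: route connections through a local proxy (e.g. mitmproxy on
// 127.0.0.1:8888) when USE_PROXY is set, otherwise connect directly.
public HttpURLConnection openConnection(URL url) throws IOException {
  if (USE_PROXY) {
    Proxy proxy = new Proxy(Proxy.Type.HTTP,
        new InetSocketAddress("127.0.0.1", 8888));
    return (HttpURLConnection) url.openConnection(proxy);
  }
  return (HttpURLConnection) url.openConnection();
}
{code}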
*4. TestExecution: Configure port: 
[https://github.com/szilard-nemeth/hadoop/commit/4a5c035695be1099bff4a633cd605b9f8146d841]*
 One addition to 3. is to include the port used by ShuffleHandler in the 
TestExecution object. When using DEBUG mode, the port is fixed to a value, 
otherwise it is set to 0, meaning that the port will be dynamically chosen.
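A minimal sketch of that selection (the method name is an assumption):
{code:java}
// DEBUG mode pins the ShuffleHandler to the fixed debug port so external
// tools (proxy, debugger) can target it; otherwise 0 lets the OS pick
// a free ephemeral port.
int shuffleHandlerPort() {
  return DEBUG_MODE ? DEFAULT_PORT : 0;
}
{code}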

*5. Add logging response encoder to TestShuffleHandler.testMapFileAccess: 
[https://github.com/szilard-nemeth/hadoop/commit/64686b47d2fed4e923c1c9c0169a06aba3e339be]*
 While debugging TestShuffleHandler#testMapFileAccess, I realized that I had 
forgotten to add the LoggingHttpResponseEncoder to the pipeline. The simplest 
way was to modify the pipeline when the channel is activated.

*6. TestShuffleHandler.testMapFileAccess: Modify to be able to run it locally + 
reproduce jenkins UT failure: 
[https://github.com/szilard-nemeth/hadoop/commit/bb0fcbbd7dcbe3fa7efd1b6a8c2eb8a9055c5ecd]*
 Here's where the fun begins. The problem with 
TestShuffleHandler#testMapFileAccess is that it requires the NativeIO module:
{code:java}
// This will run only if NativeIO is enabled, as SecureIOUtils needs it
assumeTrue(NativeIO.isAvailable());
{code}
I tried to compile the Hadoop Native libraries on my Mac according to these 
resources:
 - Native libraries: 
[https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/NativeLibraries.html]
 - Followed this guide: 
[https://dev.to/zejnilovic/building-hadoop-native-libraries-on-mac-in-2019-1iee]

Unfortunately, I still had compilation errors so I eventually gave up and 
tweaked the test to be able to run it locally. This wasn't such a complex 
thing and I don't think it's worth going into the details: I had to comment out 
some test code that used the Native library and that was all.
 From the Jenkins results I had this:
{code:java}
[INFO] --- maven-surefire-plugin:3.0.0-M1:test (default-test) @ 
hadoop-mapreduce-client-shuffle ---
[INFO] 
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.hadoop.mapred.TestFadvisedFileRegion
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.493 s 
- in org.apache.hadoop.mapred.TestFadvisedFileRegion
[INFO] Running org.apache.hadoop.mapred.TestShuffleHandler
[ERROR] Tests run: 15, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 12.033 
s <<< FAILURE! - in org.apache.hadoop.mapred.TestShuffleHandler
[ERROR] testMapFileAccess(org.apache.hadoop.mapred.TestShuffleHandler)  Time 
elapsed: 0.339 s  <<< FAILURE!
java.lang.AssertionError: Received string '.......<MANY MANY NULL TERMINATING 
CHARACTERS>........' 
should contain message 'Owner 'jenkins' for path 
/home/jenkins/jenkins-home/workspace/PreCommit-HADOOP-Build/sourcedir/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/target/test-dir/TestShuffleHandlerLocDir/usercache/randomUser/appcache/application_12345_0001/output/attempt_12345_1_m_1_0/file.out.index
 did not match expected owner 'randomUser''
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.assertTrue(Assert.java:42)
        at 
org.apache.hadoop.mapred.TestShuffleHandler.testMapFileAccess(TestShuffleHandler.java:1228)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
        at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
{code}
Please keep on reading the next bullet point where I detail how I found out 
what was happening.

*7. TestShuffleHandler.testMapFileAccess: Fix in production code: 
[https://github.com/szilard-nemeth/hadoop/commit/6b9b9c39e181cf7836de33a2cda0712a81b85d01]*
 Continuing on the previous point, I had to find out what was going on and why 
the response was full of null-terminating characters.
 It's important to refer back to my previous comment: in point 7. there, I 
already gave a detailed analysis of error handling and of what's wrong with 
writing an HTTP 200 OK, then an HTTP 500 (or any other bad code) to the 
channel. While I was putting that comment together, I didn't realize that the 
'testMapFileAccess' testcase wouldn't pass, as it wasn't running on my machine 
locally, so I proposed a follow-up jira. It turned out I need to fix it now, as 
the test fails, so I retract my statement.
 The fix uses the usual "trick": write the HTTP 200 response, write 
LastHttpContent.EMPTY_LAST_CONTENT, then write the HTTP 500 response to the 
channel. This way, we can avoid the IllegalStateException thrown by the 
outbound message handler.
{code:java}
try {
  populateHeaders(mapIds, jobId, user, reduceId, request,
      response, keepAliveParam, mapOutputInfoMap);
} catch (IOException e) {
  // HADOOP-15327
  // Need to send an instance of LastHttpContent to define HTTP
  // message boundaries.
  // Sending an HTTP 200 OK + HTTP 500 later (sendError)
  // is quite a non-standard way of crafting HTTP responses,
  // but we need to keep backward compatibility.
  // See more details in jira.
  ch.writeAndFlush(response);
  ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
  LOG.error("Shuffle error in populating headers :", e);
  String errorMessage = getErrorMessage(e);
  sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
  return;
}
{code}
I also checked whether only the tests do this tricky HTTP error handling or 
whether the production code (Fetcher.java) has the same logic: it turned out 
Fetcher works quite the same way, so at least the tests and the production code 
are close together. I still don't think it's good that the "error responses" 
are handled as HTTP 200 responses, since HttpURLConnection has a 
[getErrorStream|https://docs.oracle.com/javase/8/docs/api/java/net/HttpURLConnection.html#getErrorStream--]
 method that could be used for proper error handling. 
 This also points in the same direction: 
[https://stackoverflow.com/a/11721878/1106893]
 Nevertheless, it would be too risky to change this behaviour, plus it would 
break backward-compatibility. It would be a dumb idea to modify this, 
especially together with the Netty upgrade, as it would involve a heavy 
behavioral change.
 So, sad as it looks: the tests are not buggy, they are okay, and their 
expectation of the HTTP 200 plus the HTTP 500 error code packed into that 
response is the correct thing to check, matching the behaviour of the 
production code of the ShuffleHandler client (Fetcher.java).
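For reference, a minimal sketch of what client-side error handling with 
getErrorStream could look like (this is not the Fetcher code, just an 
illustration):
{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

static InputStream openResponseStream(URL url) throws IOException {
  HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  int code = conn.getResponseCode();
  // getInputStream() throws for HTTP error codes; getErrorStream() returns
  // the error body instead (or null if the server sent none).
  return code >= 400 ? conn.getErrorStream() : conn.getInputStream();
}
{code}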

Let me also share what I used to come to this conclusion: 
 *7.1 Modified the Apache trunk code to check the response codes in the tests, 
including this particular testcase. It turned out that HTTP 200 is the response 
code.*

*7.2 Also did a diff between TCP dump outputs generated by the old vs. new 
code; after my fix was applied, they were the same in terms of HTTP response 
codes and structure.*

*7.3 Wanted to do a final verification step and double-check the HTTP responses 
to see if they are really all HTTP 200 and whether the HTTP 500 is wrapped as a 
second status line or in the response body itself. I found a very useful tool 
to capture HTTP traffic called 'mitmproxy': [https://mitmproxy.org/]*

Some useful links related to this tool:
 - 
[https://yoshihisaonoue.wordpress.com/2017/12/23/basic-usage-of-mitmdump/comment-page-1/]
 - 
[https://blog.packagecloud.io/eng/2016/11/14/debugging-ssl-in-java-using-mitmproxy/]
 - [https://dzone.com/articles/how-to-record-httphttps-traffic-with-mitmproxy]
 - [https://github.com/mitmproxy/mitmproxy/issues/2832]

For this, I had to reconfigure the HttpURLConnections to use a proxy. In 
theory, there are some standard HTTP / HTTPS JVM switches to control the global 
proxy, but somehow I was not able to make them work.

Initially, I found some basic resources about configuring Java to use a proxy 
for all connections, globally:
 - 
[https://stackoverflow.com/questions/120797/how-do-i-set-the-proxy-to-be-used-by-the-jvm]
 - [https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html]
 - [https://www.baeldung.com/java-connect-via-proxy-server]

Based on these, I created a new test run config and added these JVM options:
{code:java}
-Djavax.net.debug=all
-Dhttp.proxyHost=localhost
-Dhttp.proxyPort=8888
-Dhttps.proxyHost=localhost
-Dhttps.proxyPort=8888
{code}
These global settings (http.proxyHost / http.proxyPort) did not work at all. I 
also tried using 127.0.0.1 instead of 'localhost'.
 That didn't work either, so I suspected something was wrong with the JVM 
options and added some assert statements:
{code:java}
//PROXY CONFIG
Assert.assertEquals("localhost", System.getProperty("http.proxyHost"));
Assert.assertEquals("8888", System.getProperty("http.proxyPort"));
Assert.assertEquals("localhost", System.getProperty("https.proxyHost"));
Assert.assertEquals("8888", System.getProperty("https.proxyPort"));
Assert.assertEquals(null, System.getProperty("http.nonProxyHosts"));
{code}
I also tried setting it to a dummy port value, expecting a connection issue 
since no proxy server is listening on that port. Still nothing:
{code:java}
System.setProperty("http.proxyPort", "1234");
System.setProperty("http.proxyHost", "127.0.0.1 ");
System.setProperty("http.proxyHost", "localhost");
System.setProperty("http.proxyPort", "8888");
{code}
So I gave up using the global config and found another way: 
[https://www.baeldung.com/java-connect-via-proxy-server#1-using-an-http-proxy]
 This finally worked and I could see the traffic going through the mitmproxy.
 Code:
{code:java}
// HttpURLConnection conn = (HttpURLConnection) url.openConnection();
Proxy webProxy = new Proxy(Proxy.Type.HTTP,
    new InetSocketAddress("127.0.0.1", 8888));
HttpURLConnection conn = (HttpURLConnection) url.openConnection(webProxy);
{code}
FINAL conclusion for this:
 It's valid by the RFC for the server to send an HTTP response with multiple 
status lines.
 However, the first status line is the one that really matters.
 Related SO answer: [https://stackoverflow.com/a/47541155/1106893]
 For me it's still weird, but it is what it is.
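For illustration, such a response looks roughly like this on the wire (a 
made-up minimal example, not a capture):
{code:java}
HTTP/1.1 200 OK
Content-Length: 0

HTTP/1.1 500 Internal Server Error
Content-Length: 0
{code}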

*8. TestShuffleHandler: Stop shufflehandler in all tests, fix debug mode 
issues: 
[https://github.com/szilard-nemeth/hadoop/commit/d472cb33cf038dee6f241382311f2bdf8314e0b9]*
 This is an important one for test stability and for keeping the tests 
independent from each other.
 I added some teardown code that checks if the ShuffleHandler's port is still 
open. In practice, it should not be open once a testcase has finished.
 This made me realize that many of the testcases didn't stop the 
ShuffleHandler, so I fixed this.
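The check itself can be as simple as this (a sketch; the helper name is my 
own):
{code:java}
private static boolean isPortOpen(String host, int port) {
  // If connecting succeeds, something is still listening on the port.
  try (java.net.Socket ignored = new java.net.Socket(host, port)) {
    return true;
  } catch (java.io.IOException e) {
    return false;
  }
}
{code}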

*9. testReduceFromPartialMem: Add Shuffle IO error assertion to test: 
[https://github.com/szilard-nemeth/hadoop/commit/7eba7a4c9a08bd0de486c250a6bc711c7665f08e]*

testReduceFromPartialMem failed with [this Jenkins 
build|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17363928&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17363928]

Unfortunately, the output doesn't say much about the root cause of the test 
failure:
{code:java}
[INFO] Running org.apache.hadoop.mapred.TestReduceFetchFromPartialMem
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 89.147 
s <<< FAILURE! - in org.apache.hadoop.mapred.TestReduceFetchFromPartialMem
[ERROR] 
testReduceFromPartialMem(org.apache.hadoop.mapred.TestReduceFetchFromPartialMem)
  Time elapsed: 89.024 s  <<< ERROR!
java.io.IOException: Job failed!
  at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:876)
  at 
org.apache.hadoop.mapred.TestReduceFetchFromPartialMem.runJob(TestReduceFetchFromPartialMem.java:292)
  at 
org.apache.hadoop.mapred.TestReduceFetchFromPartialMem.testReduceFromPartialMem(TestReduceFetchFromPartialMem.java:85)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
  at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
  at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
  at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
  at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
  at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
  at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
  at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
  at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
  at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
  at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
  at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
  at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
  at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}
So, I added some code that at least asserts on the Shuffle IO error counter:
{code:java}
long shuffleIoErrors = c.getGroup(SHUFFLE_ERR_GRP_NAME)
    .getCounter(Fetcher.ShuffleErrors.IO_ERROR.toString());
assertEquals(0, shuffleIoErrors);
{code}
It turned out that this IO_ERROR counter from ShuffleErrors is sometimes 1, so 
I continued my investigation.
 It took a lot of time but in the end I think it was worth it.
 Please see the next points (11 / 12) where I add detailed information about 
what was going on.

*10. LoggingHttpResponseEncoder: Add some new logs: 
[https://github.com/szilard-nemeth/hadoop/commit/e4260d1047805123d17b7d8dc94d776c818726a3]*
 In order to see all outbound message types I needed to add some new logs to 
LoggingHttpResponseEncoder.
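The added logging is conceptually this simple (a sketch assuming the encoder 
extends Netty's HttpResponseEncoder; LOG is a pre-existing logger):
{code:java}
class LoggingHttpResponseEncoder extends HttpResponseEncoder {
  @Override
  public void write(ChannelHandlerContext ctx, Object msg,
      ChannelPromise promise) throws Exception {
    // Log every outbound message type before it gets encoded.
    LOG.debug("OUTBOUND message, type: {}", msg.getClass().getSimpleName());
    super.write(ctx, msg, promise);
  }
}
{code}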

*11. Fixed error handling + LastHttpContent: 
[https://github.com/szilard-nemeth/hadoop/commit/61cc084c6e05a778360b0f475a783b0c30f7f0cf]*
 When testReduceFromPartialMem failed once, I was able to check on my local 
machine what was really going on:
{code:java}
2021-06-22 18:18:27,452 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(988)) - ***channelRead
2021-06-22 18:18:27,455 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(990)) - ***REQUEST: 
HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: 
HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0))
GET 
/mapOutput?job=job_1624378685077_0001&reduce=0&map=attempt_1624378685077_0001_m_000000_0,attempt_1624378685077_0001_m_000002_0,attempt_1624378685077_0001_m_000004_0,attempt_1624378685077_0001_m_000005_0
 HTTP/1.1
UrlHash: C/94EnScoT3sUBGXf4ul5bdpm1Q=
name: mapreduce
version: 1.0.0
User-Agent: Java/1.8.0_232b09
Host: localhost:62272
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Connection: keep-alive
content-length: 0
2021-06-22 18:18:27,489 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP1
2021-06-22 18:18:27,498 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,507 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@5510d748, 
channel: c029638c
2021-06-22 18:18:27,509 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP2
2021-06-22 18:18:27,509 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@45e2b799, 
channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP3
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@169b92fa, 
channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1120)) - ***LOOP ENDED
2021-06-22 18:18:27,511 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1123)) - ***Writing 
LastHttpContent, channel: c029638c
2021-06-22 18:18:27,513 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(338)) - 
***OPERATION COMPLETE
2021-06-22 18:18:27,513 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,514 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@1040936, 
channel: c029638c
2021-06-22 18:18:27,515 ERROR [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is 
unsuccessful. Cause: 
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: 
unexpected message type: FadvisedFileRegion, state: 0
  at 
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
  at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
  at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
  at 
io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:302)
  at 
io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:131)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
  at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
  at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
  at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
  at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
  at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
  at 
io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
  at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306)
  at 
org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendMapOutput(ShuffleHandler.java:1370)
  at 
org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendMap(ShuffleHandler.java:1152)
  at 
org.apache.hadoop.mapred.ShuffleHandler$ReduceMapFileCount.operationComplete(ShuffleHandler.java:339)
  at 
org.apache.hadoop.mapred.ShuffleHandler$ReduceMapFileCount.operationComplete(ShuffleHandler.java:307)
  at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
  at 
io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
  at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
  at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
  at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
  at 
io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
  at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
  at 
io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
  at 
io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
  at 
io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
  at 
io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:242)
  at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
  at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
  at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:953)
  at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:713)
  at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
  at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
  at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
  at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: unexpected message type: 
FadvisedFileRegion, state: 0
  at 
io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:124)
  at 
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
  ... 43 more
2021-06-22 18:18:27,517 ERROR [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is 
unsuccessful. Cause: 
io.netty.channel.StacklessClosedChannelException
  at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
2021-06-22 18:18:27,517 ERROR [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is 
unsuccessful. Cause: 
io.netty.channel.StacklessClosedChannelException
  at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
{code}
The famous IllegalStateException once again, but this time with a different 
message type, FadvisedFileRegion:
{code:java}
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: 
unexpected message type: FadvisedFileRegion, state: 0
{code}
In this case, the LastHttpContent was written to the channel too early and then 
the ShuffleHandler wrote a FadvisedFileRegion to the channel.
 With my added logs, it was evident what was happening:
{code:java}
2021-06-22 18:18:27,489 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP1
2021-06-22 18:18:27,498 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,507 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@5510d748, 
channel: c029638c
2021-06-22 18:18:27,509 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP2
2021-06-22 18:18:27,509 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@45e2b799, 
channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP3
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@169b92fa, 
channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1120)) - ***LOOP ENDED
2021-06-22 18:18:27,511 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1123)) - ***Writing 
LastHttpContent, channel: c029638c
2021-06-22 18:18:27,513 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(338)) - 
***OPERATION COMPLETE
2021-06-22 18:18:27,513 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,514 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@1040936, 
channel: c029638c
2021-06-22 18:18:27,515 ERROR [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is 
unsuccessful. Cause:
{code}
This is the original loop without the logs:
{code:java}
boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
    user, mapOutputInfoMap, jobId, keepAlive);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
  ChannelFuture nextMap = sendMap(reduceContext);
  if (nextMap == null) {
    return;
  }
}
// HADOOP-15327: Need to send an instance of LastHttpContent to define HTTP
// message boundaries. See details in jira.
ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
{code}
So the loop ends and the LastHttpContent.EMPTY_LAST_CONTENT is written, but the 
ShuffleHandler.ReduceMapFileCount#operationComplete method tries to write a 
FadvisedFileRegion to the channel afterwards.
 The thing is, operationComplete invokes
{code:java}
pipelineFact.getSHUFFLE().sendMap(reduceContext);
{code}
if there are remaining map outputs to send. Then 
ShuffleHandler.Shuffle#sendMap calls ShuffleHandler.Shuffle#sendMapOutput, 
which writes the partition to the channel:
{code:java}
ChannelFuture writeFuture;
if (ch.pipeline().get(SslHandler.class) == null) {
  final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
      info.startOffset, info.partLength, manageOsCache, readaheadLength,
      readaheadPool, spillfile.getAbsolutePath(), 
      shuffleBufferSize, shuffleTransferToAllowed);
  writeFuture = writeToChannel(ch, partition);
{code}
The point is that LastHttpContent.EMPTY_LAST_CONTENT can only be written when 
we are in ShuffleHandler.ReduceMapFileCount#operationComplete and there are no 
remaining map outputs to send in the response. That is the only moment when we 
can finish the HTTP response.
 This is the main thing fixed in this commit.
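A simplified sketch of the fixed completion logic (names follow the trunk 
ShuffleHandler; treat this as an approximation of the commit, not the exact 
code):
{code:java}
public void operationComplete(ChannelFuture future) throws Exception {
  if (!future.isSuccess()) {
    future.channel().close();
    return;
  }
  int waitCount = reduceContext.getMapsToWait().decrementAndGet();
  if (waitCount == 0) {
    // All map outputs of this request have been sent: only now is it
    // safe to finish the HTTP response with LastHttpContent.
    future.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
  } else {
    // There are more map outputs to send: keep writing to the channel.
    pipelineFact.getSHUFFLE().sendMap(reduceContext);
  }
}
{code}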

There are a couple of other things included:
 - Introduced NettyChannelHelper in ShuffleHandler
 - Added Debug / Trace logs to ShuffleHandler
 - Added a flag to control whether LoggingHttpResponseEncoder is added to the 
pipeline (for debugging purposes)

*12. ShuffleHandlerTest fixes + enhancements: 
[https://github.com/szilard-nemeth/hadoop/commit/a91a04772390f377d478bb6ada4d435feb344500]*
 Finally, this is the last commit.
 As the 'testReduceFromPartialMem' testcase pointed to an issue with the 
production code, and I hadn't found any testcase in TestShuffleHandler that 
uses keep-alive and provides multiple mapIds (mapOutput) in the GET request, I 
thought it was important to add one.
 This was a good decision in general, but I faced some complications along the 
way.

So, the new testcase is called: 
TestShuffleHandler#testKeepAliveMultipleMapAttemptIds.
 What is the difference between this and the other keep-alive tests?
 It adds multiple map attempt ids to the GET request's URL, which translate to 
mapId / mapOutput in ShuffleHandler.
 Also, it simulates the production code's behavior based on the method 
'ShuffleHandler.Shuffle#sendMapOutput' that writes file chunks to the channel.
 This is how I'm doing it from the test:
{code:java}
shuffleHandler.mapOutputSender.additionalMapOutputSenderOperations =
    new AdditionalMapOutputSenderOperations() {
  @Override
  public ChannelFuture perform(ChannelHandlerContext ctx, Channel ch)
      throws IOException {
    File tmpFile = File.createTempFile("test", ".tmp");
    Files.write(tmpFile.toPath(),
        "dummytestcontent123456".getBytes(StandardCharsets.UTF_8));
    final DefaultFileRegion partition =
        new DefaultFileRegion(tmpFile, 0, mapOutputContentLength);
    LOG.debug("Writing response partition: {}, channel: {}",
        partition, ch.id());
    return ch.writeAndFlush(partition)
        .addListener((ChannelFutureListener) future ->
            LOG.debug("Finished Writing response partition: {}, channel: " +
                "{}", partition, ch.id()));
  }
};
{code}
Just to reiterate: the main problem with most of the testcases in 
TestShuffleHandler is that org.apache.hadoop.mapred.ShuffleHandler.Shuffle is 
an inner class, so to create a stubbed version we need the enclosing class 
(ShuffleHandler) in context; we can't define a stubbed version of Shuffle 
easily.
 The second issue is that most of the tests create their own subclass of 
ShuffleHandler / Shuffle that tries to simulate the real code.
 Instead, better mocking of the dependent objects of these classes would be the 
way to go.
 Anyway, this would complicate things and increase the refactoring work more 
and more, so I dropped this idea.
 Following the idea of subclassing ShuffleHandler / Shuffle, I reused the 
previously created class ShuffleHandlerForKeepAliveTests.
 The only remaining piece of the puzzle was to tweak 
TestShuffleHandler.MapOutputSender#send to make it capable of sending something 
more to the channel, in this case chunks of a file.

Interestingly, the test failed. I will attach full testcase logs, but here I'm 
only adding the exception that the testcase failed with:
{code:java}
java.io.IOException: Invalid Http response

  at 
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1612)
  at 
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
  at 
org.apache.hadoop.mapred.TestShuffleHandler$HttpConnectionHelper.connectToUrls(TestShuffleHandler.java:553)
  at 
org.apache.hadoop.mapred.TestShuffleHandler.testKeepAliveInternal(TestShuffleHandler.java:1047)
  at 
org.apache.hadoop.mapred.TestShuffleHandler.testKeepAliveMultipleMapAttemptIds(TestShuffleHandler.java:997)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
  at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
  at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
  at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.lang.Thread.run(Thread.java:748)
{code}
I was thinking: What the hell is going on? Is this a production code issue or a 
test code issue?
 The best way to describe what happened afterwards is that I went down the 
rabbit hole to really understand the issue I was facing and find the root 
cause.
 All in all, the test executes two GET requests with the same URL, using 
keep-alive.
 With some more added logs, I was able to pinpoint that writing the file chunks 
happened later than the first request read the whole input stream.
 Later, I also realized that the number of bytes read from the InputStream was 
smaller than what I expected.
 This led me to check the Content-Length of the HTTP response. It seemed okay. 
I tried to debug all pieces of the code, including the HttpClient, to 
understand why the heck the InputStream was closed. I checked the values again 
and again and realized that the headers are written to the channel N+1 times, 
so the Content-Length was short by 1 * header size. This is the code that 
writes this data to the channel in the tests:
{code:java}
public ChannelFuture send(ChannelHandlerContext ctx, Channel ch)
    throws IOException {
  LOG.debug("In MapOutputSender#send");
  lastSocketAddress.setAddress(ch.remoteAddress());
  ShuffleHeader header = shuffleHeaderProvider.createNewShuffleHeader();
  writeOneHeader(ch, header);
  ChannelFuture future =
      writeHeaderNTimes(ch, header, responseConfig.headerWriteCount);
  // This is the last operation
  // It's safe to increment ShuffleHeader counter for better identification
  shuffleHeaderProvider.incrementCounter();
  if (additionalMapOutputSenderOperations != null) {
    return additionalMapOutputSenderOperations.perform(ctx, ch);
  }
  return future;
}
{code}
Also, the size of the file chunk(s) has to be added to the overall 
Content-Length of the response.
 So the fix was simple: compute the correct Content-Length. After this, both 
responses from Netty were processed without an issue.
 This was one hell of a ride to find out.
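The accounting that finally matched what was written to the channel, roughly 
(field names are my shorthand, not necessarily the ones in the commit):
{code:java}
// One header is written up-front, then N more times, then the file chunk:
long contentLength = (long) (headerWriteCount + 1) * headerSize
    + mapOutputContentLength;
{code}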

There are some other changes in the test added with this commit:
 - Increase URLConnection read timeout / connect timeout when using DEBUG mode
 - Introduce class: ResponseConfig, which stores header + payload data sizes + 
the final HTTP response content-length
 - Introduce abstract class: AdditionalMapOutputSenderOperations, which can 
perform additional operations when sendMap is invoked
 - ShuffleHandlerForKeepAliveTests: Enhanced failure control / close channel 
control
 - ShuffleHeaderProvider: Don't compute the header on every invocation, cache 
its size [only an optimization]
 - Fix TestShuffleHandler.HeaderPopulator#populateHeaders: Return the full 
content-length of the response, not just the length of the header
 - Fix in HttpConnectionHelper#connectToUrlsInternal: Add one headerSize to 
totalBytesRead
 - Enhancement in HttpConnectionHelper#connectToUrlsInternal: Fail fast if 
expected content-length < actual content-length
 - Added new keepalive tests, including: testKeepAliveMultipleMapAttemptIds
 - Added new keepalive test with HTTP 400 bad request

I tried to make only the changes that are really required, but passing several 
numeric values around for computing the content-length was ugly as hell, so I 
introduced the ResponseConfig class.
----
h2. CONCLUSION

There were 2 test failure investigations that required fixing the production 
code and also took a lot of time.
 At least I learned a ton of new stuff along the way :) 
 These investigations are detailed in *points 7., 11. and 12.*
 Please also see the attached investigation zip files if you are interested in 
the interim patches / logs produced by the tests and other materials:
 [^testfailure-testMapFileAccess-emptyresponse.zip]

[^testfailure-testReduceFromPartialMem.zip]
h2. REMAINING TODOs
 * Testing ShuffleHandler on a cluster
 * Fix Maven shading issues


was (Author: snemeth):
Just uploaded a new patch: [^HADOOP-15327.005.patch]

I have been (almost) exclusively working on this since my last comment and 
there are a couple of things to add again.
 The last commit that was discussed is this: 
[https://github.com/szilard-nemeth/hadoop/commit/f149be8de28baafc64eed1c47e788f5beb215e62]
 Let me explain what've changed commit by commit. I will skip a bunch of 
trivial ones like code cleanup, added comments and the like.
 *I will cover the test failures surfaced by Jenkins build / unit test results:*
 - Build #1
 - Build #2

*1. TestShuffleHandler: Introduced InputStreamReadResult that stores response 
as string + total bytes read: 
[https://github.com/szilard-nemeth/hadoop/commit/a57de573c97fe12c9071dd3450df8f450bf075ea]*
 Here, I added a new class called 'InputStreamReadResult' that stores the bytes 
read (byte[]) and the number of bytes read from a response InputStream.
 This improves the way testcases can assert on these data.

*2. TestShuffleHandler: Use DEFAULT_PORT for all shuffle handler port configs: 
[https://github.com/szilard-nemeth/hadoop/commit/78b1166866c85cab6860407f8fe4a4ddc3168fae]*
 It was a common pitfall while debugging that the tests had to modified to use 
a certain fixed port. Here, I added a constant to store the port number so when 
I had to debug I only needed to change it in one single place.

*3. Create class: TestExecution: Configure proxy, keep alive connection 
timeout: 
[https://github.com/szilard-nemeth/hadoop/commit/fa5bb32ae4eb737077a165b3b1fba5069c982243]*
 In order to debug the HTTP responses, I found it convenient to add a helper 
class that is responsible for the following:
 - Configuring the HTTP connections, use a proxy when required
 - Increase the keepalive timeout when using DEBUG mode

TEST_EXECUTION is a static instance of TestExecution, initialized with a JUnit 
test setup method.
 There are 2 flags that control the behaviour of this object:
{code:java}
//Control test execution properties with these flags
private static final boolean DEBUG_MODE = true;
//If this is set to true and proxy server is not running, tests will fail!
private static final boolean USE_PROXY = false; 
{code}
The only difference on top of these is in the code of testcases: They create 
all HTTP connections with:
{code:java}
TEST_EXECUTION.openConnection(url)
{code}
*4. TestExecution: Configure port: 
[https://github.com/szilard-nemeth/hadoop/commit/4a5c035695be1099bff4a633cd605b9f8146d841]*
 One addition to 3. is to include the port used by ShuffleHandler in the 
TestExecution object. When using DEBUG mode, the port is fixed to a value, 
otherwise it is set to 0, meaning that the port will be dynamically chosen.

*5. Add logging response encoder to TestShuffleHandler.testMapFileAccess: 
[https://github.com/szilard-nemeth/hadoop/commit/64686b47d2fed4e923c1c9c0169a06aba3e339be]*
 While debugging TestShuffleHandler#testMapFileAccess, just realized that I 
forgot to add the LoggingHttpResponseEncoder to the pipeline. The most trivial 
way was to modify the pipeline when the channel is activated.

*6. TestShuffleHandler.testMapFileAccess: Modify to be able to run it locally + 
reproduce jenkins UT failure: 
[https://github.com/szilard-nemeth/hadoop/commit/bb0fcbbd7dcbe3fa7efd1b6a8c2eb8a9055c5ecd]*
 Here's where the fun begins. The problem with 
TestShuffleHandler#testMapFileAccess is that it requires the NativeIO module:
{code:java}
// This will run only in NativeIO is enabled as SecureIOUtils need it
assumeTrue(NativeIO.isAvailable());
{code}
I tried to compile the Hadoop Native libraries on my Mac according to these 
resources:
 - Native libraries: 
[https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/NativeLibraries.html]
 - Followed this guide: 
[https://dev.to/zejnilovic/building-hadoop-native-libraries-on-mac-in-2019-1iee]

Unfortunately, I still had compilation errors so I eventually gave up and 
tweaked the test to be able to run it locally. This wasn't such a complex 
thing, I don't think it's worth to go into the details, had to comment out some 
test code that used the Native library and that was all.
 From the Jenkins results I had this:
{code:java}
[INFO] --- maven-surefire-plugin:3.0.0-M1:test (default-test) @ 
hadoop-mapreduce-client-shuffle ---
[INFO] 
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.hadoop.mapred.TestFadvisedFileRegion
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.493 s 
- in org.apache.hadoop.mapred.TestFadvisedFileRegion
[INFO] Running org.apache.hadoop.mapred.TestShuffleHandler
[ERROR] Tests run: 15, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 12.033 
s <<< FAILURE! - in org.apache.hadoop.mapred.TestShuffleHandler
[ERROR] testMapFileAccess(org.apache.hadoop.mapred.TestShuffleHandler)  Time 
elapsed: 0.339 s  <<< FAILURE!
java.lang.AssertionError: Received string '.......<MANY MANY NULL TERMINATING 
CHARACTERS>........' 
should contain message 'Owner 'jenkins' for path 
/home/jenkins/jenkins-home/workspace/PreCommit-HADOOP-Build/sourcedir/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/target/test-dir/TestShuffleHandlerLocDir/usercache/randomUser/appcache/application_12345_0001/output/attempt_12345_1_m_1_0/file.out.index
 did not match expected owner 'randomUser''
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.assertTrue(Assert.java:42)
        at 
org.apache.hadoop.mapred.TestShuffleHandler.testMapFileAccess(TestShuffleHandler.java:1228)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
        at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
{code}
Please keep on reading the next bullet point where I detail how I found out 
what was happening.

*7. TestShuffleHandler.testMapFileAccess: Fix in production code: 
[https://github.com/szilard-nemeth/hadoop/commit/6b9b9c39e181cf7836de33a2cda0712a81b85d01]*
 Continuing on the previous point, I had to find out what's going on and why 
the response was full of null-terminating characters.
 It's important to refer back to my previous comment: By checking 7. from that 
comment, I already gave a detailed analyis about error handling and what's 
wrong with writing a HTTP 200 OK, then a HTTP 500 (or any other bad code) to 
the channel. By the time I was putting that comment together I didn't realize 
that the 'testMpafileAccess' testcase wouldn't pass as it wasn't running on my 
machine locally so I proposed a follow-up jira. It turned out I need to fix it 
now as the test fails so I will retract my statement.
 The fix is utilizing the usual "trick": Write the HTTP 200 response, write 
LastHttpContent.EMPTY_LAST_CONTENT then write the HTTP 500 response to the 
channel. This way, we can avoid the IllegalStateException thrown by the 
outbound message handler.
{code:java}
try {
    populateHeaders(mapIds, jobId, user, reduceId, request,
      response, keepAliveParam, mapOutputInfoMap);
  } catch(IOException e) {
    //HADOOP-15327
    // Need to send an instance of LastHttpContent to define HTTP
    // message boundaries.
    //Sending a HTTP 200 OK + HTTP 500 later (sendError)
    // is quite a non-standard way of crafting HTTP responses,
    // but we need to keep backward compatibility.
    // See more details in jira.
    ch.writeAndFlush(response);
    ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    LOG.error("Shuffle error in populating headers :", e);
    String errorMessage = getErrorMessage(e);
    sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
    return;
  }
{code}
I also went ahead and checked if the tests are too autonomous and does this 
tricky HTTP error handling or the production code (Fetcher.java) has the same 
logic: It turned out Fetcher also works quite the same, so at least the tests 
and the production code are close together. I still don't think this is good 
how the "error responses" are handled as HTTP 200 responses, since the 
HttpURLConnection has a 
[getErrorStream|https://docs.oracle.com/javase/8/docs/api/java/net/HttpURLConnection.html#getErrorStream--]
 method that could be used for proper error handling. 
 This is also something that points to this direction: 
[https://stackoverflow.com/a/11721878/1106893]
 Nevertheless, it would be too risky to change this behaviour plus it would 
also break backward-compatibility. It would be a dumb idea to modify this 
especially with the Netty upgrade as it would have been involved a heavy 
behavioral change.
 So it's sad as it looks: The tests are not buggy, they are okay and their 
expectation of the HTTP 200 and the HTTP 500 error code packed into that 
response is the correct thing to check to match the behaviour of the production 
code of the ShuffleHandler client (Fetcher.java).

Let me also share what I've used to come to this conclusion: 
 *7.1 Modified the Apache trunk code to check the response codes in the tests 
including particular testcase as well. It turned out that HTTP 200 is the 
response code.*

*7.2 Also did a diff between TCP dump outputs generated by the old vs. new 
code, they were the same in terms of HTTP response codes and structure, after 
my fix was applied.*

*7.3 Wanted to to a final verification step and double-check HTTP responses to 
see if they are really all HTTP 200 and the HTTP is wrapped as a second status 
line or in the response body itself and I found a very useful tool to capture 
HTTP traffic, it's called 'mitmproxy': [https://mitmproxy.org/]*

Some useful links related to this tool:
 - 
[https://yoshihisaonoue.wordpress.com/2017/12/23/basic-usage-of-mitmdump/comment-page-1/]
 - 
[https://blog.packagecloud.io/eng/2016/11/14/debugging-ssl-in-java-using-mitmproxy/]
 - [https://dzone.com/articles/how-to-record-httphttps-traffic-with-mitmproxy]
 - [https://github.com/mitmproxy/mitmproxy/issues/2832]

For this, I had to reconfigure the HTTPUrlConnections to use a proxy. In 
theory, there are some standard HTTP / HTTPS JVM switches to control the global 
proxy but somehow I was not able to make it work.

Initially, I found some basic resouces about configuring Java to use proxy for 
all connections, globally:
 - 
[https://stackoverflow.com/questions/120797/how-do-i-set-the-proxy-to-be-used-by-the-jvm]
 - [https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html]
 - [https://www.baeldung.com/java-connect-via-proxy-server]

Based on these, I created a new test run config and added these JVM options:
{code:java}
-Djavax.net.debug=all
-Dhttp.proxyHost=localhost
-Dhttp.proxyPort=8888
-Dhttps.proxyHost=localhost
-Dhttps.proxyPort=8888
{code}
These global settings (http.proxyHost / http.proxyPort) did not work at all. 
Also tried using 127.0.0.1 instead of 'localhost'.
 This didn't work either so I suspected it's because something is bad with the 
JVM options so I added some assert statements:
{code:java}
//PROXY CONFIG
Assert.assertEquals("localhost", System.getProperty("http.proxyHost"));
Assert.assertEquals("8888", System.getProperty("http.proxyPort"));
Assert.assertEquals("localhost", System.getProperty("https.proxyHost"));
Assert.assertEquals("8888", System.getProperty("https.proxyPort"));
Assert.assertEquals(null, System.getProperty("http.nonProxyHosts"));
{code}
Also tried to set it to some dummy port value, expecting a connection issue as 
the proxy server is not listening on these ports. Result was still nothing:
{code:java}
System.setProperty("http.proxyPort", "1234");
System.setProperty("http.proxyHost", "127.0.0.1 ");
System.setProperty("http.proxyHost", "localhost");
System.setProperty("http.proxyPort", "8888");
{code}
So I gave up using the global config and found another way: 
[https://www.baeldung.com/java-connect-via-proxy-server#1-using-an-http-proxy]
 This finally worked and I could see the traffic going through the mitmproxy.
 Code:
{code:java}
//    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      Proxy webProxy
          = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 
8888));
      HttpURLConnection conn
          = (HttpURLConnection) url.openConnection(webProxy);
{code}
FINAL conclusion for this:
 It's valid by the RFC that the server sends a HTTP response with multiple 
status lines.
 However, the first status line is the one that really matters.
 Related SO answer: [https://stackoverflow.com/a/47541155/1106893]
 For me it's still weird, but this is what it is.

*8. TestShuffleHandler: Stop shufflehandler in all tests, fix debug mode 
issues: 
[https://github.com/szilard-nemeth/hadoop/commit/d472cb33cf038dee6f241382311f2bdf8314e0b9]*
 This is an important one for test stability and independence from each other.
 I added some teardown code that checks if the ShuffleHandler's port is still 
open. In practice, it should not be open when any of the testcases finished.
 This made me realize that many of the testcase wouldn't stop the 
ShuffleHandler so I fixed this.

*9. testReduceFromPartialMem: Add Shuffle IO error assertion to test: 
[https://github.com/szilard-nemeth/hadoop/commit/7eba7a4c9a08bd0de486c250a6bc711c7665f08e]*

testReduceFromPartialMem failed with [this Jenkins 
build|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17363928&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17363928]

Unfortunately, the output doesn't say too much about the root cause of the test 
failure:
{code:java}
[INFO] Running org.apache.hadoop.mapred.TestReduceFetchFromPartialMem
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 89.147 
s <<< FAILURE! - in org.apache.hadoop.mapred.TestReduceFetchFromPartialMem
[ERROR] 
testReduceFromPartialMem(org.apache.hadoop.mapred.TestReduceFetchFromPartialMem)
  Time elapsed: 89.024 s  <<< ERROR!
java.io.IOException: Job failed!
  at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:876)
  at 
org.apache.hadoop.mapred.TestReduceFetchFromPartialMem.runJob(TestReduceFetchFromPartialMem.java:292)
  at 
org.apache.hadoop.mapred.TestReduceFetchFromPartialMem.testReduceFromPartialMem(TestReduceFetchFromPartialMem.java:85)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
  at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
  at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
  at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
  at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
  at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
  at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
  at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
  at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
  at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
  at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
  at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
  at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
  at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
  at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
  at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}
So I added an assertion that at least surfaces the Shuffle IO errors:
{code:java}
long shuffleIoErrors = c.getGroup(SHUFFLE_ERR_GRP_NAME)
    .getCounter(Fetcher.ShuffleErrors.IO_ERROR.toString());
assertEquals(0, shuffleIoErrors);
{code}
It turned out that this IO_ERROR counter from ShuffleErrors is sometimes 1, so 
I continued my investigation.
 It took a lot of time, but in the end I think it was worth it.
 Please see the next points (11 / 12), where I add detailed information about 
what was going on.

*10. LoggingHttpResponseEncoder: Add some new logs: 
[https://github.com/szilard-nemeth/hadoop/commit/e4260d1047805123d17b7d8dc94d776c818726a3]*
 In order to see all outbound message types, I needed to add some new logs to 
LoggingHttpResponseEncoder.
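As a rough sketch of the idea (an assumed shape, not the exact code of 
LoggingHttpResponseEncoder), such an encoder can log the concrete type of 
every outbound message before delegating to the normal HTTP encoding:
{code:java}
public class LoggingHttpResponseEncoder extends HttpResponseEncoder {
  private static final Logger LOG =
      LoggerFactory.getLogger(LoggingHttpResponseEncoder.class);

  @Override
  public void write(ChannelHandlerContext ctx, Object msg,
      ChannelPromise promise) throws Exception {
    // Log the concrete type, so it is visible what reaches HttpObjectEncoder
    // (HttpResponse, HttpContent, LastHttpContent, FadvisedFileRegion, ...).
    LOG.info("OUTBOUND message, type: {}, value: {}",
        msg.getClass().getSimpleName(), msg);
    super.write(ctx, msg, promise);
  }
}
{code}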

*11. Fixed error handling + LastHttpContent: 
[https://github.com/szilard-nemeth/hadoop/commit/61cc084c6e05a778360b0f475a783b0c30f7f0cf]*
 When testReduceFromPartialMem failed once, I was able to check on my local 
machine what was really going on:
{code:java}
2021-06-22 18:18:27,452 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(988)) - ***channelRead
2021-06-22 18:18:27,455 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(990)) - ***REQUEST: 
HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: 
HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0))
GET 
/mapOutput?job=job_1624378685077_0001&reduce=0&map=attempt_1624378685077_0001_m_000000_0,attempt_1624378685077_0001_m_000002_0,attempt_1624378685077_0001_m_000004_0,attempt_1624378685077_0001_m_000005_0
 HTTP/1.1
UrlHash: C/94EnScoT3sUBGXf4ul5bdpm1Q=
name: mapreduce
version: 1.0.0
User-Agent: Java/1.8.0_232b09
Host: localhost:62272
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Connection: keep-alive
content-length: 0
2021-06-22 18:18:27,489 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP1
2021-06-22 18:18:27,498 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,507 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@5510d748, 
channel: c029638c
2021-06-22 18:18:27,509 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP2
2021-06-22 18:18:27,509 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@45e2b799, 
channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP3
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@169b92fa, 
channel: c029638c
2021-06-22 18:18:27,510 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1120)) - ***LOOP ENDED
2021-06-22 18:18:27,511 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1123)) - ***Writing 
LastHttpContent, channel: c029638c
2021-06-22 18:18:27,513 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(338)) - 
***OPERATION COMPLETE
2021-06-22 18:18:27,513 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing 
response buffer: 47, channel: c029638c
2021-06-22 18:18:27,514 INFO  [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing 
response partition: org.apache.hadoop.mapred.FadvisedFileRegion@1040936, 
channel: c029638c
2021-06-22 18:18:27,515 ERROR [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is 
unsuccessful. Cause: 
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: 
unexpected message type: FadvisedFileRegion, state: 0
  at 
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
  at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
  at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
  at 
io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:302)
  at 
io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:131)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
  at 
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
  at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
  at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
  at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
  at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
  at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
  at 
io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
  at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306)
  at 
org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendMapOutput(ShuffleHandler.java:1370)
  at 
org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendMap(ShuffleHandler.java:1152)
  at 
org.apache.hadoop.mapred.ShuffleHandler$ReduceMapFileCount.operationComplete(ShuffleHandler.java:339)
  at 
org.apache.hadoop.mapred.ShuffleHandler$ReduceMapFileCount.operationComplete(ShuffleHandler.java:307)
  at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
  at 
io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
  at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
  at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
  at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
  at 
io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
  at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
  at 
io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
  at 
io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
  at 
io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
  at 
io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:242)
  at 
io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
  at 
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
  at 
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:953)
  at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:713)
  at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
  at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
  at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
  at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: unexpected message type: 
FadvisedFileRegion, state: 0
  at 
io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:124)
  at 
io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
  ... 43 more
2021-06-22 18:18:27,517 ERROR [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is 
unsuccessful. Cause: 
io.netty.channel.StacklessClosedChannelException
  at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
2021-06-22 18:18:27,517 ERROR [ShuffleHandler Netty Worker #0] 
mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is 
unsuccessful. Cause: 
io.netty.channel.StacklessClosedChannelException
  at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
{code}
The famous IllegalStateException once again, but this time with a different 
message type, FadvisedFileRegion:
{code:java}
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: 
unexpected message type: FadvisedFileRegion, state: 0
{code}
In this case, the LastHttpContent was written to the channel too early, and 
the ShuffleHandler then wrote a FadvisedFileRegion to the channel.
 With my added logs, it was evident what was happening (see the ***LOOP1 / 
***LOOP ENDED / ***Writing LastHttpContent / ***OPERATION COMPLETE sequence in 
the log above).
This is the original loop without the logs:
{code:java}
boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
    user, mapOutputInfoMap, jobId, keepAlive);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
  ChannelFuture nextMap = sendMap(reduceContext);
  if (nextMap == null) {
    return;
  }
}
// HADOOP-15327: Need to send an instance of LastHttpContent to define HTTP
// message boundaries. See details in jira.
ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
{code}
So somehow the loop ends and LastHttpContent.EMPTY_LAST_CONTENT is written, 
but the ShuffleHandler.ReduceMapFileCount#operationComplete method tries to 
write a FadvisedFileRegion to the channel afterwards.
 The thing is, operationComplete invokes
{code:java}
pipelineFact.getSHUFFLE().sendMap(reduceContext);
{code}
if there are remaining map outputs to send. Then 
ShuffleHandler.Shuffle#sendMap calls ShuffleHandler.Shuffle#sendMapOutput, 
which writes the partition to the channel:
{code:java}
ChannelFuture writeFuture;
if (ch.pipeline().get(SslHandler.class) == null) {
  final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
      info.startOffset, info.partLength, manageOsCache, readaheadLength,
      readaheadPool, spillfile.getAbsolutePath(), 
      shuffleBufferSize, shuffleTransferToAllowed);
  writeFuture = writeToChannel(ch, partition);
{code}
The point is that LastHttpContent.EMPTY_LAST_CONTENT may only be written from 
ShuffleHandler.ReduceMapFileCount#operationComplete, when there are no 
remaining map outputs to send in the response. That is the only moment the 
HTTP response can be finished.
 This is the main thing fixed in this commit.
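A rough sketch of the fixed control flow (assuming ReduceContext exposes the 
mapsToWait counter and the keepAlive flag; the exact names may differ from the 
patch):
{code:java}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
  if (!future.isSuccess()) {
    future.channel().close();
    return;
  }
  int waitCount = reduceContext.getMapsToWait().decrementAndGet();
  if (waitCount == 0) {
    // All map outputs have been written: this is the only safe moment to
    // terminate the HTTP message with an empty LastHttpContent.
    future.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
    if (!reduceContext.getKeepAlive()) {
      future.channel().close();
    }
  } else {
    // More map outputs remain: keep streaming them. Ending the HTTP
    // response here would make the next FadvisedFileRegion write illegal.
    pipelineFact.getSHUFFLE().sendMap(reduceContext);
  }
}
{code}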

There are a couple of other things included:
 - Introduced NettyChannelHelper in ShuffleHandler
 - Added Debug / Trace logs to ShuffleHandler
 - Added a flag to control whether LoggingHttpResponseEncoder is added to the 
pipeline (for debugging purposes)

*12. ShuffleHandlerTest fixes + enhancements: 
[https://github.com/szilard-nemeth/hadoop/commit/a91a04772390f377d478bb6ada4d435feb344500]*
 Finally, this is the last commit.
 As the 'testReduceFromPartialMem' testcase pointed to an issue in the 
production code, and I hadn't found any testcase in TestShuffleHandler that 
uses keep-alive and provides multiple mapIds (mapOutputs) in the GET request, 
I thought it was important to add one.
 This was a good decision in general, but I faced some complications along the 
way.

So, the new testcase is called 
TestShuffleHandler#testKeepAliveMultipleMapAttemptIds.
 What is the difference between this and the other keep-alive tests?
 It adds multiple map attempt ids to the GET request's URL, which translate to 
mapIds / mapOutputs in ShuffleHandler.
 It also simulates the production code's behavior based on the method 
'ShuffleHandler.Shuffle#sendMapOutput', which writes file chunks to the 
channel.
 This is how I'm doing it from the test:
{code:java}
shuffleHandler.mapOutputSender.additionalMapOutputSenderOperations =
    new AdditionalMapOutputSenderOperations() {
      @Override
      public ChannelFuture perform(ChannelHandlerContext ctx, Channel ch)
          throws IOException {
        File tmpFile = File.createTempFile("test", ".tmp");
        Files.write(tmpFile.toPath(),
            "dummytestcontent123456".getBytes(StandardCharsets.UTF_8));
        final DefaultFileRegion partition =
            new DefaultFileRegion(tmpFile, 0, mapOutputContentLength);
        LOG.debug("Writing response partition: {}, channel: {}",
            partition, ch.id());
        return ch.writeAndFlush(partition)
            .addListener((ChannelFutureListener) future ->
                LOG.debug("Finished writing response partition: {}, channel: {}",
                    partition, ch.id()));
      }
    };
{code}
Just to reiterate: the main problem with most of the testcases in 
TestShuffleHandler is that org.apache.hadoop.mapred.ShuffleHandler.Shuffle is 
an inner class, so creating a stubbed version requires the enclosing class 
(ShuffleHandler) to be in context; we can't easily define a stubbed version of 
Shuffle.
 The second issue is that most of the tests create their own subclasses of 
ShuffleHandler / Shuffle that try to simulate the real code.
 Instead, better mocking of the dependent objects of these classes would be 
the way to go.
 Anyway, that would have complicated things and increased the refactoring work 
more and more, so I dropped the idea.
 Following the idea of subclassing ShuffleHandler / Shuffle, I reused the 
previously created class ShuffleHandlerForKeepAliveTests.
 The only remaining piece of the puzzle was to tweak 
TestShuffleHandler.MapOutputSender#send to make it capable of sending 
something more to the channel, in this case chunks of a file.
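To illustrate the inner-class problem mentioned above (a contrived sketch, not 
code from the patch; the Shuffle constructor signature is assumed here): any 
stubbed Shuffle needs a live enclosing ShuffleHandler, created via the 
qualified 'outer.new Inner()' syntax:
{code:java}
Configuration conf = new Configuration();
ShuffleHandler handler = new ShuffleHandler();
// A stub of the non-static inner class drags the whole enclosing
// ShuffleHandler along with it:
ShuffleHandler.Shuffle stubbedShuffle = handler.new Shuffle(conf) {
  // overridden, test-specific behavior would go here
};
{code}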

Interestingly, the test failed. I will attach the full testcase logs, but here 
I'm only including the exception that the testcase failed with:
{code:java}
java.io.IOException: Invalid Http response

  at 
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1612)
  at 
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
  at 
org.apache.hadoop.mapred.TestShuffleHandler$HttpConnectionHelper.connectToUrls(TestShuffleHandler.java:553)
  at 
org.apache.hadoop.mapred.TestShuffleHandler.testKeepAliveInternal(TestShuffleHandler.java:1047)
  at 
org.apache.hadoop.mapred.TestShuffleHandler.testKeepAliveMultipleMapAttemptIds(TestShuffleHandler.java:997)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
  at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
  at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
  at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
  at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.lang.Thread.run(Thread.java:748)
{code}
I was thinking: what the hell is going on? Is this a production code issue or a 
test code issue?
 The best way to describe what happened afterwards is to go down the rabbit 
hole, really understand the issue I was facing, and find the root cause.
 All in all, the test executes two GET requests with the same URL, using 
keep-alive.
 With some more added logs, I could see that the file chunks were written 
later than the point where the first request had already read the whole input 
stream.
 Later, I also realized that the number of bytes read from the InputStream was 
smaller than what I expected.
 This led me to check the Content-Length of the HTTP response. It seemed okay. 
I tried to debug all pieces of the code, including the HttpClient, to 
understand why the heck the InputStream was closed. I checked the values again 
and again and realized that the headers are written to the channel N+1 times, 
so the Content-Length was short by one header size. This is the code that 
writes these data to the channel in the tests:
{code:java}
public ChannelFuture send(ChannelHandlerContext ctx, Channel ch)
    throws IOException {
  LOG.debug("In MapOutputSender#send");
  lastSocketAddress.setAddress(ch.remoteAddress());
  ShuffleHeader header = shuffleHeaderProvider.createNewShuffleHeader();
  writeOneHeader(ch, header);
  ChannelFuture future =
      writeHeaderNTimes(ch, header, responseConfig.headerWriteCount);
  // This is the last operation:
  // it's safe to increment the ShuffleHeader counter for better identification
  shuffleHeaderProvider.incrementCounter();
  if (additionalMapOutputSenderOperations != null) {
    return additionalMapOutputSenderOperations.perform(ctx, ch);
  }
  return future;
}
{code}
The size of the file chunk(s) also has to be added to the overall 
Content-Length of the response.
 So the fix was simple: compute the correct Content-Length. After this, both 
responses from Netty were processed without an issue.
 This was one hell of a ride to find out.
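In other words, the Content-Length has to account for every header that is 
actually written plus the file chunk payload. A sketch of the accounting (the 
variable names are illustrative only):
{code:java}
// One header from writeOneHeader plus N more from writeHeaderNTimes,
// plus the bytes of the file chunk(s):
long contentLength =
    (headerWriteCount + 1) * headerSize + mapOutputContentLength;
{code}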

There are some other test changes added with this commit:
 - Increase URLConnection read timeout / connect timeout when using Debug mode
 - Introduce class: ResponseConfig, that stores header + payload data sizes + 
final HTTP response content-length
 - Introduce abstract class: AdditionalMapOutputSenderOperations, that can 
perform additional operations when sendMap is invoked
 - ShuffleHandlerForKeepAliveTests: Enhanced failure control / close channel 
control
 - ShuffleHeaderProvider: Don't compute header on every invocation, cache the 
size of it [Only an optimization]
 - Fix TestShuffleHandler.HeaderPopulator#populateHeaders: Return full 
content-length of response, not just the length of the header
 - Fix in HttpConnectionHelper#connectToUrlsInternal: Add one headerSize to 
totalBytesRead.
 - Enhancement in HttpConnectionHelper#connectToUrlsInternal: Fail-fast if 
expected content-length < actual content-length.
 - Added new keepalive tests, including: testKeepAliveMultipleMapAttemptIds
 - Added new keepalive test with HTTP 400 bad request

I tried to make only those changes that were really required, but passing 
several numeric values around to compute the Content-Length was ugly as hell, 
so I introduced the ResponseConfig class.
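A minimal sketch of what ResponseConfig bundles (an assumed shape; the field 
names are illustrative, not necessarily those of the patch):
{code:java}
static class ResponseConfig {
  final long headerSize;       // size of one serialized ShuffleHeader
  final int headerWriteCount;  // how many extra times the header is written
  final long payloadSize;      // total size of the file chunk payload(s)

  ResponseConfig(long headerSize, int headerWriteCount, long payloadSize) {
    this.headerSize = headerSize;
    this.headerWriteCount = headerWriteCount;
    this.payloadSize = payloadSize;
  }

  // The final Content-Length: every written header plus the payload.
  long contentLength() {
    return (headerWriteCount + 1) * headerSize + payloadSize;
  }
}
{code}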
----
h2. CONCLUSION

There were two test failure investigations that required fixes to the 
production code, and both took a lot of time.
 At least I learned a ton of new stuff along the way :) 
 These investigations are detailed in *points 7., 11. and 12.*
 Please also see the attached investigation zip files if you are interested in 
the interim patches / logs produced by the tests and other materials:
 [^testfailure-testMapFileAccess-emptyresponse.zip]

[^testfailure-testReduceFromPartialMem.zip]
h2. REMAINING TODOs
 * Testing ShuffleHandler on a cluster
 * Fix Maven shading issues

> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
>                 Key: HADOOP-15327
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15327
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Xiaoyu Yao
>            Assignee: Szilard Nemeth
>            Priority: Major
>         Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, 
> HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, 
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, 
> testfailure-testMapFileAccess-emptyresponse.zip, 
> testfailure-testReduceFromPartialMem.zip
>
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)


