[ https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371468#comment-17371468 ]
Szilard Nemeth edited comment on HADOOP-15327 at 6/29/21, 3:14 PM:
-------------------------------------------------------------------
Just uploaded a new patch: [^HADOOP-15327.005.patch]

I have been (almost) exclusively working on this since [my last comment|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17362367&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17362367] and there are a couple of things to add again. The last commit that was discussed is this one: [https://github.com/szilard-nemeth/hadoop/commit/f149be8de28baafc64eed1c47e788f5beb215e62]

Let me explain what has changed, commit by commit. I will skip a bunch of trivial ones like code cleanups, added comments and the like.

*I will cover the test failures surfaced by the Jenkins build / unit test results:*
- [Build #1|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17362456&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17362456]
- [Build #2|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17363928&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17363928]

*1. TestShuffleHandler: Introduced InputStreamReadResult that stores response as string + total bytes read: [https://github.com/szilard-nemeth/hadoop/commit/a57de573c97fe12c9071dd3450df8f450bf075ea]*
Here, I added a new class called 'InputStreamReadResult' that stores the bytes read (byte[]) and the number of bytes read from a response InputStream. This improves the way testcases can assert on this data.

*2. TestShuffleHandler: Use DEFAULT_PORT for all shuffle handler port configs: [https://github.com/szilard-nemeth/hadoop/commit/78b1166866c85cab6860407f8fe4a4ddc3168fae]*
It was a common pitfall while debugging that the tests had to be modified to use a certain fixed port. Here, I added a constant to store the port number, so when I had to debug I only needed to change it in one single place.

*3. 
Create class: TestExecution: Configure proxy, keep alive connection timeout: [https://github.com/szilard-nemeth/hadoop/commit/fa5bb32ae4eb737077a165b3b1fba5069c982243]*
In order to debug the HTTP responses, I found it convenient to add a helper class that is responsible for the following:
- Configuring the HTTP connections to use a proxy when required
- Increasing the keep-alive timeout when in DEBUG mode

TEST_EXECUTION is a static instance of TestExecution, initialized in a JUnit test setup method. There are 2 flags that control the behaviour of this object:
{code:java}
//Control test execution properties with these flags
private static final boolean DEBUG_MODE = true;
//If this is set to true and the proxy server is not running, tests will fail!
private static final boolean USE_PROXY = false;
{code}
The only other difference is in the code of the testcases: they create all HTTP connections with:
{code:java}
TEST_EXECUTION.openConnection(url)
{code}

*4. TestExecution: Configure port: [https://github.com/szilard-nemeth/hadoop/commit/4a5c035695be1099bff4a633cd605b9f8146d841]*
One addition on top of 3. is to include the port used by ShuffleHandler in the TestExecution object. When using DEBUG mode, the port is fixed to a given value; otherwise it is set to 0, meaning that the port will be chosen dynamically.

*5. Add logging response encoder to TestShuffleHandler.testMapFileAccess: [https://github.com/szilard-nemeth/hadoop/commit/64686b47d2fed4e923c1c9c0169a06aba3e339be]*
While debugging TestShuffleHandler#testMapFileAccess, I realized that I had forgotten to add the LoggingHttpResponseEncoder to the pipeline. The simplest way to fix this was to modify the pipeline when the channel is activated.

*6. TestShuffleHandler.testMapFileAccess: Modify to be able to run it locally + reproduce jenkins UT failure: [https://github.com/szilard-nemeth/hadoop/commit/bb0fcbbd7dcbe3fa7efd1b6a8c2eb8a9055c5ecd]*
Here's where the fun begins. 
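As a quick illustration of points 3 and 4, such a proxy-aware connection helper could look roughly like this. This is a simplified, hypothetical sketch (all names and values are illustrative), not the actual patch code:
{code:java}
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;

// Hypothetical, simplified sketch of a TestExecution-style helper.
public class TestExecutionSketch {
    private static final boolean DEBUG_MODE = false; // fixed port + longer keep-alive when true
    private static final boolean USE_PROXY = false;  // tests fail if true and no proxy is running

    // Port 0 lets the OS pick a free port; a fixed port is only useful while debugging.
    static int shuffleHandlerPort() {
        return DEBUG_MODE ? 15555 : 0; // 15555 is an arbitrary, illustrative debug port
    }

    // All testcases open their connections through this single helper,
    // so the proxy setting is applied in one place.
    static HttpURLConnection openConnection(URL url) throws IOException {
        if (USE_PROXY) {
            // 8888 matches the mitmproxy port used later in this comment
            Proxy proxy = new Proxy(Proxy.Type.HTTP,
                new InetSocketAddress("127.0.0.1", 8888));
            return (HttpURLConnection) url.openConnection(proxy);
        }
        return (HttpURLConnection) url.openConnection();
    }
}
{code}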
The problem with TestShuffleHandler#testMapFileAccess is that it requires the NativeIO module:
{code:java}
// This will run only if NativeIO is enabled, as SecureIOUtils needs it
assumeTrue(NativeIO.isAvailable());
{code}
I tried to compile the Hadoop native libraries on my Mac according to these resources:
- Native libraries: [https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/NativeLibraries.html]
- Followed this guide: [https://dev.to/zejnilovic/building-hadoop-native-libraries-on-mac-in-2019-1iee]

Unfortunately, I still had compilation errors, so I eventually gave up and tweaked the test to be able to run it locally. This wasn't complex and I don't think it's worth going into the details: I had to comment out some test code that used the native library, and that was all. From the Jenkins results I had this:
{code:java}
[INFO] --- maven-surefire-plugin:3.0.0-M1:test (default-test) @ hadoop-mapreduce-client-shuffle ---
[INFO]
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.hadoop.mapred.TestFadvisedFileRegion
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.493 s - in org.apache.hadoop.mapred.TestFadvisedFileRegion
[INFO] Running org.apache.hadoop.mapred.TestShuffleHandler
[ERROR] Tests run: 15, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 12.033 s <<< FAILURE! - in org.apache.hadoop.mapred.TestShuffleHandler
[ERROR] testMapFileAccess(org.apache.hadoop.mapred.TestShuffleHandler) Time elapsed: 0.339 s <<< FAILURE!
java.lang.AssertionError: Received string '.......<MANY MANY NULL TERMINATING CHARACTERS>........' 
should contain message 'Owner 'jenkins' for path /home/jenkins/jenkins-home/workspace/PreCommit-HADOOP-Build/sourcedir/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/target/test-dir/TestShuffleHandlerLocDir/usercache/randomUser/appcache/application_12345_0001/output/attempt_12345_1_m_1_0/file.out.index did not match expected owner 'randomUser'' at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at org.apache.hadoop.mapred.TestShuffleHandler.testMapFileAccess(TestShuffleHandler.java:1228) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) {code} Please keep on reading the next bullet point where I detail how I found out what was happening. *7. TestShuffleHandler.testMapFileAccess: Fix in production code: [https://github.com/szilard-nemeth/hadoop/commit/6b9b9c39e181cf7836de33a2cda0712a81b85d01]* Continuing on the previous point, I had to find out what's going on and why the response was full of null-terminating characters. 
It's important to refer back to my [previous comment|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17362367&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17362367]: in point 7. of that comment, I already gave a detailed analysis of the error handling and of what's wrong with writing an HTTP 200 OK, then an HTTP 500 (or any other bad code), to the channel. When I was putting that comment together, I didn't realize that the 'testMapFileAccess' testcase wouldn't pass, as it wasn't running on my machine locally, so I proposed a follow-up jira. It turned out I needed to fix it now, as the test fails, so I'm retracting that statement. The fix utilizes the usual "trick": write the HTTP 200 response, write LastHttpContent.EMPTY_LAST_CONTENT, then write the HTTP 500 response to the channel. This way, we can avoid the IllegalStateException thrown by the outbound message handler.
{code:java}
try {
  populateHeaders(mapIds, jobId, user, reduceId, request,
      response, keepAliveParam, mapOutputInfoMap);
} catch (IOException e) {
  //HADOOP-15327
  // Need to send an instance of LastHttpContent to define HTTP
  // message boundaries.
  //Sending a HTTP 200 OK + HTTP 500 later (sendError)
  // is quite a non-standard way of crafting HTTP responses,
  // but we need to keep backward compatibility.
  // See more details in jira.
  ch.writeAndFlush(response);
  ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
  LOG.error("Shuffle error in populating headers :", e);
  String errorMessage = getErrorMessage(e);
  sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
  return;
}
{code}
I also checked whether only the tests do this tricky HTTP error handling, or whether the production code (Fetcher.java) has the same logic: it turned out Fetcher works much the same way, so at least the tests and the production code are close together. 
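To illustrate why a client keying off the status line only ever reports HTTP 200 in this scheme, here is a minimal, self-contained sketch (purely illustrative; this is not ShuffleHandler or Fetcher code): when the server pushes a second status line after an already-completed 200 response, that second line is just more bytes on the wire, and only the first status line determines the response code.
{code:java}
// Purely illustrative: the first status line of a raw HTTP response wins.
public class FirstStatusLineWins {
    /** Parses the status code out of the first status line of a raw response. */
    static int firstStatusCode(String rawResponse) {
        String statusLine = rawResponse.split("\r\n", 2)[0]; // e.g. "HTTP/1.1 200 OK"
        return Integer.parseInt(statusLine.split(" ")[1]);
    }

    public static void main(String[] args) {
        String wire =
            "HTTP/1.1 200 OK\r\n"
            + "Content-Length: 0\r\n"
            + "\r\n"
            // sendError() later writes a second response on the same connection:
            + "HTTP/1.1 500 Internal Server Error\r\n"
            + "\r\n";
        System.out.println(firstStatusCode(wire)); // 200: the 500 never surfaces as the response code
    }
}
{code}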
I still don't think it's good that the "error responses" are handled as HTTP 200 responses, since HttpURLConnection has a [getErrorStream|https://docs.oracle.com/javase/8/docs/api/java/net/HttpURLConnection.html#getErrorStream--] method that could be used for proper error handling. This also points in the same direction: [https://stackoverflow.com/a/11721878/1106893] Nevertheless, it would be too risky to change this behaviour, and it would also break backward compatibility. It would be a bad idea to modify this especially with the Netty upgrade, as it would have involved a heavy behavioral change on top of it. So, sad as it looks: the tests are not buggy, they are okay, and their expectation of the HTTP 200 with the HTTP 500 error code packed into that response is the correct thing to check to match the behaviour of the production code of the ShuffleHandler client (Fetcher.java). Let me also share what I used to come to this conclusion:
*7.1 Modified the Apache trunk code to check the response codes in the tests, including this particular testcase. It turned out that HTTP 200 is the response code.*
*7.2 Also did a diff between TCP dump outputs generated by the old vs. 
new code; they were the same in terms of HTTP response codes and structure after my fix was applied.*
*7.3 Wanted to do a final verification step and double-check the HTTP responses to see if they are really all HTTP 200, and whether the HTTP 500 is wrapped as a second status line or in the response body itself. I found a very useful tool to capture HTTP traffic called 'mitmproxy': [https://mitmproxy.org/]*
Some useful links related to this tool:
- [https://yoshihisaonoue.wordpress.com/2017/12/23/basic-usage-of-mitmdump/comment-page-1/]
- [https://blog.packagecloud.io/eng/2016/11/14/debugging-ssl-in-java-using-mitmproxy/]
- [https://dzone.com/articles/how-to-record-httphttps-traffic-with-mitmproxy]
- [https://github.com/mitmproxy/mitmproxy/issues/2832]

For this, I had to reconfigure the HttpURLConnections to use a proxy. In theory, there are some standard HTTP / HTTPS JVM switches to control the global proxy, but somehow I was not able to make them work. Initially, I found some basic resources about configuring Java to use a proxy for all connections, globally:
- [https://stackoverflow.com/questions/120797/how-do-i-set-the-proxy-to-be-used-by-the-jvm]
- [https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html]
- [https://www.baeldung.com/java-connect-via-proxy-server]

Based on these, I created a new test run config and added these JVM options:
{code:java}
-Djavax.net.debug=all
-Dhttp.proxyHost=localhost
-Dhttp.proxyPort=8888
-Dhttps.proxyHost=localhost
-Dhttps.proxyPort=8888
{code}
These global settings (http.proxyHost / http.proxyPort) did not work at all. Also tried using 127.0.0.1 instead of 'localhost'. 
This didn't work either, so I suspected that something was wrong with the JVM options and added some assert statements:
{code:java}
//PROXY CONFIG
Assert.assertEquals("localhost", System.getProperty("http.proxyHost"));
Assert.assertEquals("8888", System.getProperty("http.proxyPort"));
Assert.assertEquals("localhost", System.getProperty("https.proxyHost"));
Assert.assertEquals("8888", System.getProperty("https.proxyPort"));
Assert.assertEquals(null, System.getProperty("http.nonProxyHosts"));
{code}
Also tried to set a dummy port value, expecting a connection issue as the proxy server is not listening on that port. The result was still nothing:
{code:java}
System.setProperty("http.proxyPort", "1234");
System.setProperty("http.proxyHost", "127.0.0.1 ");
System.setProperty("http.proxyHost", "localhost");
System.setProperty("http.proxyPort", "8888");
{code}
So I gave up on the global config and found another way: [https://www.baeldung.com/java-connect-via-proxy-server#1-using-an-http-proxy] This finally worked and I could see the traffic going through the mitmproxy. Code:
{code:java}
// HttpURLConnection conn = (HttpURLConnection) url.openConnection();
Proxy webProxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 8888));
HttpURLConnection conn = (HttpURLConnection) url.openConnection(webProxy);
{code}
FINAL conclusion for this: it's valid by the RFC for the server to send an HTTP response with multiple status lines. However, the first status line is the one that really matters. Related SO answer: [https://stackoverflow.com/a/47541155/1106893] For me it's still weird, but it is what it is.

*8. TestShuffleHandler: Stop shufflehandler in all tests, fix debug mode issues: [https://github.com/szilard-nemeth/hadoop/commit/d472cb33cf038dee6f241382311f2bdf8314e0b9]*
This is an important one for test stability and for making the tests independent from each other. I added some teardown code that checks if the ShuffleHandler's port is still open. 
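Such a teardown check can be sketched along these lines (a hypothetical helper, not the actual patch code): try to connect to the ShuffleHandler's port, and if the connect succeeds, something is still listening there.
{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical teardown helper: is anything still listening on the given port?
public class PortCheck {
    static boolean isPortOpen(String host, int port) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 200); // short timeout
            return true;  // connect succeeded: a server is still bound to the port
        } catch (IOException e) {
            return false; // connection refused / timed out: the port is free
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(0)) { // bind an ephemeral port
            System.out.println(isPortOpen("localhost", server.getLocalPort())); // true while bound
        }
    }
}
{code}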
In practice, it should not be open after any of the testcases has finished. This made me realize that many of the testcases wouldn't stop the ShuffleHandler, so I fixed this.

*9. testReduceFromPartialMem: Add Shuffle IO error assertion to test: [https://github.com/szilard-nemeth/hadoop/commit/7eba7a4c9a08bd0de486c250a6bc711c7665f08e]*
testReduceFromPartialMem failed with [this Jenkins build|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17363928&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17363928]. Unfortunately, the output doesn't say much about the root cause of the test failure:
{code:java}
[INFO] Running org.apache.hadoop.mapred.TestReduceFetchFromPartialMem
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 89.147 s <<< FAILURE! - in org.apache.hadoop.mapred.TestReduceFetchFromPartialMem
[ERROR] testReduceFromPartialMem(org.apache.hadoop.mapred.TestReduceFetchFromPartialMem) Time elapsed: 89.024 s <<< ERROR!
java.io.IOException: Job failed! 
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:876) at org.apache.hadoop.mapred.TestReduceFetchFromPartialMem.runJob(TestReduceFetchFromPartialMem.java:292) at org.apache.hadoop.mapred.TestReduceFetchFromPartialMem.testReduceFromPartialMem(TestReduceFetchFromPartialMem.java:85) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}
So, I added some code that at least asserts on the Shuffle IO error counter:
{code:java}
long shuffleIoErrors =
    c.getGroup(SHUFFLE_ERR_GRP_NAME).getCounter(Fetcher.ShuffleErrors.IO_ERROR.toString());
assertEquals(0, shuffleIoErrors);
{code}
It turned out that this IO_ERROR from ShuffleErrors is sometimes 1, so I continued my investigation. It took a lot of time, but in the end I think it was worth it. Please see the next points (11 / 12), where I add detailed information about what was going on.

*10. LoggingHttpResponseEncoder: Add some new logs: [https://github.com/szilard-nemeth/hadoop/commit/e4260d1047805123d17b7d8dc94d776c818726a3]*
In order to see all outbound message types, I needed to add some new logs to LoggingHttpResponseEncoder.

*11. 
Fixed error handling + LastHttpContent: [https://github.com/szilard-nemeth/hadoop/commit/61cc084c6e05a778360b0f475a783b0c30f7f0cf]* When testReduceFromPartialMem failed once, I was able to check on my local machine what is really going on: {code:java} 2021-06-22 18:18:27,452 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(988)) - ***channelRead 2021-06-22 18:18:27,455 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(990)) - ***REQUEST: HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0)) GET /mapOutput?job=job_1624378685077_0001&reduce=0&map=attempt_1624378685077_0001_m_000000_0,attempt_1624378685077_0001_m_000002_0,attempt_1624378685077_0001_m_000004_0,attempt_1624378685077_0001_m_000005_0 HTTP/1.1 UrlHash: C/94EnScoT3sUBGXf4ul5bdpm1Q= name: mapreduce version: 1.0.0 User-Agent: Java/1.8.0_232b09 Host: localhost:62272 Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2 Connection: keep-alive content-length: 0 2021-06-22 18:18:27,489 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP1 2021-06-22 18:18:27,498 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,507 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@5510d748, channel: c029638c 2021-06-22 18:18:27,509 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP2 2021-06-22 18:18:27,509 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 
18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@45e2b799, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP3 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@169b92fa, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1120)) - ***LOOP ENDED 2021-06-22 18:18:27,511 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1123)) - ***Writing LastHttpContent, channel: c029638c 2021-06-22 18:18:27,513 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(338)) - ***OPERATION COMPLETE 2021-06-22 18:18:27,513 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,514 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@1040936, channel: c029638c 2021-06-22 18:18:27,515 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is unsuccessful. 
Cause: io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: FadvisedFileRegion, state: 0 at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702) at io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:302) at io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:131) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742) at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728) at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127) at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750) at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758) at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808) at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025) at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306) at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendMapOutput(ShuffleHandler.java:1370) at 
org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendMap(ShuffleHandler.java:1152) at org.apache.hadoop.mapred.ShuffleHandler$ReduceMapFileCount.operationComplete(ShuffleHandler.java:339) at org.apache.hadoop.mapred.ShuffleHandler$ReduceMapFileCount.operationComplete(ShuffleHandler.java:307) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605) at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:242) at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212) at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400) at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:953) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:713) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalStateException: unexpected message type: FadvisedFileRegion, state: 0 at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:124) at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89) ... 43 more 2021-06-22 18:18:27,517 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is unsuccessful. Cause: io.netty.channel.StacklessClosedChannelException at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source) 2021-06-22 18:18:27,517 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is unsuccessful. Cause: io.netty.channel.StacklessClosedChannelException at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source) {code}
The famous IllegalStateException once again, but this time with a different message type, FadvisedFileRegion:
{code:java}
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: FadvisedFileRegion, state: 0
{code}
In this case, the LastHttpContent was written to the channel too early, and then the ShuffleHandler wrote a FadvisedFileRegion to the channel. 
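The ordering rule behind this exception can be modeled with a tiny state machine. The code below is purely illustrative (made-up names, not actual Netty code): an encoder expects a response, then any number of contents, then exactly one last-content, which resets it for the next response on a keep-alive connection; a content written after the last-content is rejected, which mirrors the FadvisedFileRegion failure above.
{code:java}
// Illustrative model of an HttpObjectEncoder-style ordering rule.
public class ResponseOrderingModel {
    enum State { EXPECT_RESPONSE, EXPECT_CONTENT }
    enum Msg { RESPONSE, CONTENT, LAST_CONTENT }

    private State state = State.EXPECT_RESPONSE;

    void write(Msg msg) {
        if (state == State.EXPECT_RESPONSE) {
            // A content arriving here is the bug from the logs above:
            // a file region written after LastHttpContent already closed the response.
            if (msg != Msg.RESPONSE) {
                throw new IllegalStateException("unexpected message type: " + msg);
            }
            state = State.EXPECT_CONTENT;
        } else {
            if (msg == Msg.RESPONSE) {
                throw new IllegalStateException("unexpected message type: " + msg);
            }
            if (msg == Msg.LAST_CONTENT) {
                state = State.EXPECT_RESPONSE; // response finished; ready for the next one
            }
        }
    }

    public static void main(String[] args) {
        ResponseOrderingModel ch = new ResponseOrderingModel();
        ch.write(Msg.RESPONSE);
        ch.write(Msg.CONTENT);
        ch.write(Msg.LAST_CONTENT);
        try {
            ch.write(Msg.CONTENT); // the buggy late write
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}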
With my added logs, it was evident what is happening: {code:java} 2021-06-22 18:18:27,489 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP1 2021-06-22 18:18:27,498 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,507 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@5510d748, channel: c029638c 2021-06-22 18:18:27,509 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP2 2021-06-22 18:18:27,509 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@45e2b799, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP3 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@169b92fa, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1120)) - ***LOOP ENDED 2021-06-22 18:18:27,511 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1123)) - ***Writing LastHttpContent, 
channel: c029638c 2021-06-22 18:18:27,513 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(338)) - ***OPERATION COMPLETE 2021-06-22 18:18:27,513 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,514 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@1040936, channel: c029638c 2021-06-22 18:18:27,515 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is unsuccessful. Cause: {code}
This is the original loop, without the logs:
{code:java}
boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx, user,
    mapOutputInfoMap, jobId, keepAlive);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
  ChannelFuture nextMap = sendMap(reduceContext);
  if (nextMap == null) {
    return;
  }
}
//HADOOP-15327: Need to send an instance of LastHttpContent to define HTTP
//message boundaries. See details in jira.
ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
{code}
So somehow the loop ends and LastHttpContent.EMPTY_LAST_CONTENT is written, but the ShuffleHandler.ReduceMapFileCount#operationComplete method tries to write a FadvisedFileRegion to the channel afterwards. The thing is, operationComplete invokes
{code:java}
pipelineFact.getSHUFFLE().sendMap(reduceContext);
{code}
if there are remaining map outputs to send. 
Then, ShuffleHandler.Shuffle#sendMap calls ShuffleHandler.Shuffle#sendMapOutput, which writes the partition to the channel:
{code:java}
ChannelFuture writeFuture;
if (ch.pipeline().get(SslHandler.class) == null) {
  final FadvisedFileRegion partition =
      new FadvisedFileRegion(spill, info.startOffset, info.partLength,
          manageOsCache, readaheadLength, readaheadPool,
          spillfile.getAbsolutePath(),
          shuffleBufferSize, shuffleTransferToAllowed);
  writeFuture = writeToChannel(ch, partition);
{code}
The point is that LastHttpContent.EMPTY_LAST_CONTENT can only be written when we are in ShuffleHandler.ReduceMapFileCount#operationComplete and there are no remaining map outputs to send in the response. This is the only point where we can finish the HTTP response, and this is the main thing fixed in this commit. There are a couple of other things included:
- Introduced NettyChannelHelper in ShuffleHandler
- Added Debug / Trace logs to ShuffleHandler
- Added a flag to control whether LoggingHttpResponseEncoder is added to the pipeline (for debugging purposes)

*12. ShuffleHandlerTest fixes + enhancements: [https://github.com/szilard-nemeth/hadoop/commit/a91a04772390f377d478bb6ada4d435feb344500]*
Finally, this is the last commit. Since the 'testReduceFromPartialMem' testcase pointed to an issue with the production code, and I hadn't found any testcase in TestShuffleHandler that uses keep-alive and provides multiple mapIds (mapOutputs) in the GET request, I thought it was important to add one. This was a good decision in general, but I faced some complications along the way. So, the new testcase is called TestShuffleHandler#testKeepAliveMultipleMapAttemptIds. What is the difference between this and the other keep-alive tests? It adds multiple map attempt ids to the GET request's URL, which translate to mapIds / mapOutputs in ShuffleHandler. Also, it simulates the production code's behavior based on the method 'ShuffleHandler.Shuffle#sendMapOutput' that writes file chunks to the channel. 
This is how I'm doing it from the test:
{code:java}
shuffleHandler.mapOutputSender.additionalMapOutputSenderOperations =
    new AdditionalMapOutputSenderOperations() {
      @Override
      public ChannelFuture perform(ChannelHandlerContext ctx, Channel ch) throws IOException {
        File tmpFile = File.createTempFile("test", ".tmp");
        Files.write(tmpFile.toPath(),
            "dummytestcontent123456".getBytes(StandardCharsets.UTF_8));
        final DefaultFileRegion partition =
            new DefaultFileRegion(tmpFile, 0, mapOutputContentLength);
        LOG.debug("Writing response partition: {}, channel: {}", partition, ch.id());
        return ch.writeAndFlush(partition)
            .addListener((ChannelFutureListener) future ->
                LOG.debug("Finished Writing response partition: {}, channel: " +
                    "{}", partition, ch.id()));
      }
    };
{code}
Just to reiterate: the main problem with most of the testcases in TestShuffleHandler is that the class org.apache.hadoop.mapred.ShuffleHandler.Shuffle is an inner class, so to create a stubbed version we need the enclosing class (ShuffleHandler) in context; we can't define a stubbed version of Shuffle easily. The second issue is that most of the tests create their own subclass of ShuffleHandler / Shuffle that tries to simulate the real code. Better mocking of the dependent objects of these classes would be the way to go instead. However, that would complicate things and increase the refactoring work more and more, so I dropped this idea. Following the idea of subclassing ShuffleHandler / Shuffle, I reused the previously created class ShuffleHandlerForKeepAliveTests. The only remaining piece of the puzzle was to tweak TestShuffleHandler.MapOutputSender#send to make it capable of sending something more to the channel, in this case chunks of a file. Interestingly, the test failed.
I will attach full testcase logs, but here I'm only adding the exception that the testcase failed with:
{code:java}
java.io.IOException: Invalid Http response at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1612) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498) at org.apache.hadoop.mapred.TestShuffleHandler$HttpConnectionHelper.connectToUrls(TestShuffleHandler.java:553) at org.apache.hadoop.mapred.TestShuffleHandler.testKeepAliveInternal(TestShuffleHandler.java:1047) at org.apache.hadoop.mapred.TestShuffleHandler.testKeepAliveMultipleMapAttemptIds(TestShuffleHandler.java:997) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748)
{code}
I was thinking: what the hell is going on? Is this a production code issue or a test code issue? The best way to describe what happened afterwards is to go down the rabbit hole, really understand the issue I was facing, and find the root cause. All in all, the test executes two GET requests with the same URL using keep-alive.
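Stripped of all ShuffleHandler specifics, that request pattern looks like the sketch below: two GETs against the same URL over connections that ask for keep-alive, each body drained fully. This is a self-contained illustration against a throwaway JDK HttpServer, not the actual TestShuffleHandler code; all names are mine.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class KeepAliveSketch {
  // Issue two GETs against the same URL, asking for keep-alive and fully
  // draining each response body (required for the connection to be reused).
  public static int[] twoGets(String urlStr) throws Exception {
    int[] codes = new int[2];
    for (int i = 0; i < 2; i++) {
      HttpURLConnection conn = (HttpURLConnection) new URL(urlStr).openConnection();
      conn.setRequestProperty("Connection", "keep-alive");
      codes[i] = conn.getResponseCode();
      try (InputStream in = conn.getInputStream()) {
        while (in.read() != -1) {
          // drain so the underlying connection can be reused
        }
      }
    }
    return codes;
  }

  public static void main(String[] args) throws Exception {
    HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
    byte[] body = "ok".getBytes(StandardCharsets.UTF_8);
    server.createContext("/mapOutput", exchange -> {
      // explicit Content-Length; a correct length is exactly what the
      // investigated keep-alive test depends on
      exchange.sendResponseHeaders(200, body.length);
      exchange.getResponseBody().write(body);
      exchange.close();
    });
    server.start();
    try {
      int[] codes = twoGets("http://127.0.0.1:"
          + server.getAddress().getPort() + "/mapOutput");
      System.out.println(codes[0] + " " + codes[1]); // prints "200 200"
    } finally {
      server.stop(0);
    }
  }
}
```

If the server's declared Content-Length is wrong or the HTTP message is not terminated properly, the second request in this loop is where things go off the rails, which matches what the testcase observed.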
With some more added logs, I was able to pinpoint that the file chunks were written later than the first request had read the whole InputStream. Later, I also realized that the number of bytes read from the InputStream was smaller than expected. This led me to check the Content-Length of the HTTP response. It seemed okay. I tried to debug all pieces of the code, including the HttpClient, to understand why the heck the InputStream was closed. I checked the values again and again and realized that the headers are written to the channel N+1 times, so the Content-Length was short by one header size. This is the code that writes these data to the channel in the tests:
{code:java}
public ChannelFuture send(ChannelHandlerContext ctx, Channel ch) throws IOException {
  LOG.debug("In MapOutputSender#send");
  lastSocketAddress.setAddress(ch.remoteAddress());
  ShuffleHeader header = shuffleHeaderProvider.createNewShuffleHeader();
  writeOneHeader(ch, header);
  ChannelFuture future = writeHeaderNTimes(ch, header, responseConfig.headerWriteCount);
  // This is the last operation
  // It's safe to increment ShuffleHeader counter for better identification
  shuffleHeaderProvider.incrementCounter();
  if (additionalMapOutputSenderOperations != null) {
    return additionalMapOutputSenderOperations.perform(ctx, ch);
  }
  return future;
}
{code}
It's also required to add the size of the file chunk(s) to the overall Content-Length of the response. So the fix was simple: compute the correct Content-Length; after this, both responses from Netty were processed without an issue. This was one hell of a ride to find out.
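The Content-Length bookkeeping described above can be sketched as follows. This is a hypothetical helper for illustration only (it is not the actual ResponseConfig class, and the parameter names are mine): the response carries one up-front header write plus N more header writes plus the file chunk payload, so the declared Content-Length must cover all of them.

```java
public class ContentLengthSketch {
  // headerWriteCount + 1: one header is written up front and the header is
  // then written N more times. The "+ 1" is exactly the off-by-one that was
  // originally missing, which left Content-Length short by one header size.
  public static long contentLength(long headerSize, int headerWriteCount,
                                   long mapOutputContentLength) {
    return (headerWriteCount + 1) * headerSize + mapOutputContentLength;
  }

  public static void main(String[] args) {
    // e.g. a 47-byte ShuffleHeader written 1 + 2 times, plus a 22-byte chunk
    System.out.println(contentLength(47, 2, 22)); // prints 163
  }
}
```

With a too-small declared length, the client stops reading mid-response and the leftover bytes corrupt the next keep-alive exchange, which is consistent with the symptoms described above.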
There are some other changes in the test added with this commit:
- Increase URLConnection read timeout / connect timeout when using Debug mode
- Introduce class: ResponseConfig, which stores the header + payload data sizes + the final HTTP response content-length
- Introduce abstract class: AdditionalMapOutputSenderOperations, which can perform additional operations when sendMap is invoked
- ShuffleHandlerForKeepAliveTests: Enhanced failure control / close channel control
- ShuffleHeaderProvider: Don't compute the header on every invocation, cache its size [only an optimization]
- Fix TestShuffleHandler.HeaderPopulator#populateHeaders: Return the full content-length of the response, not just the length of the header
- Fix in HttpConnectionHelper#connectToUrlsInternal: Add one headerSize to totalBytesRead
- Enhancement in HttpConnectionHelper#connectToUrlsInternal: Fail fast if expected content-length < actual content-length
- Added new keepalive tests, including: testKeepAliveMultipleMapAttemptIds
- Added a new keepalive test with HTTP 400 bad request
I tried to make only the changes that are really required, but passing around several numeric values for computing the content-length was ugly as hell, so I introduced the ResponseConfig class.
----
h2. CONCLUSION
There were 2 test failure investigations that required production code fixes and took a lot of time to resolve. At least I learned a ton of new stuff along the way :) These investigations are detailed in *points 7., 11. and 12.* Please also see the attached investigation zip files if you are interested in the interim patches / logs produced by the tests and other materials:
[^testfailure-testMapFileAccess-emptyresponse.zip]
[^testfailure-testReduceFromPartialMem.zip]
h2.
REMAINING TODOs
* Testing ShuffleHandler on a cluster
* Fix Maven shading issues

was (Author: snemeth): Just uploaded a new patch: [^HADOOP-15327.005.patch] I have been (almost) exclusively working on this since [my last comment|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17362367&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17362367] and there are a couple of things to add again. The last commit that was discussed is this: [https://github.com/szilard-nemeth/hadoop/commit/f149be8de28baafc64eed1c47e788f5beb215e62] Let me explain what has changed, commit by commit. I will skip a bunch of trivial ones like code cleanup, added comments and the like. *I will cover the test failures surfaced by the Jenkins build / unit test results:*
- [Build #1|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17362456&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17362456]
- [Build #2|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17363928&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17363928]
*1. TestShuffleHandler: Introduced InputStreamReadResult that stores response as string + total bytes read: [https://github.com/szilard-nemeth/hadoop/commit/a57de573c97fe12c9071dd3450df8f450bf075ea]*
Here, I added a new class called 'InputStreamReadResult' that stores the bytes read (byte[]) and the number of bytes read from a response InputStream. This improves the way testcases can assert on these data.
*2. TestShuffleHandler: Use DEFAULT_PORT for all shuffle handler port configs: [https://github.com/szilard-nemeth/hadoop/commit/78b1166866c85cab6860407f8fe4a4ddc3168fae]*
It was a common pitfall while debugging that the tests had to be modified to use a certain fixed port. Here, I added a constant to store the port number, so when I had to debug I only needed to change it in one single place.
*3.
Create class: TestExecution: Configure proxy, keep alive connection timeout: [https://github.com/szilard-nemeth/hadoop/commit/fa5bb32ae4eb737077a165b3b1fba5069c982243]*
In order to debug the HTTP responses, I found it convenient to add a helper class that is responsible for the following:
- Configuring the HTTP connections, using a proxy when required
- Increasing the keepalive timeout when using DEBUG mode
TEST_EXECUTION is a static instance of TestExecution, initialized in a JUnit test setup method. There are 2 flags that control the behaviour of this object:
{code:java}
//Control test execution properties with these flags
private static final boolean DEBUG_MODE = true;
//If this is set to true and proxy server is not running, tests will fail!
private static final boolean USE_PROXY = false;
{code}
On top of these, the only difference in the testcases is that they create all HTTP connections with:
{code:java}
TEST_EXECUTION.openConnection(url)
{code}
*4. TestExecution: Configure port: [https://github.com/szilard-nemeth/hadoop/commit/4a5c035695be1099bff4a633cd605b9f8146d841]*
One addition on top of 3. is to include the port used by ShuffleHandler in the TestExecution object. When using DEBUG mode, the port is fixed to a value; otherwise it is set to 0, meaning that the port will be chosen dynamically.
*5. Add logging response encoder to TestShuffleHandler.testMapFileAccess: [https://github.com/szilard-nemeth/hadoop/commit/64686b47d2fed4e923c1c9c0169a06aba3e339be]*
While debugging TestShuffleHandler#testMapFileAccess, I realized that I had forgotten to add the LoggingHttpResponseEncoder to the pipeline. The most trivial way to fix this was to modify the pipeline when the channel is activated.
*6. TestShuffleHandler.testMapFileAccess: Modify to be able to run it locally + reproduce jenkins UT failure: [https://github.com/szilard-nemeth/hadoop/commit/bb0fcbbd7dcbe3fa7efd1b6a8c2eb8a9055c5ecd]*
Here's where the fun begins.
The problem with TestShuffleHandler#testMapFileAccess is that it requires the NativeIO module:
{code:java}
// This will run only in NativeIO is enabled as SecureIOUtils need it
assumeTrue(NativeIO.isAvailable());
{code}
I tried to compile the Hadoop Native libraries on my Mac according to these resources:
- Native libraries: [https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/NativeLibraries.html]
- Followed this guide: [https://dev.to/zejnilovic/building-hadoop-native-libraries-on-mac-in-2019-1iee]
Unfortunately, I still had compilation errors, so I eventually gave up and tweaked the test to be able to run it locally. This wasn't complex, and I don't think it's worth going into the details: I had to comment out some test code that used the Native library, and that was all. From the Jenkins results I had this:
{code:java}
[INFO] --- maven-surefire-plugin:3.0.0-M1:test (default-test) @ hadoop-mapreduce-client-shuffle --- [INFO] [INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running org.apache.hadoop.mapred.TestFadvisedFileRegion [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.493 s - in org.apache.hadoop.mapred.TestFadvisedFileRegion [INFO] Running org.apache.hadoop.mapred.TestShuffleHandler [ERROR] Tests run: 15, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 12.033 s <<< FAILURE! - in org.apache.hadoop.mapred.TestShuffleHandler [ERROR] testMapFileAccess(org.apache.hadoop.mapred.TestShuffleHandler) Time elapsed: 0.339 s <<< FAILURE! java.lang.AssertionError: Received string '.......<MANY MANY NULL TERMINATING CHARACTERS>........'
should contain message 'Owner 'jenkins' for path /home/jenkins/jenkins-home/workspace/PreCommit-HADOOP-Build/sourcedir/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/target/test-dir/TestShuffleHandlerLocDir/usercache/randomUser/appcache/application_12345_0001/output/attempt_12345_1_m_1_0/file.out.index did not match expected owner 'randomUser'' at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at org.apache.hadoop.mapred.TestShuffleHandler.testMapFileAccess(TestShuffleHandler.java:1228) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) {code} Please keep on reading the next bullet point where I detail how I found out what was happening. *7. TestShuffleHandler.testMapFileAccess: Fix in production code: [https://github.com/szilard-nemeth/hadoop/commit/6b9b9c39e181cf7836de33a2cda0712a81b85d01]* Continuing on the previous point, I had to find out what's going on and why the response was full of null-terminating characters. 
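One way a response can read back as a long run of null characters (this is an illustration of the mechanics, not necessarily the exact cause in this failure): if the reader allocates a buffer sized to the declared Content-Length but the stream delivers fewer meaningful bytes, the untouched tail of the Java byte[] keeps its default value of 0 and prints as null characters. A minimal sketch, with all names mine:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class NulPaddingSketch {
  // Read up to declaredLength bytes; if the stream ends early, the tail of
  // the buffer keeps its default value of 0 ('\0').
  public static String readDeclaredLength(InputStream in, int declaredLength)
      throws Exception {
    byte[] buf = new byte[declaredLength];
    int off = 0;
    int n;
    while (off < declaredLength
        && (n = in.read(buf, off, declaredLength - off)) != -1) {
      off += n;
    }
    return new String(buf, StandardCharsets.ISO_8859_1);
  }

  public static void main(String[] args) throws Exception {
    String s = readDeclaredLength(
        new ByteArrayInputStream("abc".getBytes(StandardCharsets.ISO_8859_1)), 8);
    // 3 real bytes were delivered; the other 5 are '\0'
    System.out.println(s.length() + " " + (int) s.charAt(3)); // prints "8 0"
  }
}
```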
It's important to refer back to my [previous comment|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17362367&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17362367]: in point 7. of that comment, I already gave a detailed analysis of the error handling and of what's wrong with writing an HTTP 200 OK, then an HTTP 500 (or any other bad code) to the channel. While putting that comment together I didn't realize that the 'testMapFileAccess' testcase wouldn't pass, as it wasn't running on my machine locally, so I proposed a follow-up jira. It turned out I needed to fix it now, as the test fails, so I retract that statement. The fix utilizes the usual "trick": write the HTTP 200 response, write LastHttpContent.EMPTY_LAST_CONTENT, then write the HTTP 500 response to the channel. This way, we can avoid the IllegalStateException thrown by the outbound message handler.
{code:java}
try {
  populateHeaders(mapIds, jobId, user, reduceId, request,
      response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
  //HADOOP-15327
  // Need to send an instance of LastHttpContent to define HTTP
  // message boundaries.
  //Sending a HTTP 200 OK + HTTP 500 later (sendError)
  // is quite a non-standard way of crafting HTTP responses,
  // but we need to keep backward compatibility.
  // See more details in jira.
  ch.writeAndFlush(response);
  ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
  LOG.error("Shuffle error in populating headers :", e);
  String errorMessage = getErrorMessage(e);
  sendError(ctx, errorMessage, INTERNAL_SERVER_ERROR);
  return;
}
{code}
I also went ahead and checked whether only the tests are doing this tricky HTTP error handling on their own, or whether the production code (Fetcher.java) has the same logic: it turned out Fetcher works much the same way, so at least the tests and the production code are close together.
I still don't think it's good that the "error responses" are handled as HTTP 200 responses, since HttpURLConnection has a [getErrorStream|https://docs.oracle.com/javase/8/docs/api/java/net/HttpURLConnection.html#getErrorStream--] method that could be used for proper error handling. This also points in that direction: [https://stackoverflow.com/a/11721878/1106893] Nevertheless, it would be too risky to change this behaviour, and it would also break backward compatibility. It would be a bad idea to modify this, especially together with the Netty upgrade, as that would have involved a heavy behavioral change. So, sad as it looks: the tests are not buggy; their expectation of the HTTP 200 with the HTTP 500 error code packed into that response is the correct thing to check, matching the behaviour of the production code of the ShuffleHandler client (Fetcher.java). Let me also share what I used to come to this conclusion:
*7.1 Modified the Apache trunk code to check the response codes in the tests, including this particular testcase. It turned out that HTTP 200 is the response code.*
*7.2 Also did a diff between TCP dump outputs generated by the old vs.
new code; they were the same in terms of HTTP response codes and structure after my fix was applied.*
*7.3 Wanted to do a final verification step and double-check the HTTP responses to see whether they are really all HTTP 200, with the error status wrapped as a second status line or in the response body itself. I found a very useful tool to capture HTTP traffic, called 'mitmproxy': [https://mitmproxy.org/]*
Some useful links related to this tool:
- [https://yoshihisaonoue.wordpress.com/2017/12/23/basic-usage-of-mitmdump/comment-page-1/]
- [https://blog.packagecloud.io/eng/2016/11/14/debugging-ssl-in-java-using-mitmproxy/]
- [https://dzone.com/articles/how-to-record-httphttps-traffic-with-mitmproxy]
- [https://github.com/mitmproxy/mitmproxy/issues/2832]
For this, I had to reconfigure the HTTPUrlConnections to use a proxy. In theory, there are some standard HTTP / HTTPS JVM switches to control the global proxy, but somehow I was not able to make them work. Initially, I found some basic resources about configuring Java to use a proxy for all connections, globally:
- [https://stackoverflow.com/questions/120797/how-do-i-set-the-proxy-to-be-used-by-the-jvm]
- [https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html]
- [https://www.baeldung.com/java-connect-via-proxy-server]
Based on these, I created a new test run config and added these JVM options:
{code:java}
-Djavax.net.debug=all
-Dhttp.proxyHost=localhost
-Dhttp.proxyPort=8888
-Dhttps.proxyHost=localhost
-Dhttps.proxyPort=8888
{code}
These global settings (http.proxyHost / http.proxyPort) did not work at all. Also tried using 127.0.0.1 instead of 'localhost'.
This didn't work either, so I suspected something was wrong with the JVM options and added some assert statements:
{code:java}
//PROXY CONFIG
Assert.assertEquals("localhost", System.getProperty("http.proxyHost"));
Assert.assertEquals("8888", System.getProperty("http.proxyPort"));
Assert.assertEquals("localhost", System.getProperty("https.proxyHost"));
Assert.assertEquals("8888", System.getProperty("https.proxyPort"));
Assert.assertEquals(null, System.getProperty("http.nonProxyHosts"));
{code}
I also tried to set the proxy to some dummy port value, expecting a connection issue as the proxy server is not listening on those ports. The result was still nothing:
{code:java}
System.setProperty("http.proxyPort", "1234");
System.setProperty("http.proxyHost", "127.0.0.1 ");
System.setProperty("http.proxyHost", "localhost");
System.setProperty("http.proxyPort", "8888");
{code}
So I gave up using the global config and found another way: [https://www.baeldung.com/java-connect-via-proxy-server#1-using-an-http-proxy] This finally worked, and I could see the traffic going through mitmproxy. Code:
{code:java}
// HttpURLConnection conn = (HttpURLConnection) url.openConnection();
Proxy webProxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1", 8888));
HttpURLConnection conn = (HttpURLConnection) url.openConnection(webProxy);
{code}
FINAL conclusion for this: it is valid by the RFC for the server to send an HTTP response with multiple status lines; however, the first status line is the one that really matters. Related SO answer: [https://stackoverflow.com/a/47541155/1106893] For me it's still weird, but it is what it is.
*8. TestShuffleHandler: Stop shufflehandler in all tests, fix debug mode issues: [https://github.com/szilard-nemeth/hadoop/commit/d472cb33cf038dee6f241382311f2bdf8314e0b9]*
This is an important one for test stability and independence from each other. I added some teardown code that checks if the ShuffleHandler's port is still open.
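Such a teardown guard can be sketched with plain sockets. This is illustrative only, not the exact code in TestShuffleHandler, and all names are mine: try to connect to the ShuffleHandler's port; if the connect succeeds, some testcase leaked a still-running handler.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class PortCheckSketch {
  // Returns true if something is listening on host:port, i.e. a testcase
  // finished without stopping its ShuffleHandler instance.
  public static boolean isPortOpen(String host, int port, int timeoutMs) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMs);
      return true; // connect succeeded: port is still open
    } catch (IOException e) {
      return false; // connection refused / timed out: port is free
    }
  }

  public static void main(String[] args) throws IOException {
    try (ServerSocket listening = new ServerSocket(0)) {
      int port = listening.getLocalPort();
      System.out.println(isPortOpen("127.0.0.1", port, 1000)); // prints "true"
    }
  }
}
```

A teardown method would assert the negative of this check after stopping the handler, failing fast when a testcase forgets the stop call.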
In practice, it should not be open after any of the testcases has finished. This made me realize that many of the testcases wouldn't stop the ShuffleHandler, so I fixed this.
*9. testReduceFromPartialMem: Add Shuffle IO error assertion to test: [https://github.com/szilard-nemeth/hadoop/commit/7eba7a4c9a08bd0de486c250a6bc711c7665f08e]*
testReduceFromPartialMem failed with [this Jenkins build|https://issues.apache.org/jira/browse/HADOOP-15327?focusedCommentId=17363928&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17363928] Unfortunately, the output doesn't say much about the root cause of the test failure:
{code:java}
[INFO] Running org.apache.hadoop.mapred.TestReduceFetchFromPartialMem [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 89.147 s <<< FAILURE! - in org.apache.hadoop.mapred.TestReduceFetchFromPartialMem [ERROR] testReduceFromPartialMem(org.apache.hadoop.mapred.TestReduceFetchFromPartialMem) Time elapsed: 89.024 s <<< ERROR! java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:876) at org.apache.hadoop.mapred.TestReduceFetchFromPartialMem.runJob(TestReduceFetchFromPartialMem.java:292) at org.apache.hadoop.mapred.TestReduceFetchFromPartialMem.testReduceFromPartialMem(TestReduceFetchFromPartialMem.java:85) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365) at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273) at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238) at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159) at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384) at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345) at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) {code}
So, I added some code that at least asserts on the Shuffle IO errors:
{code:java}
long shuffleIoErrors =
    c.getGroup(SHUFFLE_ERR_GRP_NAME).getCounter(Fetcher.ShuffleErrors.IO_ERROR.toString());
assertEquals(0, shuffleIoErrors);
{code}
It turned out that this IO_ERROR from ShuffleErrors is sometimes 1, so I continued my investigation. It took a lot of time, but in the end I think it was worth it. Please see the next points (11 / 12), where I add detailed information about what was going on.
*10. LoggingHttpResponseEncoder: Add some new logs: [https://github.com/szilard-nemeth/hadoop/commit/e4260d1047805123d17b7d8dc94d776c818726a3]*
In order to see all outbound message types, I needed to add some new logs to LoggingHttpResponseEncoder.
*11.
Fixed error handling + LastHttpContent: [https://github.com/szilard-nemeth/hadoop/commit/61cc084c6e05a778360b0f475a783b0c30f7f0cf]* When testReduceFromPartialMem failed once, I was able to check on my local machine what is really going on: {code:java} 2021-06-22 18:18:27,452 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(988)) - ***channelRead 2021-06-22 18:18:27,455 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(990)) - ***REQUEST: HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0)) GET /mapOutput?job=job_1624378685077_0001&reduce=0&map=attempt_1624378685077_0001_m_000000_0,attempt_1624378685077_0001_m_000002_0,attempt_1624378685077_0001_m_000004_0,attempt_1624378685077_0001_m_000005_0 HTTP/1.1 UrlHash: C/94EnScoT3sUBGXf4ul5bdpm1Q= name: mapreduce version: 1.0.0 User-Agent: Java/1.8.0_232b09 Host: localhost:62272 Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2 Connection: keep-alive content-length: 0 2021-06-22 18:18:27,489 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP1 2021-06-22 18:18:27,498 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,507 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@5510d748, channel: c029638c 2021-06-22 18:18:27,509 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP2 2021-06-22 18:18:27,509 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 
18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@45e2b799, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP3 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@169b92fa, channel: c029638c 2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1120)) - ***LOOP ENDED 2021-06-22 18:18:27,511 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1123)) - ***Writing LastHttpContent, channel: c029638c 2021-06-22 18:18:27,513 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(338)) - ***OPERATION COMPLETE 2021-06-22 18:18:27,513 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c 2021-06-22 18:18:27,514 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@1040936, channel: c029638c 2021-06-22 18:18:27,515 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is unsuccessful. 
Cause: io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: FadvisedFileRegion, state: 0
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
	at io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:302)
	at io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:131)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:808)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1025)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:306)
	at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendMapOutput(ShuffleHandler.java:1370)
	at org.apache.hadoop.mapred.ShuffleHandler$Shuffle.sendMap(ShuffleHandler.java:1152)
	at org.apache.hadoop.mapred.ShuffleHandler$ReduceMapFileCount.operationComplete(ShuffleHandler.java:339)
	at org.apache.hadoop.mapred.ShuffleHandler$ReduceMapFileCount.operationComplete(ShuffleHandler.java:307)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
	at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717)
	at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272)
	at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:242)
	at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:953)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:361)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:713)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: unexpected message type: FadvisedFileRegion, state: 0
	at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:124)
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
	... 43 more
2021-06-22 18:18:27,517 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is unsuccessful. Cause: io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
2021-06-22 18:18:27,517 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is unsuccessful. Cause: io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel.close(ChannelPromise)(Unknown Source)
{code}
The famous IllegalStateException appears once again, but this time with a different message type, FadvisedFileRegion:
{code:java}
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: FadvisedFileRegion, state: 0
{code}
In this case, the LastHttpContent was written to the channel too early, and then the ShuffleHandler wrote a FadvisedFileRegion to the channel.
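To see why this fails with "state: 0": Netty's HttpObjectEncoder behaves like a small state machine, and in its initial state it only accepts an HTTP message head; once a LastHttpContent closes the message, the encoder drops back to that initial state, so any file region written after it is rejected. A simplified plain-Java model of this behavior (this is an illustration, not the real Netty code):
{code:java}
// Simplified, illustrative model of Netty's HttpObjectEncoder state handling:
// in the initial state (logged by Netty as "state: 0") only an HTTP response
// head is acceptable. Writing LAST_HTTP_CONTENT closes the message and drops
// back to the initial state, so a FileRegion written after it triggers the
// IllegalStateException seen above.
public class EncoderStateModel {
    public enum Msg { HTTP_RESPONSE, CONTENT_OR_FILE_REGION, LAST_HTTP_CONTENT }

    private boolean messageOpen = false; // false corresponds to "state: 0"

    public void write(Msg msg) {
        if (!messageOpen) {
            if (msg != Msg.HTTP_RESPONSE) {
                // Mirrors: "unexpected message type: FadvisedFileRegion, state: 0"
                throw new IllegalStateException(
                    "unexpected message type: " + msg + ", state: 0");
            }
            messageOpen = true;   // response head written, body is now open
        } else if (msg == Msg.LAST_HTTP_CONTENT) {
            messageOpen = false;  // message finished, back to the initial state
        }
    }
}
{code}
So the sequence in the log (response head, file regions, LastHttpContent, then another file region) is exactly the illegal transition.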
With my added logs, it was evident what was happening:
{code:java}
2021-06-22 18:18:27,489 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP1
2021-06-22 18:18:27,498 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c
2021-06-22 18:18:27,507 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@5510d748, channel: c029638c
2021-06-22 18:18:27,509 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP2
2021-06-22 18:18:27,509 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c
2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@45e2b799, channel: c029638c
2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1114)) - ***LOOP3
2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c
2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@169b92fa, channel: c029638c
2021-06-22 18:18:27,510 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1120)) - ***LOOP ENDED
2021-06-22 18:18:27,511 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:channelRead(1123)) - ***Writing LastHttpContent, channel: c029638c
2021-06-22 18:18:27,513 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(338)) - ***OPERATION COMPLETE
2021-06-22 18:18:27,513 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1352)) - ***Writing response buffer: 47, channel: c029638c
2021-06-22 18:18:27,514 INFO [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:sendMapOutput(1369)) - ***Writing response partition: org.apache.hadoop.mapred.FadvisedFileRegion@1040936, channel: c029638c
2021-06-22 18:18:27,515 ERROR [ShuffleHandler Netty Worker #0] mapred.ShuffleHandler (ShuffleHandler.java:operationComplete(319)) - Future is unsuccessful. Cause:
{code}
This is the original loop without the logs:
{code:java}
boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx, user,
    mapOutputInfoMap, jobId, keepAlive);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
  ChannelFuture nextMap = sendMap(reduceContext);
  if (nextMap == null) {
    return;
  }
}
//HADOOP-15327: Need to send an instance of LastHttpContent to define HTTP
//message boundaries. See details in jira.
ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
{code}
So somehow the loop ends and LastHttpContent.EMPTY_LAST_CONTENT is written, but the ShuffleHandler.ReduceMapFileCount#operationComplete method tries to write a FadvisedFileRegion to the channel afterwards. The thing is, operationComplete invokes
{code:java}
pipelineFact.getSHUFFLE().sendMap(reduceContext);
{code}
if there are remaining map outputs to send.
Then, ShuffleHandler.Shuffle#sendMap calls ShuffleHandler.Shuffle#sendMapOutput, which writes the partition to the channel:
{code:java}
ChannelFuture writeFuture;
if (ch.pipeline().get(SslHandler.class) == null) {
  final FadvisedFileRegion partition =
      new FadvisedFileRegion(spill, info.startOffset, info.partLength,
          manageOsCache, readaheadLength, readaheadPool,
          spillfile.getAbsolutePath(),
          shuffleBufferSize, shuffleTransferToAllowed);
  writeFuture = writeToChannel(ch, partition);
{code}
The point is that LastHttpContent.EMPTY_LAST_CONTENT can only be written when we are in ShuffleHandler.ReduceMapFileCount#operationComplete and there are no remaining maps to send in the response. This is the only moment we can finish the HTTP response, and this is the main thing fixed in this commit. A couple of other things are included as well:
- Introduced NettyChannelHelper in ShuffleHandler
- Added debug / trace logs to ShuffleHandler
- Added a flag to control whether LoggingHttpResponseEncoder is added to the pipeline (for debugging purposes)

*12. ShuffleHandlerTest fixes + enhancements: [https://github.com/szilard-nemeth/hadoop/commit/a91a04772390f377d478bb6ada4d435feb344500]*
Finally, this is the last commit. Since the 'testReduceFromPartialMem' testcase pointed to an issue with the production code, and I hadn't found any testcase in ShuffleHandlerTest that used keep-alive and provided multiple mapIds (map outputs) in the GET request, I thought it was important to add one. This was a good decision in general, but I faced some complications along the way. The new testcase is TestShuffleHandler#testKeepAliveMultipleMapAttemptIds. What is the difference between this and the other keep-alive tests? It adds multiple map attempt ids to the GET request's URL, which translate to mapIds / map outputs in ShuffleHandler. Also, it simulates the production code's behavior based on the method ShuffleHandler.Shuffle#sendMapOutput, which writes file chunks to the channel.
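To recap the ordering fix described above in a compact form: the "last content" marker must only be written from the completion callback, once no map outputs remain. A minimal plain-Java sketch of that control flow (the class and method names here are invented for illustration; the real code chains Netty ChannelFutures instead of direct calls):
{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch of the fixed keep-alive send loop (not the actual
// patch): LAST_HTTP_CONTENT is written only from the completion callback,
// once the queue of remaining map outputs is empty -- never from the loop
// that kicks off the sends.
public class KeepAliveSendLoopSketch {
    private final Deque<String> remainingMaps;
    final List<String> channelWrites = new ArrayList<>(); // stands in for the Netty channel

    public KeepAliveSendLoopSketch(List<String> mapIds) {
        this.remainingMaps = new ArrayDeque<>(mapIds);
    }

    /** Mirrors sendMap(): write one map output; completion decides what comes next. */
    public void sendMap() {
        String mapId = remainingMaps.poll();
        if (mapId == null) {
            return;
        }
        channelWrites.add("FileRegion(" + mapId + ")");
        operationComplete();
    }

    /** Mirrors ReduceMapFileCount#operationComplete: either send the next map
     *  output, or -- only when nothing remains -- finish the HTTP response. */
    private void operationComplete() {
        if (!remainingMaps.isEmpty()) {
            sendMap();
        } else {
            channelWrites.add("LAST_HTTP_CONTENT");
        }
    }
}
{code}
With this shape, the last write on the channel is always the HTTP message terminator, which is exactly the invariant the HttpObjectEncoder enforces.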
This is how the test simulates writing file chunks to the channel:
{code:java}
shuffleHandler.mapOutputSender.additionalMapOutputSenderOperations =
    new AdditionalMapOutputSenderOperations() {
      @Override
      public ChannelFuture perform(ChannelHandlerContext ctx, Channel ch) throws IOException {
        File tmpFile = File.createTempFile("test", ".tmp");
        Files.write(tmpFile.toPath(),
            "dummytestcontent123456".getBytes(StandardCharsets.UTF_8));
        final DefaultFileRegion partition =
            new DefaultFileRegion(tmpFile, 0, mapOutputContentLength);
        LOG.debug("Writing response partition: {}, channel: {}", partition, ch.id());
        return ch.writeAndFlush(partition)
            .addListener((ChannelFutureListener) future ->
                LOG.debug("Finished Writing response partition: {}, channel: {}",
                    partition, ch.id()));
      }
    };
{code}
Just to reiterate: the main problem with most of the testcases in TestShuffleHandler is that org.apache.hadoop.mapred.ShuffleHandler.Shuffle is an inner class, so creating a stubbed version requires the enclosing class (ShuffleHandler) to be in context; we can't easily define a stubbed version of Shuffle on its own. The second issue is that most of the tests create their own subclass of ShuffleHandler / Shuffle that tries to simulate the real code. Better mocking of the dependent objects of these classes would be the way to go instead, but that would complicate things and grow the refactoring work more and more, so I dropped the idea. Following the existing approach of subclassing ShuffleHandler / Shuffle, I reused the previously created class ShuffleHandlerForKeepAliveTests. The only remaining piece of the puzzle was to tweak TestShuffleHandler.MapOutputSender#send to make it capable of sending something more to the channel, in this case chunks of a file. Interestingly, the test failed.
I will attach the full testcase logs, but here I'm only adding the exception that the testcase failed with:
{code:java}
java.io.IOException: Invalid Http response
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1612)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
	at org.apache.hadoop.mapred.TestShuffleHandler$HttpConnectionHelper.connectToUrls(TestShuffleHandler.java:553)
	at org.apache.hadoop.mapred.TestShuffleHandler.testKeepAliveInternal(TestShuffleHandler.java:1047)
	at org.apache.hadoop.mapred.TestShuffleHandler.testKeepAliveMultipleMapAttemptIds(TestShuffleHandler.java:997)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)
{code}
I was thinking: what the hell is going on? Is this a production code issue or a test code issue? The best way to describe what happened afterwards is to go down the rabbit hole, really understand the issue I was facing, and find the root cause. All in all, the test executes two GET requests with the same URL, using keep-alive.
With some more added logs, I was able to pinpoint that the file chunks were written later than the point where the first request had already read the whole InputStream. Later, I also realized that the number of bytes read from the InputStream was smaller than what I expected. This led me to check the Content-Length of the HTTP response, which seemed okay. I tried to debug all pieces of the code, including the HttpClient, in order to understand why the heck the InputStream was closed. I checked the values over and over and realized that the headers are written to the channel N+1 times, so the declared Content-Length was short by one header size. This is the code that writes these data to the channel in the tests:
{code:java}
public ChannelFuture send(ChannelHandlerContext ctx, Channel ch) throws IOException {
  LOG.debug("In MapOutputSender#send");
  lastSocketAddress.setAddress(ch.remoteAddress());
  ShuffleHeader header = shuffleHeaderProvider.createNewShuffleHeader();
  writeOneHeader(ch, header);
  ChannelFuture future =
      writeHeaderNTimes(ch, header, responseConfig.headerWriteCount);
  // This is the last operation
  // It's safe to increment ShuffleHeader counter for better identification
  shuffleHeaderProvider.incrementCounter();
  if (additionalMapOutputSenderOperations != null) {
    return additionalMapOutputSenderOperations.perform(ctx, ch);
  }
  return future;
}
{code}
It's also required to add the size of the file chunk(s) to the overall Content-Length of the response. So the fix was simple: compute the correct Content-Length. After that, both responses from Netty were processed without an issue. This was one hell of a ride to find out.
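The accounting bug above can be written down as a tiny arithmetic sketch. This is only an illustration with invented method names (the real fix lives in the test's ResponseConfig / HeaderPopulator changes): the buggy length covered only the N repeated headers, while the response actually carried N+1 headers plus the extra file chunk.
{code:java}
// Illustrative Content-Length arithmetic (invented names, not the patch):
// send() writes one header via writeOneHeader() plus headerWriteCount more
// via writeHeaderNTimes(), and a file chunk is appended afterwards -- so the
// declared length must cover headerWriteCount + 1 headers and the chunk.
public class ContentLengthSketch {
    /** Buggy accounting: counts only the N repeated headers. */
    static long buggyContentLength(long headerSize, int headerWriteCount,
                                   long fileChunkLength) {
        return headerSize * headerWriteCount;
    }

    /** Fixed accounting: adds the extra header plus the appended file chunk. */
    static long fixedContentLength(long headerSize, int headerWriteCount,
                                   long fileChunkLength) {
        return headerSize * (headerWriteCount + 1) + fileChunkLength;
    }
}
{code}
The difference between the two is exactly one header size plus the file chunk length, which matches the two symptoms observed: an InputStream that appeared one header short, and file chunks unaccounted for.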
There are some other changes in the test added with this commit:
- Increased URLConnection read timeout / connect timeout when using debug mode
- Introduced class ResponseConfig, which stores the header + payload data sizes + the final HTTP response Content-Length
- Introduced abstract class AdditionalMapOutputSenderOperations, which can perform additional operations when sendMap is invoked
- ShuffleHandlerForKeepAliveTests: enhanced failure control / close channel control
- ShuffleHeaderProvider: don't compute the header on every invocation; cache its size [only an optimization]
- Fixed TestShuffleHandler.HeaderPopulator#populateHeaders: return the full Content-Length of the response, not just the length of the header
- Fix in HttpConnectionHelper#connectToUrlsInternal: add one headerSize to totalBytesRead
- Enhancement in HttpConnectionHelper#connectToUrlsInternal: fail fast if expected Content-Length < actual Content-Length
- Added new keep-alive tests, including testKeepAliveMultipleMapAttemptIds
- Added a new keep-alive test with HTTP 400 Bad Request

I tried to make only those changes that are really required, but passing around several numeric values for computing the Content-Length was ugly as hell, so I introduced the ResponseConfig class.
----
h2. CONCLUSION
There were 2 test failure investigations that required fixing the production code and took a lot of time to resolve. At least I learned a ton of new stuff along the way :) These investigations are detailed in *points 7., 11. and 12.* Please also see the attached investigation zip files if you are interested in interim patches / logs produced by the tests and other materials: [^testfailure-testMapFileAccess-emptyresponse.zip] [^testfailure-testReduceFromPartialMem.zip]

h2. REMAINING TODOs
* Testing ShuffleHandler on a cluster
* Fix Maven shading issues

> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
>                 Key: HADOOP-15327
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15327
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Xiaoyu Yao
>            Assignee: Szilard Nemeth
>            Priority: Major
>         Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, testfailure-testMapFileAccess-emptyresponse.zip, testfailure-testReduceFromPartialMem.zip
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)

--
This message was sent by Atlassian Jira (v8.3.4#803005)