Repository: tez
Updated Branches:
  refs/heads/master 4496f6667 -> 633fde2cd
TEZ-3970. NullPointerException in Tez ShuffleHandler Ranged Fetch (Jonathan Eagles via kshukla)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/633fde2c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/633fde2c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/633fde2c

Branch: refs/heads/master
Commit: 633fde2cd41ae0a2f51b5c34190318bae54a2ee6
Parents: 4496f66
Author: Kuhu Shukla <[email protected]>
Authored: Thu Jul 12 17:44:19 2018 -0500
Committer: Kuhu Shukla <[email protected]>
Committed: Thu Jul 12 17:45:49 2018 -0500

----------------------------------------------------------------------
 .../apache/tez/auxservices/ShuffleHandler.java | 2 +-
 .../tez/auxservices/TestShuffleHandler.java | 114 +++++++++++++++++--
 2 files changed, 104 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/633fde2c/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index 24a821f..f294edc 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -1399,9 +1399,9 @@ public class ShuffleHandler extends AuxiliaryService {
 DataOutputBuffer dob = new DataOutputBuffer();
 header.write(dob);
 // Free the memory needed to store the spill and index records
-outputInfo.finish();
 ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
 }
+outputInfo.finish();
 final long rangeOffset = firstIndex.getStartOffset();
 final long rangePartLength = lastIndex.getStartOffset() +
lastIndex.getPartLength() - firstIndex.getStartOffset(); http://git-wip-us.apache.org/repos/asf/tez/blob/633fde2c/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index 7d53abc..7c421a9 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.MapTask; @@ -75,6 +76,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; +import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; @@ -658,6 +661,102 @@ public class TestShuffleHandler { } /** + * Validate the ranged fetch works as expected + */ + @Test(timeout = 10000) + public void testRangedFetch() throws IOException { + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + 
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "simple"); + UserGroupInformation.setConfiguration(conf); + File absLogDir = new File("target", + TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); + ApplicationId appId = ApplicationId.newInstance(12345, 1); + LOG.info(appId.toString()); + String appAttemptId = "attempt_12345_1_m_1_0"; + String user = "randomUser"; + String reducerIdStart = "0"; + String reducerIdEnd = "1"; + List<File> fileMap = new ArrayList<>(); + createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, + conf, fileMap); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + // Do nothing. 
+ } + + }; + } + }; + shuffleHandler.init(conf); + try { + shuffleHandler.start(); + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token<JobTokenIdentifier> jt = + new Token<JobTokenIdentifier>("identifier".getBytes(), + "password".getBytes(), new Text(user), new Text("shuffleService")); + jt.write(outputBuffer); + shuffleHandler + .initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + URL url = + new URL( + "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerIdStart + "-" + reducerIdEnd + + "&map=attempt_12345_1_m_1_0"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + boolean succeeded = false; + try { + DataInputStream is = new DataInputStream(conn.getInputStream()); + int partitionCount = WritableUtils.readVInt(is); + List<ShuffleHeader> headers = new ArrayList<>(2); + for (int i = 0; i < partitionCount; i++) { + ShuffleHeader header = new ShuffleHeader(); + header.readFields(is); + Assert.assertEquals("Incorrect map id", "attempt_12345_1_m_1_0", header.getMapId()); + Assert.assertEquals("Incorrect reduce id", i, header.getPartition()); + headers.add(header); + } + for (ShuffleHeader header: headers) { + byte[] bytes = new byte[(int)header.getCompressedLength()]; + is.read(bytes); + } + succeeded = true; + // Read one more byte to force EOF + is.readByte(); + Assert.fail("More fetch bytes that expected in stream"); + } catch (EOFException e) { + Assert.assertTrue("Failed to copy ranged fetch", succeeded); + } + + } finally { + shuffleHandler.stop(); + FileUtil.fullyDelete(absLogDir); + 
} + } + + /** * Validate the ownership of the map-output files being pulled in. The * local-file-system owner of the file should match the user component in the * @@ -785,18 +884,11 @@ public class TestShuffleHandler { System.out.println("Deleting existing file"); indexFile.delete(); } - indexFile.createNewFile(); - FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append( - new Path(indexFile.getAbsolutePath())); Checksum crc = new PureJavaCrc32(); - crc.reset(); - CheckedOutputStream chk = new CheckedOutputStream(output, crc); - String msg = "Writing new index file. This file will be used only " + - "for the testing."; - chk.write(Arrays.copyOf(msg.getBytes(), - MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH)); - output.writeLong(chk.getChecksum().getValue()); - output.close(); + TezSpillRecord tezSpillRecord = new TezSpillRecord(2); + tezSpillRecord.putIndex(new TezIndexRecord(0, 10, 10), 0); + tezSpillRecord.putIndex(new TezIndexRecord(10, 10, 10), 1); + tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, crc); } @Test
