Repository: tez
Updated Branches:
  refs/heads/branch-0.8 e6ba30631 -> cc4830692
TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs (Nathan Roberts via jeagles) (cherry picked from commit 149db1b48e8be91455a383d1793ef46e789cfea6) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cc483069 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cc483069 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cc483069 Branch: refs/heads/branch-0.8 Commit: cc48306927da9f8de5ad2e8415d26d5067734908 Parents: e6ba306 Author: Jonathan Eagles <[email protected]> Authored: Wed Oct 5 10:18:22 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Wed Oct 5 10:26:21 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../runtime/library/common/sort/impl/IFile.java | 9 ++++ .../library/common/sort/impl/TestIFile.java | 54 +++++++++++++++++++ .../TestIFile_concatenated_compressed.bin | Bin 0 -> 51913 bytes 4 files changed, 65 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cc483069/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d7cbf9d..1d20f24 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs TEZ-3429. Set reconfigureDoneTime on VertexConfigurationDoneEvent properly. TEZ-3000. Fix TestContainerReuse. TEZ-3436. Check input and output count before start in MapProcessor. @@ -508,6 +509,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3440. Shuffling to memory can get out-of-sync when fetching multiple compressed map outputs TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases TEZ-3009. Errors that occur during container task acquisition are not logged. 
TEZ-3413. ConcurrentModificationException in HistoryEventTimelineConversion for AppLaunchedEvent. http://git-wip-us.apache.org/repos/asf/tez/blob/cc483069/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index a20182c..f49bc35 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -627,6 +627,15 @@ public class IFile { } try { IOUtils.readFully(in, buffer, 0, buffer.length - IFile.HEADER.length); + /* + * We've gotten the amount of data we were expecting. Verify the + * decompressor has nothing more to offer. This action also forces the + * decompressor to read any trailing bytes that weren't critical for + * decompression, which is necessary to keep the stream in sync. 
+ */ + if (in.read() >= 0) { + throw new IOException("Unexpected extra bytes from input stream"); + } } catch (IOException ioe) { if(in != null) { try { http://git-wip-us.apache.org/repos/asf/tez/blob/cc483069/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java index 24acc40..25e916e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java @@ -21,6 +21,8 @@ package org.apache.tez.runtime.library.common.sort.impl; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; @@ -243,6 +245,58 @@ public class TestIFile { testWithDataBuffer(sortedData); } + //test concatenated zlib input - as in multiple map outputs during shuffle + //This specific input is valid but the decompressor can leave lingering + // bytes between segments. If the lingering bytes aren't handled correctly, + // the stream will get out-of-sync. 
+ @Test(timeout = 5000) + public void testConcatenatedZlibPadding() + throws IOException, URISyntaxException { + byte[] bytes; + long compTotal = 0; + // Known raw and compressed lengths of input + long raws[] = { 2392, 102314, 42576, 31432, 25090 }; + long compressed[] = { 723, 25396, 10926, 8203, 6665 }; + + CompressionCodecFactory codecFactory = new CompressionCodecFactory(new + Configuration()); + codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec"); + + URL url = getClass().getClassLoader() + .getResource("TestIFile_concatenated_compressed.bin"); + assertNotEquals("IFileinput file must exist", null, url); + Path p = new Path(url.toURI()); + FSDataInputStream inStream = localFs.open(p); + + for (int i = 0; i < 5; i++) { + bytes = new byte[(int) raws[i]]; + assertEquals("Compressed stream out-of-sync", inStream.getPos(), compTotal); + IFile.Reader.readToMemory(bytes, inStream, (int) compressed[i], codec, + false, -1); + compTotal += compressed[i]; + + // Now read the data + InMemoryReader inMemReader = new InMemoryReader(null, + new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length); + + DataInputBuffer keyIn = new DataInputBuffer(); + DataInputBuffer valIn = new DataInputBuffer(); + Deserializer<Text> keyDeserializer; + Deserializer<IntWritable> valDeserializer; + SerializationFactory serializationFactory = + new SerializationFactory(defaultConf); + keyDeserializer = serializationFactory.getDeserializer(Text.class); + valDeserializer = serializationFactory.getDeserializer(IntWritable.class); + keyDeserializer.open(keyIn); + valDeserializer.open(valIn); + + while (inMemReader.nextRawKey(keyIn)) { + inMemReader.nextRawValue(valIn); + } + } + inStream.close(); + } + @Test(timeout = 5000) //Test InMemoryWriter public void testInMemoryWriter() throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/cc483069/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin 
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin b/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin
new file mode 100644
index 0000000..395452e
Binary files /dev/null and b/tez-runtime-library/src/test/resources/TestIFile_concatenated_compressed.bin differ
