Repository: tez Updated Branches: refs/heads/branch-0.7 1ba277c1c -> ad3065134
TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ad306513 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ad306513 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ad306513 Branch: refs/heads/branch-0.7 Commit: ad3065134af65a84da408d332d6165357bd487c0 Parents: 1ba277c Author: Jason Lowe <[email protected]> Authored: Fri Apr 8 14:12:12 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Fri Apr 8 14:12:12 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/common/shuffle/ShuffleUtils.java | 9 ++++++ .../common/shuffle/TestShuffleUtils.java | 31 ++++++++++++++++++++ 3 files changed, 41 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/ad306513/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4b204d0..77d0b5d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster. 
http://git-wip-us.apache.org/repos/asf/tez/blob/ad306513/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index a4e95f7..60e53f8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -124,6 +124,15 @@ public class ShuffleUtils { ioCleanup(input); // Re-throw throw ioe; + } catch (InternalError e) { + // Close the streams + LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength + + ", decomp=" + decompressedLength + ". ExceptionMessage=" + e.getMessage()); + ioCleanup(input); + // The codecs for lzo,lz4,snappy,bz2,etc. throw java.lang.InternalError + // on decompression failures. Catching and re-throwing as IOException + // to allow fetch failure logic to be processed. 
+ throw new IOException(e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/ad306513/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 9f9cd59..4ac1bca 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -2,11 +2,15 @@ package org.apache.tez.runtime.library.common.shuffle; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; @@ -26,13 +30,18 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.BitSet; import java.util.List; import java.util.Random; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.when; @@ -252,4 +261,26 @@ public class TestShuffleUtils { .cardinality(), emptyPartitionsBitSet.cardinality() == 10); } + + @Test + public void testInternalErrorTranslation() throws Exception { + String codecErrorMsg = "codec failure"; + CompressionInputStream mockCodecStream = mock(CompressionInputStream.class); + when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt())) + .thenThrow(new InternalError(codecErrorMsg)); + Decompressor mockDecoder = mock(Decompressor.class); + CompressionCodec mockCodec = mock(CompressionCodec.class); + when(mockCodec.createDecompressor()).thenReturn(mockDecoder); + when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class))) + .thenReturn(mockCodecStream); + byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1}; + try { + ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header), + 1024, 128, mockCodec, false, 0, mock(Logger.class), "identifier"); + Assert.fail("shuffle was supposed to throw!"); + } catch (IOException e) { + Assert.assertTrue(e.getCause() instanceof InternalError); + Assert.assertTrue(e.getMessage().contains(codecErrorMsg)); + } + } }
