Repository: tez Updated Branches: refs/heads/master 5b1f1a923 -> 6ea7b24db
TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6ea7b24d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6ea7b24d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6ea7b24d Branch: refs/heads/master Commit: 6ea7b24db6f8c0c360f3465f0b1d504d15c67205 Parents: 5b1f1a9 Author: Jason Lowe <[email protected]> Authored: Fri Apr 8 13:48:44 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Fri Apr 8 13:48:44 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../library/common/shuffle/ShuffleUtils.java | 12 ++++++-- .../common/shuffle/TestShuffleUtils.java | 31 ++++++++++++++++++++ 3 files changed, 42 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6ea7b24d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f72e726..0555d4e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES TEZ-3199. Rename getCredentials in TaskCommunicatorContext to be less confusing. ALL CHANGES: + TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle TEZ-3161. Allow task to report different kinds of errors - fatal / kill. TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing @@ -423,6 +424,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-3196. java.lang.InternalError from decompression codec is fatal to a task during shuffle TEZ-3177. Non-DAG events should use the session domain or no domain if the data does not need protection. TEZ-3192. IFile#checkState creating unnecessary objects though auto-boxing TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster. http://git-wip-us.apache.org/repos/asf/tez/blob/6ea7b24d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 013a002..685503c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -123,13 +123,19 @@ public class ShuffleUtils { LOG.debug("Read " + shuffleData.length + " bytes from input for " + identifier); } - } catch (IOException ioe) { + } catch (InternalError | IOException e) { // Close the streams LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength + - ", decomp=" + decompressedLength + ". ExceptionMessage=" + ioe.getMessage()); + ", decomp=" + decompressedLength + ". ExceptionMessage=" + e.getMessage()); ioCleanup(input); + if (e instanceof InternalError) { + // The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError + // on decompression failures. Catching and re-throwing as IOException + // to allow fetch failure logic to be processed. + throw new IOException(e); + } // Re-throw - throw ioe; + throw e; } } http://git-wip-us.apache.org/repos/asf/tez/blob/6ea7b24d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 9f9cd59..4ac1bca 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -2,11 +2,15 @@ package org.apache.tez.runtime.library.common.shuffle; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; @@ -26,13 +30,18 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.BitSet; import java.util.List; import java.util.Random; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -252,4 +261,26 @@ public class TestShuffleUtils { .cardinality(), emptyPartitionsBitSet.cardinality() == 10); } + + @Test + public void testInternalErrorTranslation() throws Exception { + String codecErrorMsg = "codec failure"; + CompressionInputStream mockCodecStream = mock(CompressionInputStream.class); + when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt())) + .thenThrow(new InternalError(codecErrorMsg)); + Decompressor mockDecoder = mock(Decompressor.class); + CompressionCodec mockCodec = mock(CompressionCodec.class); + when(mockCodec.createDecompressor()).thenReturn(mockDecoder); + when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class))) + .thenReturn(mockCodecStream); + byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1}; + try { + ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header), + 1024, 128, mockCodec, false, 0, mock(Logger.class), "identifier"); + Assert.fail("shuffle was supposed to throw!"); + } catch (IOException e) { + Assert.assertTrue(e.getCause() instanceof InternalError); + Assert.assertTrue(e.getMessage().contains(codecErrorMsg)); + } + } }
