Repository: tez Updated Branches: refs/heads/branch-0.9 8c60086f3 -> d607c948a
TEZ-3912. Fetchers should be more robust to corrupted inputs (Kuhu Shukla via jeagles) (cherry picked from commit 12ff93779f5c8ef5708bf3f14fb49a628bc41218) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d607c948 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d607c948 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d607c948 Branch: refs/heads/branch-0.9 Commit: d607c948aed42720fcd0cbd2a82c1f16d0f53c5c Parents: 8c60086 Author: Jonathan Eagles <[email protected]> Authored: Mon Jul 9 10:44:06 2018 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Mon Jul 9 10:44:54 2018 -0500 ---------------------------------------------------------------------- .../library/common/shuffle/ShuffleUtils.java | 9 ++-- .../common/shuffle/TestShuffleUtils.java | 53 ++++++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d607c948/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index bf58172..df4281a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -125,7 +125,7 @@ public class ShuffleUtils { LOG.debug("Read " + shuffleData.length + " bytes from input for " + identifier); } - } catch (InternalError | IOException e) { + } catch (InternalError | Exception e) { // Close the streams LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength + ", decomp=" + decompressedLength + ". ExceptionMessage=" + e.getMessage()); @@ -135,9 +135,12 @@ public class ShuffleUtils { // on decompression failures. Catching and re-throwing as IOException // to allow fetch failure logic to be processed. throw new IOException(e); + } else if (e instanceof IOException) { + throw e; + } else { + // Re-throw as an IOException + throw new IOException(e); } - // Re-throw - throw e; } } http://git-wip-us.apache.org/repos/asf/tez/blob/d607c948/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 1d2d428..f61c7e5 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -41,6 +41,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.BitSet; @@ -301,6 +302,58 @@ public class TestShuffleUtils { } @Test + public void testExceptionTranslation() throws Exception { + String codecErrorMsg = "codec failure"; + CompressionInputStream mockCodecStream = mock(CompressionInputStream.class); + when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt())) + .thenThrow(new IllegalArgumentException(codecErrorMsg)); + Decompressor mockDecoder = mock(Decompressor.class); + CompressionCodec mockCodec = mock(CompressionCodec.class); + when(mockCodec.createDecompressor()).thenReturn(mockDecoder); + when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class))) + .thenReturn(mockCodecStream); + byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1}; + try { + ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header), + 1024, 128, mockCodec, false, 0, mock(Logger.class), null); + Assert.fail("shuffle was supposed to throw!"); + } catch (IOException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + Assert.assertTrue(e.getMessage().contains(codecErrorMsg)); + } + CompressionInputStream mockCodecStream1 = mock(CompressionInputStream.class); + when(mockCodecStream1.read(any(byte[].class), anyInt(), anyInt())) + .thenThrow(new SocketTimeoutException(codecErrorMsg)); + CompressionCodec mockCodec1 = mock(CompressionCodec.class); + when(mockCodec1.createDecompressor()).thenReturn(mockDecoder); + when(mockCodec1.createInputStream(any(InputStream.class), any(Decompressor.class))) + .thenReturn(mockCodecStream1); + try { + ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header), + 1024, 128, mockCodec1, false, 0, mock(Logger.class), null); + Assert.fail("shuffle was supposed to throw!"); + } catch (IOException e) { + Assert.assertTrue(e instanceof SocketTimeoutException); + Assert.assertTrue(e.getMessage().contains(codecErrorMsg)); + } + CompressionInputStream mockCodecStream2 = mock(CompressionInputStream.class); + when(mockCodecStream2.read(any(byte[].class), anyInt(), anyInt())) + .thenThrow(new InternalError(codecErrorMsg)); + CompressionCodec mockCodec2 = mock(CompressionCodec.class); + when(mockCodec2.createDecompressor()).thenReturn(mockDecoder); + when(mockCodec2.createInputStream(any(InputStream.class), any(Decompressor.class))) + .thenReturn(mockCodecStream2); + try { + ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header), + 1024, 128, mockCodec2, false, 0, mock(Logger.class), null); + Assert.fail("shuffle was supposed to throw!"); + } catch (IOException e) { + Assert.assertTrue(e.getCause() instanceof InternalError); + Assert.assertTrue(e.getMessage().contains(codecErrorMsg)); + } + } + + @Test public void testShuffleToDiskChecksum() throws Exception { // verify sending a stream of zeroes without checksum validation // does not trigger an exception
