Repository: tez
Updated Branches:
  refs/heads/branch-0.9 8c60086f3 -> d607c948a


TEZ-3912. Fetchers should be more robust to corrupted inputs (Kuhu Shukla via jeagles)

(cherry picked from commit 12ff93779f5c8ef5708bf3f14fb49a628bc41218)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d607c948
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d607c948
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d607c948

Branch: refs/heads/branch-0.9
Commit: d607c948aed42720fcd0cbd2a82c1f16d0f53c5c
Parents: 8c60086
Author: Jonathan Eagles <[email protected]>
Authored: Mon Jul 9 10:44:06 2018 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Mon Jul 9 10:44:54 2018 -0500

----------------------------------------------------------------------
 .../library/common/shuffle/ShuffleUtils.java    |  9 ++--
 .../common/shuffle/TestShuffleUtils.java        | 53 ++++++++++++++++++++
 2 files changed, 59 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d607c948/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index bf58172..df4281a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -125,7 +125,7 @@ public class ShuffleUtils {
         LOG.debug("Read " + shuffleData.length + " bytes from input for "
             + identifier);
       }
-    } catch (InternalError | IOException e) {
+    } catch (InternalError | Exception e) {
       // Close the streams
      LOG.info("Failed to read data to memory for " + identifier + ". len=" + compressedLength +
           ", decomp=" + decompressedLength + ". ExceptionMessage=" + e.getMessage());
@@ -135,9 +135,12 @@ public class ShuffleUtils {
         // on decompression failures. Catching and re-throwing as IOException
         // to allow fetch failure logic to be processed.
         throw new IOException(e);
+      } else if (e instanceof IOException) {
+        throw e;
+      } else {
+        // Re-throw as an IOException
+        throw new IOException(e);
       }
-      // Re-throw
-      throw e;
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/d607c948/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
index 1d2d428..f61c7e5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
@@ -41,6 +41,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -301,6 +302,58 @@ public class TestShuffleUtils {
   }
 
   @Test
+  public void testExceptionTranslation() throws Exception {
+    String codecErrorMsg = "codec failure";
+    CompressionInputStream mockCodecStream = mock(CompressionInputStream.class);
+    when(mockCodecStream.read(any(byte[].class), anyInt(), anyInt()))
+        .thenThrow(new IllegalArgumentException(codecErrorMsg));
+    Decompressor mockDecoder = mock(Decompressor.class);
+    CompressionCodec mockCodec = mock(CompressionCodec.class);
+    when(mockCodec.createDecompressor()).thenReturn(mockDecoder);
+    when(mockCodec.createInputStream(any(InputStream.class), any(Decompressor.class)))
+        .thenReturn(mockCodecStream);
+    byte[] header = new byte[] { (byte) 'T', (byte) 'I', (byte) 'F', (byte) 1};
+    try {
+      ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header),
+          1024, 128, mockCodec, false, 0, mock(Logger.class), null);
+      Assert.fail("shuffle was supposed to throw!");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+      Assert.assertTrue(e.getMessage().contains(codecErrorMsg));
+    }
+    CompressionInputStream mockCodecStream1 = mock(CompressionInputStream.class);
+    when(mockCodecStream1.read(any(byte[].class), anyInt(), anyInt()))
+        .thenThrow(new SocketTimeoutException(codecErrorMsg));
+    CompressionCodec mockCodec1 = mock(CompressionCodec.class);
+    when(mockCodec1.createDecompressor()).thenReturn(mockDecoder);
+    when(mockCodec1.createInputStream(any(InputStream.class), any(Decompressor.class)))
+        .thenReturn(mockCodecStream1);
+    try {
+      ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header),
+          1024, 128, mockCodec1, false, 0, mock(Logger.class), null);
+      Assert.fail("shuffle was supposed to throw!");
+    } catch (IOException e) {
+      Assert.assertTrue(e instanceof SocketTimeoutException);
+      Assert.assertTrue(e.getMessage().contains(codecErrorMsg));
+    }
+    CompressionInputStream mockCodecStream2 = mock(CompressionInputStream.class);
+    when(mockCodecStream2.read(any(byte[].class), anyInt(), anyInt()))
+        .thenThrow(new InternalError(codecErrorMsg));
+    CompressionCodec mockCodec2 = mock(CompressionCodec.class);
+    when(mockCodec2.createDecompressor()).thenReturn(mockDecoder);
+    when(mockCodec2.createInputStream(any(InputStream.class), any(Decompressor.class)))
+        .thenReturn(mockCodecStream2);
+    try {
+      ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(header),
+          1024, 128, mockCodec2, false, 0, mock(Logger.class), null);
+      Assert.fail("shuffle was supposed to throw!");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getCause() instanceof InternalError);
+      Assert.assertTrue(e.getMessage().contains(codecErrorMsg));
+    }
+  }
+
+  @Test
   public void testShuffleToDiskChecksum() throws Exception {
     // verify sending a stream of zeroes without checksum validation
     // does not trigger an exception

Reply via email to