Repository: tez
Updated Branches: refs/heads/master b3712f863 -> 5dd47c685
TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5dd47c68 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5dd47c68 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5dd47c68 Branch: refs/heads/master Commit: 5dd47c6858028ac0187c4fab76b66217bf0d6c56 Parents: b3712f8 Author: Jason Lowe <[email protected]> Authored: Fri May 20 17:51:29 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Fri May 20 17:51:29 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../library/api/TezRuntimeConfiguration.java | 12 +++++ .../runtime/library/common/shuffle/Fetcher.java | 17 ++++--- .../library/common/shuffle/ShuffleUtils.java | 29 +++++++----- .../common/shuffle/impl/ShuffleManager.java | 6 ++- .../orderedgrouped/FetcherOrderedGrouped.java | 9 +++- .../orderedgrouped/ShuffleScheduler.java | 8 +++- .../runtime/library/common/sort/impl/IFile.java | 49 ++++++++++++++++++-- .../common/sort/impl/IFileInputStream.java | 7 ++- .../library/input/OrderedGroupedKVInput.java | 1 + .../runtime/library/input/UnorderedKVInput.java | 1 + .../library/common/shuffle/TestFetcher.java | 15 +++--- .../common/shuffle/TestShuffleUtils.java | 27 +++++++++++ .../shuffle/orderedgrouped/TestFetcher.java | 27 +++++++---- .../library/common/sort/impl/TestIFile.java | 40 ++++++++++++++++ 15 files changed, 208 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 04b16ba..7274b6f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3237. 
Corrupted shuffle transfers to disk are not detected during transfer TEZ-3240. Improvements to tez.lib.uris to allow for multiple tarballs and mixing tarballs and jars. TEZ-3246. Improve diagnostics when DAG killed by user TEZ-3258. Jvm Checker does not ignore DisableExplicitGC when checking JVM GC options. @@ -42,6 +43,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer TEZ-3246. Improve diagnostics when DAG killed by user TEZ-3258. Jvm Checker does not ignore DisableExplicitGC when checking JVM GC options. TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency @@ -484,6 +486,7 @@ Release 0.7.2: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer TEZ-3258. Jvm Checker does not ignore DisableExplicitGC when checking JVM GC options. TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index caad6ef..08f76f2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -373,6 +373,17 @@ public class TezRuntimeConfiguration { "shuffle.ssl.enable"; public static final boolean TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT = false; + /** + * Controls verification of data checksums when fetching data directly to + * disk. 
Enabling verification allows the fetcher to detect corrupted data + * and report the failure against the upstream task before the data reaches + * the Processor and causes the fetching task to fail. + */ + @ConfigurationProperty(type = "boolean") + public static final String TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM = + TEZ_RUNTIME_PREFIX + "shuffle.fetch.verify-disk-checksum"; + public static final boolean TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT = true; + @ConfigurationProperty(type = "float") public static final String TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT = TEZ_RUNTIME_PREFIX + "shuffle.fetch.buffer.percent"; @@ -541,6 +552,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_SSL); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index d445587..6cbff94 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -139,6 +139,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { private final boolean asyncHttp; + private final boolean verifyDiskChecksum; + private final boolean isDebugEnabled = LOG.isDebugEnabled(); private 
Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, @@ -150,8 +152,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, - int shufflePort, boolean asyncHttp) { + int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) { this.asyncHttp = asyncHttp; + this.verifyDiskChecksum = verifyDiskChecksum; this.fetcherCallback = fetcherCallback; this.inputManager = inputManager; this.jobTokenSecretMgr = jobTokenSecretManager; @@ -815,7 +818,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } else if (fetchedInput.getType() == Type.DISK) { ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), (host +":" +port), input, compressedLength, decompressedLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString()); + fetchedInput.getInputAttemptIdentifier().toString(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); } else { throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + fetchedInput); @@ -982,10 +986,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> { HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort, - boolean asyncHttp) { + boolean asyncHttp, boolean verifyDiskChecksum) { this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled, - false, localHostname, shufflePort, asyncHttp); + false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum); } public FetcherBuilder(FetcherCallback fetcherCallback, @@ -994,10 +998,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { Configuration conf, RawLocalFileSystem localFs, 
LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, - String localHostname, int shufflePort, boolean asyncHttp) { + String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) { this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, - lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp); + lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, + verifyDiskChecksum); } public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) { http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 685503c..ae646ea 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -140,22 +140,27 @@ public class ShuffleUtils { } public static void shuffleToDisk(OutputStream output, String hostIdentifier, - InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier) - throws IOException { + InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier, + boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException { // Copy data to local-disk long bytesLeft = compressedLength; try { - final int BYTES_TO_READ = 64 * 1024; - byte[] buf = new byte[BYTES_TO_READ]; - while 
(bytesLeft > 0) { - int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); - if (n < 0) { - throw new IOException("read past end of stream reading " - + identifier); + if (verifyChecksum) { + bytesLeft -= IFile.Reader.readToDisk(output, input, compressedLength, + ifileReadAhead, ifileReadAheadLength); + } else { + final int BYTES_TO_READ = 64 * 1024; + byte[] buf = new byte[BYTES_TO_READ]; + while (bytesLeft > 0) { + int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); + if (n < 0) { + throw new IOException("read past end of stream reading " + + identifier); + } + output.write(buf, 0, n); + bytesLeft -= n; + // metrics.inputBytes(n); } - output.write(buf, 0, n); - bytesLeft -= n; - // metrics.inputBytes(n); } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 7ca9a1f..c80713b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -140,6 +140,7 @@ public class ShuffleManager implements FetcherCallback { private final CompressionCodec codec; private final boolean localDiskFetchEnabled; private final boolean sharedFetchEnabled; + private final boolean verifyDiskChecksum; private final int ifileBufferSize; private final boolean ifileReadAhead; @@ -202,6 +203,9 @@ public class ShuffleManager implements FetcherCallback { TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT); this.sharedFetchEnabled = 
conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH, TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT); + this.verifyDiskChecksum = conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT); this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); @@ -412,7 +416,7 @@ public class ShuffleManager implements FetcherCallback { httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(), jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockDisk, localDiskFetchEnabled, sharedFetchEnabled, - localhostName, shufflePort, asyncHttp); + localhostName, shufflePort, asyncHttp, verifyDiskChecksum); if (codec != null) { fetcherBuilder.setCompressionParameters(codec); http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 51bdf68..bcb75d2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -59,6 +59,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private final Configuration conf; private final boolean localDiskFetchEnabled; + private final boolean 
verifyDiskChecksum; private final TezCounter connectionErrs; private final TezCounter ioErrs; @@ -125,7 +126,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { String applicationId, int dagId, boolean asyncHttp, - boolean sslShuffle) { + boolean sslShuffle, + boolean verifyDiskChecksum) { this.scheduler = scheduler; this.allocator = allocator; this.metrics = metrics; @@ -159,6 +161,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { this.localDiskFetchEnabled = localDiskFetchEnabled; this.sslShuffle = sslShuffle; + this.verifyDiskChecksum = verifyDiskChecksum; this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id; } @@ -504,7 +507,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString()); } else if (mapOutput.getType() == Type.DISK) { ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), - input, compressedLength, decompressedLength, LOG, mapOutput.getAttemptIdentifier().toString()); + input, compressedLength, decompressedLength, LOG, + mapOutput.getAttemptIdentifier().toString(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); } else { throw new IOException("Unknown mapOutput type while fetching shuffle data:" + mapOutput.getType()); http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 0a2b730..afd280b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -235,6 +235,7 @@ class ShuffleScheduler { private final float minReqProgressFraction; private final float maxAllowedFailedFetchFraction; private final boolean checkFailedFetchSinceLastCompletion; + private final boolean verifyDiskChecksum; private volatile Thread shuffleSchedulerThread = null; @@ -388,6 +389,10 @@ class ShuffleScheduler { conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT); + this.verifyDiskChecksum = conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT); + /** * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. 
@@ -1347,7 +1352,8 @@ class ShuffleScheduler { shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle); + connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, + verifyDiskChecksum); } private class FetchFutureCallback implements FutureCallback<Void> { http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index 6d8992e..a20182c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -645,6 +646,43 @@ public class IFile { } } + /** + * Read entire IFile content to disk. 
+ * + * @param out the output stream that will receive the data + * @param in the input stream containing the IFile data + * @param length the amount of data to read from the input + * @return the number of bytes copied + * @throws IOException + */ + public static long readToDisk(OutputStream out, InputStream in, long length, + boolean ifileReadAhead, int ifileReadAheadLength) + throws IOException { + final int BYTES_TO_READ = 64 * 1024; + byte[] buf = new byte[BYTES_TO_READ]; + + // copy the IFile header + if (length < HEADER.length) { + throw new IOException("Missing IFile header"); + } + IOUtils.readFully(in, buf, 0, HEADER.length); + verifyHeaderMagic(buf); + out.write(buf, 0, HEADER.length); + long bytesLeft = length - HEADER.length; + @SuppressWarnings("resource") + IFileInputStream ifInput = new IFileInputStream(in, bytesLeft, + ifileReadAhead, ifileReadAheadLength); + while (bytesLeft > 0) { + int n = ifInput.readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); + if (n < 0) { + throw new IOException("read past end of stream"); + } + out.write(buf, 0, n); + bytesLeft -= n; + } + return length - bytesLeft; + } + public long getLength() { return fileLength - checksumIn.getSize(); } @@ -784,14 +822,17 @@ public class IFile { ++numRecordsRead; } - public static boolean isCompressedFlagEnabled(InputStream in) throws IOException { - byte[] header = new byte[HEADER.length]; - IOUtils.readFully(in, header, 0, HEADER.length); - + private static void verifyHeaderMagic(byte[] header) throws IOException { if (!(header[0] == 'T' && header[1] == 'I' && header[2] == 'F')) { throw new IOException("Not a valid ifile header"); } + } + + public static boolean isCompressedFlagEnabled(InputStream in) throws IOException { + byte[] header = new byte[HEADER.length]; + IOUtils.readFully(in, header, 0, HEADER.length); + verifyHeaderMagic(header); return (header[3] == 1); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java index d116242..c5853d4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java @@ -125,7 +125,7 @@ public class IFileInputStream extends InputStream { if (curReadahead != null) { curReadahead.cancel(); } - if (currentOffset < dataLength) { + if (currentOffset < dataLength && !disableChecksumValidation) { byte[] t = new byte[Math.min((int) (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)]; while (currentOffset < dataLength) { @@ -300,7 +300,10 @@ public class IFileInputStream extends InputStream { return result; } - void disableChecksumValidation() { + /** + * Disable checksum validation when reading the stream + */ + public void disableChecksumValidation() { disableChecksumValidation = true; } } http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 5e367cf..9a2a23e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -354,6 +354,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index dbbe23ff..ec9a191 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -263,6 +263,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); 
http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index 0aa112e..bd0ea0f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -71,7 +71,7 @@ public class TestFetcher { Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, - PORT, false); + PORT, false, true); builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); @@ -89,7 +89,7 @@ public class TestFetcher { // when enabled and hostname does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, - PORT, false); + PORT, false, true); builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -104,7 +104,8 @@ public class TestFetcher { // when enabled and port does not match use http fetch. 
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false); + ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, + PORT, false, true); builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -121,7 +122,7 @@ public class TestFetcher { conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, - PORT, false); + PORT, false, true); builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -154,7 +155,8 @@ public class TestFetcher { int partition = 42; FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, - ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false); + ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, + false, true); builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); @@ -272,7 +274,8 @@ public class TestFetcher { int partition = 42; FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, - ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false); + ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, + false, true); builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts))); 
http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 4ac1bca..c542030 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -4,6 +4,7 @@ import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; @@ -23,6 +24,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.partitioner.HashPartitioner; @@ -33,9 +35,12 @@ import org.junit.Test; import org.slf4j.Logger; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.BitSet; import java.util.List; import java.util.Random; @@ -283,4 +288,26 @@ public class TestShuffleUtils { 
Assert.assertTrue(e.getMessage().contains(codecErrorMsg)); } } + + @Test + public void testShuffleToDiskChecksum() throws Exception { + // verify sending a stream of zeroes without checksum validation + // does not trigger an exception + byte[] bogusData = new byte[1000]; + Arrays.fill(bogusData, (byte) 0); + ByteArrayInputStream in = new ByteArrayInputStream(bogusData); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ShuffleUtils.shuffleToDisk(baos, "somehost", in, + bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, false); + Assert.assertArrayEquals(bogusData, baos.toByteArray()); + + // verify sending same stream of zeroes with validation generates an exception + in.reset(); + try { + ShuffleUtils.shuffleToDisk(mock(OutputStream.class), "somehost", in, + bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, true); + Assert.fail("shuffle was supposed to throw!"); + } catch (IOException e) { + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 89d35f4..310f1b2 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -127,7 +127,8 @@ public class TestFetcher { new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true); fetcher.call(); verify(scheduler).getMapsForHost(mapHost); @@ -155,7 +156,8 @@ public class TestFetcher { new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true); // when local mode is enabled and host and port matches use local fetch FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -172,7 +174,8 @@ public class TestFetcher { new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false ,true); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -187,7 +190,8 @@ public class TestFetcher { new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -201,7 +205,8 @@ public class TestFetcher { fetcher = new 
FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -225,7 +230,8 @@ public class TestFetcher { MapHost host = new MapHost(HOST, PORT, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true); FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -369,7 +375,8 @@ public class TestFetcher { final MapHost host = new MapHost(HOST, PORT, 1); FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true); final FetcherOrderedGrouped fetcher = spy(mockFetcher); @@ -460,7 +467,8 @@ public class TestFetcher { false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, true, false); + wrongMapErrsCounter, connectionErrsCounter, 
wrongReduceErrsCounter, APP_ID, DAG_ID, + true, false, true); final FetcherOrderedGrouped fetcher = spy(mockFetcher); fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>(); final List<InputAttemptIdentifier> srcAttempts = Arrays.asList( @@ -527,7 +535,8 @@ public class TestFetcher { new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true); fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts))); Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size()); Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator(); http://git-wip-us.apache.org/repos/asf/tez/blob/5dd47c68/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java index 1c269bd..24acc40 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java @@ -19,8 +19,10 @@ package org.apache.tez.runtime.library.common.sort.impl; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Random; @@ -28,6 +30,7 @@ import 
java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -375,6 +378,43 @@ public class TestIFile { readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec); } + @Test(timeout = 20000) + public void testReadToDisk() throws IOException { + // verify sending a stream of zeroes generates an error + byte[] zeroData = new byte[1000]; + Arrays.fill(zeroData, (byte) 0); + ByteArrayInputStream in = new ByteArrayInputStream(zeroData); + try { + IFile.Reader.readToDisk(new ByteArrayOutputStream(), in, zeroData.length, false, 0); + fail("Exception should have been thrown"); + } catch (IOException e) { + } + + // verify sending same stream of zeroes with a valid IFile header still + // generates an error + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + baos.write(IFile.HEADER); + baos.write(zeroData); + try { + IFile.Reader.readToDisk(new ByteArrayOutputStream(), + new ByteArrayInputStream(baos.toByteArray()), zeroData.length, false, 0); + fail("Exception should have been thrown"); + } catch (IOException e) { + assertTrue(e instanceof ChecksumException); + } + + // verify valid data is copied properly + List<KVPair> data = KVDataGen.generateTestData(true, 0); + Writer writer = writeTestFile(false, false, data, codec); + baos.reset(); + IFile.Reader.readToDisk(baos, localFs.open(outputPath), writer.getCompressedLength(), + false, 0); + byte[] diskData = baos.toByteArray(); + Reader reader = new Reader(new ByteArrayInputStream(diskData), diskData.length, + codec, null, null, false, 0, 1024); + verifyData(reader, data); + reader.close(); + } /** * Test different options (RLE, repeat keys, compression) on reader/writer
