Repository: tez
Updated Branches:
  refs/heads/branch-0.7 c6edff292 -> 24957c9f8
TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer (jlowe) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/24957c9f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/24957c9f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/24957c9f Branch: refs/heads/branch-0.7 Commit: 24957c9f8a5d579e78f2f89b79f5cdabfed743c7 Parents: c6edff2 Author: Jason Lowe <[email protected]> Authored: Fri May 20 17:54:37 2016 +0000 Committer: Jason Lowe <[email protected]> Committed: Fri May 20 17:54:37 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/api/TezRuntimeConfiguration.java | 10 ++++ .../runtime/library/common/shuffle/Fetcher.java | 19 +++++--- .../library/common/shuffle/ShuffleUtils.java | 29 +++++++----- .../common/shuffle/impl/ShuffleManager.java | 6 ++- .../orderedgrouped/FetcherOrderedGrouped.java | 9 +++- .../common/shuffle/orderedgrouped/Shuffle.java | 8 +++- .../orderedgrouped/ShuffleScheduler.java | 5 ++ .../runtime/library/common/sort/impl/IFile.java | 49 ++++++++++++++++++-- .../common/sort/impl/IFileInputStream.java | 7 ++- .../library/input/OrderedGroupedKVInput.java | 1 + .../runtime/library/input/UnorderedKVInput.java | 1 + .../library/common/shuffle/TestFetcher.java | 16 ++++--- .../common/shuffle/TestShuffleUtils.java | 27 +++++++++++ .../shuffle/orderedgrouped/TestFetcher.java | 8 ++-- .../library/common/sort/impl/TestIFile.java | 40 ++++++++++++++++ 16 files changed, 198 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9d8972c..28b4936 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.2 Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3237. 
Corrupted shuffle transfers to disk are not detected during transfer TEZ-3258. Jvm Checker does not ignore DisableExplicitGC when checking JVM GC options. TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 440c9f4..f942d22 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -315,6 +315,15 @@ public class TezRuntimeConfiguration { "shuffle.ssl.enable"; public static final boolean TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT = false; + /** + * Controls verification of data checksums when fetching data directly to + * disk. Enabling verification allows the fetcher to detect corrupted data + * and report the failure against the upstream task before the data reaches + * the Processor and causes the fetching task to fail. 
+ */ + public static final String TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM = + TEZ_RUNTIME_PREFIX + "shuffle.fetch.verify-disk-checksum"; + public static final boolean TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT = true; public static final String TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT = TEZ_RUNTIME_PREFIX + "shuffle.fetch.buffer.percent"; @@ -452,6 +461,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_SSL); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index be68cc1..3892578 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -121,6 +121,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // Initiative value is 0, which means it hasn't retried yet. 
private long retryStartTime = 0; + private final boolean verifyDiskChecksum; + private final boolean isDebugEnabled = LOG.isDebugEnabled(); private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, @@ -132,7 +134,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> { boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, - int shufflePort) { + int shufflePort, + boolean verifyDiskChecksum) { + this.verifyDiskChecksum = verifyDiskChecksum; this.fetcherCallback = fetcherCallback; this.inputManager = inputManager; this.jobTokenSecretMgr = jobTokenSecretManager; @@ -770,7 +774,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } else if (fetchedInput.getType() == Type.DISK) { ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), (host +":" +port), input, compressedLength, decompressedLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString()); + fetchedInput.getInputAttemptIdentifier().toString(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); } else { throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + fetchedInput); @@ -935,10 +940,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, - Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort) { + Configuration conf, boolean localDiskFetchEnabled, String localHostname, + int shufflePort, boolean verifyDiskChecksum) { this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled, - false, localHostname, shufflePort); + false, localHostname, shufflePort, verifyDiskChecksum); } public FetcherBuilder(FetcherCallback fetcherCallback, @@ 
-947,10 +953,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, - String localHostname, int shufflePort) { + String localHostname, int shufflePort, boolean verifyDiskChecksum) { this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, - lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort); + lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, + shufflePort, verifyDiskChecksum); } public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) { http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 60e53f8..ed4c510 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -137,22 +137,27 @@ public class ShuffleUtils { } public static void shuffleToDisk(OutputStream output, String hostIdentifier, - InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier) - throws IOException { + InputStream input, long compressedLength, long decompressedLength, Logger LOG, String identifier, + boolean ifileReadAhead, int ifileReadAheadLength, boolean verifyChecksum) throws IOException { // Copy data to local-disk long bytesLeft = compressedLength; try { - final int BYTES_TO_READ = 64 * 
1024; - byte[] buf = new byte[BYTES_TO_READ]; - while (bytesLeft > 0) { - int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); - if (n < 0) { - throw new IOException("read past end of stream reading " - + identifier); + if (verifyChecksum) { + bytesLeft -= IFile.Reader.readToDisk(output, input, compressedLength, + ifileReadAhead, ifileReadAheadLength); + } else { + final int BYTES_TO_READ = 64 * 1024; + byte[] buf = new byte[BYTES_TO_READ]; + while (bytesLeft > 0) { + int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); + if (n < 0) { + throw new IOException("read past end of stream reading " + + identifier); + } + output.write(buf, 0, n); + bytesLeft -= n; + // metrics.inputBytes(n); } - output.write(buf, 0, n); - bytesLeft -= n; - // metrics.inputBytes(n); } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index e5ecf06..4c78711 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -136,6 +136,7 @@ public class ShuffleManager implements FetcherCallback { private final CompressionCodec codec; private final boolean localDiskFetchEnabled; private final boolean sharedFetchEnabled; + private final boolean verifyDiskChecksum; private final int ifileBufferSize; private final boolean ifileReadAhead; @@ -198,6 +199,9 @@ public class ShuffleManager implements FetcherCallback { 
TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT); this.sharedFetchEnabled = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH, TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT); + this.verifyDiskChecksum = conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT); this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); @@ -402,7 +406,7 @@ public class ShuffleManager implements FetcherCallback { httpConnectionParams, inputManager, inputContext.getApplicationId(), jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockDisk, localDiskFetchEnabled, sharedFetchEnabled, - localhostName, shufflePort); + localhostName, shufflePort, verifyDiskChecksum); if (codec != null) { fetcherBuilder.setCompressionParameters(codec); http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 385ed48..a857a9b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -56,6 +56,7 @@ class FetcherOrderedGrouped extends Thread { private static final Logger LOG = LoggerFactory.getLogger(FetcherOrderedGrouped.class); 
private final Configuration conf; private final boolean localDiskFetchEnabled; + private final boolean verifyDiskChecksum; private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, CONNECTION, WRONG_REDUCE} @@ -111,7 +112,8 @@ class FetcherOrderedGrouped extends Thread { InputContext inputContext, Configuration conf, boolean localDiskFetchEnabled, String localHostname, - int shufflePort) throws IOException { + int shufflePort, + boolean verifyDiskChecksum) throws IOException { setDaemon(true); this.scheduler = scheduler; this.merger = merger; @@ -148,6 +150,7 @@ class FetcherOrderedGrouped extends Thread { this.localDiskFetchEnabled = localDiskFetchEnabled; this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); + this.verifyDiskChecksum = verifyDiskChecksum; this.logIdentifier = "fetcher {" + TezUtilsInternal .cleanVertexName(inputContext.getSourceVertexName()) + "} #" + id; @@ -512,7 +515,9 @@ class FetcherOrderedGrouped extends Thread { ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString()); } else if (mapOutput.getType() == Type.DISK) { ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), - input, compressedLength, decompressedLength, LOG, mapOutput.getAttemptIdentifier().toString()); + input, compressedLength, decompressedLength, LOG, + mapOutput.getAttemptIdentifier().toString(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); } else { throw new IOException("Unknown mapOutput type while fetching shuffle data:" + mapOutput.getType()); http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index efce6d2..8aa815a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -95,6 +95,7 @@ public class Shuffle implements ExceptionReporter { private final boolean ifileReadAhead; private final int ifileReadAheadLength; private final int numFetchers; + private final boolean verifyDiskChecksum; private final boolean localDiskFetchEnabled; private final String localHostname; private final int shufflePort; @@ -157,6 +158,10 @@ public class Shuffle implements ExceptionReporter { this.ifileReadAheadLength = 0; } + this.verifyDiskChecksum = conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT); + Combiner combiner = TezRuntimeUtils.instantiateCombiner(conf, inputContext); FileSystem localFS = FileSystem.getLocal(this.conf); @@ -344,7 +349,8 @@ public class Shuffle implements ExceptionReporter { FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, Shuffle.this, jobTokenSecretMgr, ifileReadAhead, ifileReadAheadLength, - codec, inputContext, conf, localDiskFetchEnabled, localHostname, shufflePort); + codec, inputContext, conf, localDiskFetchEnabled, localHostname, shufflePort, + verifyDiskChecksum); fetchers.add(fetcher); fetcher.start(); } http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 65b195a..a145937 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -175,6 +175,7 @@ class ShuffleScheduler { private final float minReqProgressFraction; private final float maxAllowedFailedFetchFraction; private final boolean checkFailedFetchSinceLastCompletion; + private final boolean verifyDiskChecksum; private long totalBytesShuffledTillNow = 0; private final DecimalFormat mbpsFormat = new DecimalFormat("0.00"); @@ -269,6 +270,10 @@ class ShuffleScheduler { conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR, TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR_DEFAULT); + this.verifyDiskChecksum = conf.getBoolean( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM_DEFAULT); + /** * Setting to very high val can lead to Http 400 error. Cap it to 75; every attempt id would * be approximately 48 bytes; 48 * 75 = 3600 which should give some room for other info in URL. 
http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index 6d8992e..a20182c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -645,6 +646,43 @@ public class IFile { } } + /** + * Read entire IFile content to disk. 
+ * + * @param out the output stream that will receive the data + * @param in the input stream containing the IFile data + * @param length the amount of data to read from the input + * @return the number of bytes copied + * @throws IOException + */ + public static long readToDisk(OutputStream out, InputStream in, long length, + boolean ifileReadAhead, int ifileReadAheadLength) + throws IOException { + final int BYTES_TO_READ = 64 * 1024; + byte[] buf = new byte[BYTES_TO_READ]; + + // copy the IFile header + if (length < HEADER.length) { + throw new IOException("Missing IFile header"); + } + IOUtils.readFully(in, buf, 0, HEADER.length); + verifyHeaderMagic(buf); + out.write(buf, 0, HEADER.length); + long bytesLeft = length - HEADER.length; + @SuppressWarnings("resource") + IFileInputStream ifInput = new IFileInputStream(in, bytesLeft, + ifileReadAhead, ifileReadAheadLength); + while (bytesLeft > 0) { + int n = ifInput.readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ)); + if (n < 0) { + throw new IOException("read past end of stream"); + } + out.write(buf, 0, n); + bytesLeft -= n; + } + return length - bytesLeft; + } + public long getLength() { return fileLength - checksumIn.getSize(); } @@ -784,14 +822,17 @@ public class IFile { ++numRecordsRead; } - public static boolean isCompressedFlagEnabled(InputStream in) throws IOException { - byte[] header = new byte[HEADER.length]; - IOUtils.readFully(in, header, 0, HEADER.length); - + private static void verifyHeaderMagic(byte[] header) throws IOException { if (!(header[0] == 'T' && header[1] == 'I' && header[2] == 'F')) { throw new IOException("Not a valid ifile header"); } + } + + public static boolean isCompressedFlagEnabled(InputStream in) throws IOException { + byte[] header = new byte[HEADER.length]; + IOUtils.readFully(in, header, 0, HEADER.length); + verifyHeaderMagic(header); return (header[3] == 1); } 
http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java index d116242..c5853d4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java @@ -125,7 +125,7 @@ public class IFileInputStream extends InputStream { if (curReadahead != null) { curReadahead.cancel(); } - if (currentOffset < dataLength) { + if (currentOffset < dataLength && !disableChecksumValidation) { byte[] t = new byte[Math.min((int) (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)]; while (currentOffset < dataLength) { @@ -300,7 +300,10 @@ public class IFileInputStream extends InputStream { return result; } - void disableChecksumValidation() { + /** + * Disable checksum validation when reading the stream + */ + public void disableChecksumValidation() { disableChecksumValidation = true; } } http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 278c8d0..40db1c4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -344,6 +344,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index f1b15d2..2cb317a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -261,6 +261,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); 
http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index 4ef187d..9af8d92 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -70,7 +70,8 @@ public class TestFetcher { final boolean DISABLE_LOCAL_FETCH = false; Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT); + ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, + HOST, PORT, true); builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); @@ -87,8 +88,8 @@ public class TestFetcher { // when enabled and hostname does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, - PORT); + ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, + HOST, PORT, true); builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -103,7 +104,8 @@ public class TestFetcher { // when enabled and port does not match use http fetch. 
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT); + ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, + HOST, PORT, true); builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -119,7 +121,8 @@ public class TestFetcher { // When disabled use http fetch conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, PORT); + ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, + HOST, PORT, true); builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -152,7 +155,8 @@ public class TestFetcher { int partition = 42; FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT); + ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, + HOST, PORT, true); builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 4ac1bca..c542030 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -4,6 +4,7 @@ import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; @@ -23,6 +24,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.partitioner.HashPartitioner; @@ -33,9 +35,12 @@ import org.junit.Test; import org.slf4j.Logger; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.BitSet; import java.util.List; import java.util.Random; @@ -283,4 +288,26 @@ public class TestShuffleUtils { Assert.assertTrue(e.getMessage().contains(codecErrorMsg)); } } + + @Test + public void testShuffleToDiskChecksum() throws Exception { + // verify sending a stream of zeroes without checksum validation + // does not trigger an exception + byte[] bogusData = new byte[1000]; + Arrays.fill(bogusData, (byte) 0); + ByteArrayInputStream in = new ByteArrayInputStream(bogusData); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ShuffleUtils.shuffleToDisk(baos, "somehost", in, + bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, false); + Assert.assertArrayEquals(bogusData, 
baos.toByteArray()); + + // verify sending same stream of zeroes with validation generates an exception + in.reset(); + try { + ShuffleUtils.shuffleToDisk(mock(OutputStream.class), "somehost", in, + bogusData.length, 2000, mock(Logger.class), "identifier", false, 0, true); + Assert.fail("shuffle was supposed to throw!"); + } catch (IOException e) { + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 531c18b..5ab7fd2 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -94,7 +94,7 @@ public class TestFetcher { MapHost mapHost = new MapHost(HOST, PORT, 0); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, - false, 0, null, inputContext, conf, ENABLE_LOCAL_FETCH, HOST, PORT); + false, 0, null, inputContext, conf, ENABLE_LOCAL_FETCH, HOST, PORT, true); // when local mode is enabled and host and port matches use local fetch FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -131,7 +131,7 @@ public class TestFetcher { //if local fetch is not enabled mapHost = new MapHost(HOST, PORT, 0); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, - false, 0, null, inputContext, conf, DISABLE_LOCAL_FETCH, HOST, PORT); + false, 0, null, inputContext, conf, DISABLE_LOCAL_FETCH, HOST, PORT, true); spyFetcher = spy(fetcher); 
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); doReturn(mapHost).when(scheduler).getHost(); @@ -156,7 +156,7 @@ public class TestFetcher { FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, - false, 0, null, inputContext, conf, true, HOST, PORT); + false, 0, null, inputContext, conf, true, HOST, PORT, true); FetcherOrderedGrouped spyFetcher = spy(fetcher); MapHost host = new MapHost(HOST, PORT, 1); @@ -299,7 +299,7 @@ public class TestFetcher { ShuffleUtils.constructHttpShuffleConnectionParams(conf); FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, null, - false, 0, null, inputContext, conf, false, HOST, PORT); + false, 0, null, inputContext, conf, false, HOST, PORT, true); final FetcherOrderedGrouped fetcher = spy(mockFetcher); final MapHost host = new MapHost(HOST, PORT, 1); http://git-wip-us.apache.org/repos/asf/tez/blob/24957c9f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java index 1c269bd..24acc40 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java @@ -19,8 +19,10 @@ package org.apache.tez.runtime.library.common.sort.impl; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Random; @@ -28,6 +30,7 @@ import java.util.Random; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -375,6 +378,43 @@ public class TestIFile { readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec); } + @Test(timeout = 20000) + public void testReadToDisk() throws IOException { + // verify sending a stream of zeroes generates an error + byte[] zeroData = new byte[1000]; + Arrays.fill(zeroData, (byte) 0); + ByteArrayInputStream in = new ByteArrayInputStream(zeroData); + try { + IFile.Reader.readToDisk(new ByteArrayOutputStream(), in, zeroData.length, false, 0); + fail("Exception should have been thrown"); + } catch (IOException e) { + } + + // verify sending same stream of zeroes with a valid IFile header still + // generates an error + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + baos.write(IFile.HEADER); + baos.write(zeroData); + try { + IFile.Reader.readToDisk(new ByteArrayOutputStream(), + new ByteArrayInputStream(baos.toByteArray()), zeroData.length, false, 0); + fail("Exception should have been thrown"); + } catch (IOException e) { + assertTrue(e instanceof ChecksumException); + } + + // verify valid data is copied properly + List<KVPair> data = KVDataGen.generateTestData(true, 0); + Writer writer = writeTestFile(false, false, data, codec); + baos.reset(); + IFile.Reader.readToDisk(baos, localFs.open(outputPath), writer.getCompressedLength(), + false, 0); + byte[] diskData = baos.toByteArray(); + Reader reader = new Reader(new ByteArrayInputStream(diskData), diskData.length, + codec, null, null, false, 0, 1024); + verifyData(reader, data); + reader.close(); + } /** * Test different options (RLE, repeat keys, compression) on reader/writer
