Repository: tez Updated Branches: refs/heads/branch-0.7 4a2316ba8 -> 1a5595032
TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results (rbalamohan) (cherry picked from commit abab526940f6353866d866b93d6da685edfa6014) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a559503 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a559503 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a559503 Branch: refs/heads/branch-0.7 Commit: 1a5595032dbd6ddd0396dc3b0b3e251e603b06f4 Parents: 4a2316b Author: Rajesh Balamohan <[email protected]> Authored: Wed Jan 25 13:02:53 2017 +0530 Committer: Siddharth Seth <[email protected]> Committed: Wed Jan 25 16:29:18 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../common/sort/impl/PipelinedSorter.java | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1a559503/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7204600..1a68c05 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results TEZ-3559. TEZ_LIB_URIS doesn't work with schemes different than the defaultFS TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez http://git-wip-us.apache.org/repos/asf/tez/blob/1a559503/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index be9b10c..f4daf1c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -258,6 +258,7 @@ public class PipelinedSorter extends ExternalSorter { } else { // queue up the sort SortTask task = new SortTask(span, sorter); + LOG.debug("Submitting span={} for sort", span.toString()); Future<SpanIterator> future = sortmaster.submit(task); merger.add(future); span = newSpan; @@ -834,8 +835,15 @@ public class PipelinedSorter extends ExternalSorter { items = 1024*1024; perItem = 16; } - newSpan = new SortSpan(remaining, items, perItem, - ConfigUtils.getIntermediateOutputKeyComparator(conf)); + final RawComparator newComparator = ConfigUtils.getIntermediateOutputKeyComparator(conf); + if (this.comparator == newComparator) { + LOG.warn("Same comparator used. comparator={}, newComparator={}," + + " hashCode: comparator={}, newComparator={}", + this.comparator, newComparator, + System.identityHashCode(this.comparator), + System.identityHashCode(newComparator)); + } + newSpan = new SortSpan(remaining, items, perItem, newComparator); newSpan.index = index+1; LOG.info(String.format(outputContext.getDestinationVertexName() + ": " + "New Span%d.length = %d, perItem = %d", newSpan.index, newSpan .length(), perItem) + ", counter:" + mapOutputRecordCounter.getValue()); @@ -1142,6 +1150,7 @@ public class PipelinedSorter extends ExternalSorter { } public final boolean ready() throws IOException, InterruptedException { + int numSpanItr = futures.size(); try { SpanIterator iter = null; while(this.futures.size() > 0) { @@ -1160,8 +1169,11 @@ public class PipelinedSorter extends ExternalSorter { LOG.info(outputContext.getDestinationVertexName() + ": " + "Heap = " + sb.toString()); return true; } catch(Exception e) { - LOG.info(outputContext.getDestinationVertexName() + ": " + e.toString()); - return false; + LOG.error("Heap size={}, total={}, eq={}, partition={}, gallop={}, totalItr={}," + + " futures.size={}, destVertexName={}", + heap.size(), total, eq, partition, gallop, numSpanItr, futures.size(), + outputContext.getDestinationVertexName(), e); + throw new IOException(e); } }
