Repository: tez
Updated Branches:
  refs/heads/master 44046f8a4 -> decb4191b
TEZ-2313. Regression in handling obsolete events in ShuffleScheduler (rbalamohan)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/decb4191
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/decb4191
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/decb4191

Branch: refs/heads/master
Commit: decb4191bbada28749483b1b5af837fc87aff8bc
Parents: 44046f8
Author: Rajesh Balamohan <[email protected]>
Authored: Tue Apr 21 11:10:33 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Tue Apr 21 11:10:33 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                             | 1 +
 .../common/shuffle/orderedgrouped/ShuffleScheduler.java | 8 +++++---
 .../TestShuffleInputEventHandlerOrderedGrouped.java     | 4 ++++
 3 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/decb4191/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64b3561..bd9ff6d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly

 ALL CHANGES:
+  TEZ-2313. Regression in handling obsolete events in ShuffleScheduler.
   TEZ-2212. Notify components on DAG completion.
   TEZ-2328. Add tez.runtime.sorter.class & rename tez.runtime.sort.threads
   to tez.runtime.pipelined.sorter.sort.threads.
http://git-wip-us.apache.org/repos/asf/tez/blob/decb4191/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index d0b6346..a3d79ae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -349,6 +349,10 @@ class ShuffleScheduler {
             + ", newAttemptNum=" + input.getAttemptNumber()));
         return false;
       }
+
+      if (eventInfo == null) {
+        shuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input));
+      }
     }
     return true;
   }
@@ -523,9 +527,6 @@ class ShuffleScheduler {
       if (!validateInputAttemptForPipelinedShuffle(srcAttempt)) {
         return;
       }
-      if (shuffleInfoEventsMap.get(srcAttempt.getInputIdentifier()) == null) {
-        shuffleInfoEventsMap.put(srcAttempt.getInputIdentifier(), new ShuffleEventInfo(srcAttempt));
-      }

       host.addKnownMap(srcAttempt);
       pathToIdentifierMap.put(
@@ -542,6 +543,7 @@ class ShuffleScheduler {
     // The incoming srcAttempt does not contain a path component.
     LOG.info("Adding obsolete input: " + srcAttempt);
     if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) {
+      //Pipelined shuffle case (where shuffleInfoEventsMap gets populated).
       //Fail fast here.
       shuffle.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it "
           + "exists in shuffleInfoEventMap. Some data could have been already merged "

http://git-wip-us.apache.org/repos/asf/tez/blob/decb4191/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 460db01..eed9fd8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -236,6 +236,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
     verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri),
         eq(id1));
+    assertTrue("Shuffle info events should not be empty for pipelined shuffle",
+        !scheduler.shuffleInfoEventsMap.isEmpty());

     //Attempt #0 comes up. When processing this, it should report exception
     attemptNum = 0;
@@ -263,6 +265,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     int partitionId = srcIdx;
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
         eq(expectedIdentifier));
+    assertTrue("Shuffle info events should be empty for regular shuffle codepath",
+        scheduler.shuffleInfoEventsMap.isEmpty());
   }

   @Test(timeout = 5000)
