Repository: tez Updated Branches: refs/heads/master fd0ed5244 -> 406721ab1
TEZ-2398. Flaky test: TestFaultTolerance (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/406721ab Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/406721ab Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/406721ab Branch: refs/heads/master Commit: 406721ab17b58e29e5bf3585d556700c2ef04f05 Parents: fd0ed52 Author: Bikas Saha <[email protected]> Authored: Fri Sep 25 10:30:53 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Sep 25 10:30:53 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 4 ++++ .../java/org/apache/tez/test/TestFaultTolerance.java | 5 +---- .../src/test/java/org/apache/tez/test/TestInput.java | 13 +++++++++---- 3 files changed, 14 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/406721ab/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4c417da..01fa23e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2398. Flaky test: TestFaultTolerance TEZ-2833. Dont create extra directory during ATS file download TEZ-2834. Make Tez preemption resilient to incorrect free resource reported by YARN @@ -189,6 +190,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2398. Flaky test: TestFaultTolerance TEZ-2808. Race condition between preemption and container assignment TEZ-2853. Tez UI: task attempt page is coming empty TEZ-2716. DefaultSorter.isRleNeeded not thread safe @@ -456,6 +458,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2398. Flaky test: TestFaultTolerance TEZ-2808. Race condition between preemption and container assignment TEZ-2834. Make Tez preemption resilient to incorrect free resource reported by YARN @@ -682,6 +685,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2398. Flaky test: TestFaultTolerance TEZ-2808. Race condition between preemption and container assignment TEZ-2834. Make Tez preemption resilient to incorrect free resource reported by YARN http://git-wip-us.apache.org/repos/asf/tez/blob/406721ab/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index 0d27032..ec89c4b 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -74,7 +74,7 @@ public class TestFaultTolerance { } if (miniTezCluster == null) { miniTezCluster = new MiniTezCluster(TestFaultTolerance.class.getName(), - 4, 1, 1); + 3, 1, 1); Configuration miniTezconf = new Configuration(conf); miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS miniTezCluster.init(miniTezconf); @@ -242,9 +242,6 @@ public class TestFaultTolerance { TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0,1"); testConf.setInt(TestProcessor.getVertexConfName( TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 5); - //v2 task0 attempt 0 succeeds instantly. - testConf.setInt(TestProcessor.getVertexConfName( - TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 3); DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf); runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); http://git-wip-us.apache.org/repos/asf/tez/blob/406721ab/tez-tests/src/test/java/org/apache/tez/test/TestInput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java index c2b60c7..a4f3c27 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java @@ -75,7 +75,6 @@ public class TestInput extends AbstractLogicalInput { Set<Integer> failingInputIndices = Sets.newHashSet(); Integer failAll = new Integer(-1); int[] inputValues; - AtomicInteger numEventsReceived = new AtomicInteger(0); /** * Enable failure for this logical input @@ -193,7 +192,6 @@ public class TestInput extends AbstractLogicalInput { LOG.info("Failing input: " + msg); } } - int numEvents = numEventsReceived.get(); getContext().sendEvents(events); if (doFailAndExit) { String msg = "FailingInput exiting: " + getContext().getUniqueIdentifier(); @@ -201,7 +199,15 @@ public class TestInput extends AbstractLogicalInput { throwException(msg); } else { try { - while (numEvents == numEventsReceived.get()) { + // keep sending input read error until we receive the new input + // this check breaks the loop when we see a new input version + // thus, when multiple input versions arrive, this methods gets triggered + // for each version via wait-notify. But all events may have been processed in + // handleEvents() before the code reaches this point. Having this loop, makes + // it quickly exit for an older version if a newer version has been seen. + // however, if a newer version is not seen then it keeps sending input error + // indefinitely, by design. + while (lastInputReadyValue == inputReady.get()) { // keep sending events Thread.sleep(500); getContext().sendEvents(events); @@ -341,7 +347,6 @@ public class TestInput extends AbstractLogicalInput { @Override public void handleEvents(List<Event> inputEvents) throws Exception { for (Event event : inputEvents) { - numEventsReceived.incrementAndGet(); if (event instanceof DataMovementEvent) { DataMovementEvent dmEvent = (DataMovementEvent) event; numCompletedInputs++;
