Repository: tez Updated Branches: refs/heads/master 6af43ce40 -> 3be9c53b2
TEZ-2832. Support tests for both SimpleHistory logging and ATS logging (bikaS) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3be9c53b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3be9c53b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3be9c53b Branch: refs/heads/master Commit: 3be9c53b296ec75b672a14c38603c4b67485dce9 Parents: 6af43ce Author: Bikas Saha <[email protected]> Authored: Thu Sep 17 23:04:27 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Thu Sep 17 23:04:27 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../analyzer/plugins/CriticalPathAnalyzer.java | 16 +- .../org/apache/tez/analyzer/utils/SVGUtils.java | 9 +- .../org/apache/tez/analyzer/TestAnalyzer.java | 265 +++++++++++++------ 4 files changed, 197 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3be9c53b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index eb646f2..180a26d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2832. Support tests for both SimpleHistory logging and ATS logging TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents. TEZ-2827. Increase timeout for TestFetcher testInputAttemptIdentifierMap TEZ-2774. Improvements and cleanup of logging for the AM and parts of the runtme. http://git-wip-us.apache.org/repos/asf/tez/blob/3be9c53b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java index 3eb7701..4062142 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -190,21 +190,31 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { Collections.sort(attemptsList, TaskAttemptInfo.orderingOnAllocationTime()); // walk the list to record allocation time before the current attempt long containerPreviousAllocatedTime = 0; + int wavesForVertex = 1; for (TaskAttemptInfo containerAttempt : attemptsList) { if (containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) { break; } + if (containerAttempt.getTaskInfo().getVertexInfo().getVertexId().equals( + attempt.getTaskInfo().getVertexInfo().getVertexId())) { + // another task from the same vertex ran in this container. So there are multiple + // waves for this vertex on this container. + wavesForVertex++; + } System.out.println("Container: " + container.getId() + " running att: " + containerAttempt.getTaskAttemptId() + " wait att: " + attempt.getTaskAttemptId()); containerPreviousAllocatedTime += containerAttempt.getAllocationToEndTimeInterval(); } + if (wavesForVertex > 1) { + step.notes.add("Container ran multiple waves for this vertex."); + } if (containerPreviousAllocatedTime == 0) { - step.notes.add("Container " + container.getId() + " newly allocated."); + step.notes.add("Container newly allocated."); } else { if (containerPreviousAllocatedTime >= attempt.getCreationToAllocationTimeInterval()) { - step.notes.add("Container " + container.getId() + " was fully allocated"); + step.notes.add("Container was fully allocated"); } else { - step.notes.add("Container " + container.getId() + " allocated for " + + step.notes.add("Container in use for " + SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " + SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + " of allocation wait time"); http://git-wip-us.apache.org/repos/asf/tez/blob/3be9c53b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java index 88a2105..2e94ec0 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java @@ -50,7 +50,6 @@ public class SVGUtils { private static final int STEP_GAP = 50; private static final int TEXT_SIZE = 20; private static final String RUNTIME_COLOR = "LightGreen"; - private static final String ERROR_COLOR = "Tomato"; private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod"; private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon"; private static final String BORDER_COLOR = "Sienna"; @@ -203,14 +202,14 @@ public class SVGUtils { addRectStr(launchTimeInterval, finishTimeInterval - launchTimeInterval, yOffset * STEP_GAP, STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); } else { - // no launch - so allocate to finish drawn + // no launch - so allocate to finish drawn - ended while launching addRectStr(allocationTimeInterval, finishTimeInterval - allocationTimeInterval, yOffset * STEP_GAP, - STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); } } else { - // no allocation - so create to finish drawn + // no allocation - so create to finish drawn - ended while allocating addRectStr(creationTimeInterval, finishTimeInterval - creationTimeInterval, yOffset * STEP_GAP, - STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); + STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr); } addTextStr((finishTimeInterval + creationTimeInterval) / 2, http://git-wip-us.apache.org/repos/asf/tez/blob/3be9c53b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java index f54a15a..ca9250d 100644 --- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer; import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathDependency; @@ -39,15 +41,17 @@ import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService; +import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.history.ATSImportTool; import org.apache.tez.history.parser.ATSFileParser; +import org.apache.tez.history.parser.SimpleHistoryParser; import org.apache.tez.history.parser.datamodel.DagInfo; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.test.SimpleTestDAG; import org.apache.tez.test.SimpleTestDAG3Vertices; import org.apache.tez.test.TestInput; @@ -72,6 +76,7 @@ public class TestAnalyzer { "target" + Path.SEPARATOR + TestAnalyzer.class.getName() + "-tmpDir"; private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download"; private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/"; + private final static String HISTORY_TXT = "history.txt"; private static MiniDFSCluster dfsCluster; private static MiniTezClusterWithTimeline miniTezCluster; @@ -81,7 +86,8 @@ public class TestAnalyzer { private static TezClient tezSession = null; - private static int numDAGs = 0; + private boolean usingATS = true; + private boolean downloadedSimpleHistoryFile = false; @BeforeClass public static void setupClass() throws Exception { @@ -95,15 +101,11 @@ public class TestAnalyzer { conf.set("fs.defaultFS", fs.getUri().toString()); setupTezCluster(); - numDAGs = 0; } @AfterClass public static void tearDownClass() throws Exception { LOG.info("Stopping mini clusters"); - if (tezSession != null) { - tezSession.stop(); - } if (miniTezCluster != null) { miniTezCluster.stop(); miniTezCluster = null; @@ -113,8 +115,8 @@ public class TestAnalyzer { dfsCluster = null; } } - - public CriticalPathAnalyzer setupCPAnalyzer() { + + private CriticalPathAnalyzer setupCPAnalyzer() { Configuration analyzerConf = new Configuration(false); analyzerConf.setBoolean(CriticalPathAnalyzer.DRAW_SVG, false); CriticalPathAnalyzer cp = new CriticalPathAnalyzer(); @@ -122,27 +124,37 @@ public class TestAnalyzer { return cp; } - public static void setupTezCluster() throws Exception { - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000); - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000); - conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2); - - //Enable per edge counters - conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true); - conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); + private static void setupTezCluster() throws Exception { + // make the test run faster by speeding heartbeat frequency + conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService .class.getName()); - conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR); - miniTezCluster = new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 1, 1, 1, true); miniTezCluster.init(conf); miniTezCluster.start(); + } + private TezConfiguration createCommonTezLog() throws Exception { TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); + + tezConf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100); + Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new Path(TEST_ROOT_DIR, String + .valueOf(new Random().nextInt(100000)))); + + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, + remoteStagingDir.toString()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + + return tezConf; + } + + private void createTezSessionATS() throws Exception { + TezConfiguration tezConf = createCommonTezLog(); tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188"); tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true); @@ -158,19 +170,36 @@ public class TestAnalyzer { tezSession = TezClient.create("TestFaultTolerance", tezConf, true); tezSession.start(); + } + + private void createTezSessionSimpleHistory() throws Exception { + TezConfiguration tezConf = createCommonTezLog(); + tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, + SimpleHistoryLoggingService.class.getName()); + + tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, SIMPLE_HISTORY_DIR); + + Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new Path(TEST_ROOT_DIR, String + .valueOf(new Random().nextInt(100000)))); + + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, + remoteStagingDir.toString()); + tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); + tezSession = TezClient.create("TestFaultTolerance", tezConf, true); + tezSession.start(); } - StepCheck createStep(String attempt, CriticalPathDependency reason) { + private StepCheck createStep(String attempt, CriticalPathDependency reason) { return createStep(attempt, reason, null, null); } - StepCheck createStep(String attempt, CriticalPathDependency reason, + private StepCheck createStep(String attempt, CriticalPathDependency reason, TaskAttemptTerminationCause errCause, List<String> notes) { return new StepCheck(attempt, reason, errCause, notes); } - class StepCheck { + private class StepCheck { String attempt; // attempt is the TaskAttemptInfo short name with regex CriticalPathDependency reason; TaskAttemptTerminationCause errCause; @@ -196,10 +225,9 @@ public class TestAnalyzer { return notesStr; } } - - DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, List<StepCheck[]> steps) throws Exception { + + private void runDAG(DAG dag, DAGStatus.State finalState) throws Exception { tezSession.waitTillReady(); - numDAGs++; LOG.info("ABC Running DAG name: " + dag.getName()); DAGClient dagClient = tezSession.submitDAG(dag); DAGStatus dagStatus = dagClient.getDAGStatus(null); @@ -213,35 +241,57 @@ public class TestAnalyzer { } Assert.assertEquals(finalState, dagStatus.getState()); - - String dagId = TezDAGID.getInstance(tezSession.getAppMasterApplicationId(), numDAGs).toString(); + } + + private void verify(ApplicationId appId, int dagNum, List<StepCheck[]> steps) throws Exception { + String dagId = TezDAGID.getInstance(appId, dagNum).toString(); DagInfo dagInfo = getDagInfo(dagId); verifyCriticalPath(dagInfo, steps); - return dagInfo; } - DagInfo getDagInfo(String dagId) throws Exception { + private DagInfo getDagInfo(String dagId) throws Exception { // sleep for a bit to let ATS events be sent from AM - Thread.sleep(1000); - //Export the data from ATS - String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR }; - - int result = ATSImportTool.process(args); - assertTrue(result == 0); - - //Parse ATS data and verify results - //Parse downloaded contents - File downloadedFile = new File(DOWNLOAD_DIR - + Path.SEPARATOR + dagId - + Path.SEPARATOR + dagId + ".zip"); - ATSFileParser parser = new ATSFileParser(downloadedFile); - DagInfo dagInfo = parser.getDAGData(dagId); - assertTrue(dagInfo.getDagId().equals(dagId)); + DagInfo dagInfo = null; + if (usingATS) { + //Export the data from ATS + String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR }; + + int result = ATSImportTool.process(args); + assertTrue(result == 0); + + //Parse ATS data and verify results + //Parse downloaded contents + File downloadedFile = new File(DOWNLOAD_DIR + + Path.SEPARATOR + dagId + + Path.SEPARATOR + dagId + ".zip"); + ATSFileParser parser = new ATSFileParser(downloadedFile); + dagInfo = parser.getDAGData(dagId); + assertTrue(dagInfo.getDagId().equals(dagId)); + } else { + if (!downloadedSimpleHistoryFile) { + downloadedSimpleHistoryFile = true; + TezDAGID tezDAGID = TezDAGID.fromString(dagId); + ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(tezDAGID + .getApplicationId(), 1); + Path historyPath = new Path(miniTezCluster.getConfig().get("fs.defaultFS") + + SIMPLE_HISTORY_DIR + HISTORY_TXT + "." + + applicationAttemptId); + FileSystem fs = historyPath.getFileSystem(miniTezCluster.getConfig()); + + Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT); + fs.copyToLocalFile(historyPath, localPath); + } + //Now parse via SimpleHistory + File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT); + SimpleHistoryParser parser = new SimpleHistoryParser(localFile); + dagInfo = parser.getDAGData(dagId); + assertTrue(dagInfo.getDagId().equals(dagId)); + } return dagInfo; } - void verifyCriticalPath(DagInfo dagInfo, List<StepCheck[]> stepsOptions) throws Exception { + private void verifyCriticalPath(DagInfo dagInfo, List<StepCheck[]> stepsOptions) throws Exception { CriticalPathAnalyzer cp = setupCPAnalyzer(); cp.analyze(dagInfo); @@ -295,8 +345,51 @@ public class TestAnalyzer { } - @Test (timeout=60000) - public void testBasicSuccessScatterGather() throws Exception { + @Test (timeout=300000) + public void testWithATS() throws Exception { + usingATS = true; + createTezSessionATS(); + runTests(); + } + + @Test (timeout=300000) + public void testWithSimpleHistory() throws Exception { + usingATS = false; + createTezSessionSimpleHistory(); + runTests(); + } + + private void runTests() throws Exception { + ApplicationId appId = tezSession.getAppMasterApplicationId(); + List<List<StepCheck[]>> stepsOptions = Lists.newArrayList(); + // run all test dags + stepsOptions.add(testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure()); + testInputFailureCausesRerunOfTwoVerticesWithoutExit(); + testMultiVersionInputFailureWithoutExit(); + testCascadingInputFailureWithoutExitSuccess(); + testTaskMultipleFailures(); + testBasicInputFailureWithoutExit(); + testBasicTaskFailure(); + testBasicSuccessScatterGather(); + testMultiVersionInputFailureWithExit(); + testBasicInputFailureWithExit(); + testInputFailureRerunCanSendOutputToTwoDownstreamVertices(); + testCascadingInputFailureWithExitSuccess(); + testInternalPreemption(); + + // close session to flush + if (tezSession != null) { + tezSession.stop(); + } + Thread.sleep((TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT*3)/2); + + // verify all dags + for (int i=0; i<stepsOptions.size(); ++i) { + verify(appId, i+1, stepsOptions.get(i)); + } + } + + private List<StepCheck[]> testBasicSuccessScatterGather() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); StepCheck[] check = { @@ -304,11 +397,11 @@ public class TestAnalyzer { createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY) }; DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } - @Test (timeout=60000) - public void testBasicTaskFailure() throws Exception { + private List<StepCheck[]> testBasicTaskFailure() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); testConf.setBoolean(TestProcessor.getVertexConfName( @@ -324,11 +417,11 @@ public class TestAnalyzer { createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), }; DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } - @Test (timeout=60000) - public void testTaskMultipleFailures() throws Exception { + private List<StepCheck[]> testTaskMultipleFailures() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); testConf.setBoolean(TestProcessor.getVertexConfName( @@ -346,11 +439,11 @@ public class TestAnalyzer { }; DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } - @Test (timeout=60000) - public void testBasicInputFailureWithExit() throws Exception { + private List<StepCheck[]> testBasicInputFailureWithExit() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); testConf.setBoolean(TestInput.getVertexConfName( @@ -372,11 +465,11 @@ public class TestAnalyzer { }; DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } - @Test (timeout=60000) - public void testBasicInputFailureWithoutExit() throws Exception { + private List<StepCheck[]> testBasicInputFailureWithoutExit() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); testConf.setBoolean(TestInput.getVertexConfName( @@ -396,11 +489,11 @@ public class TestAnalyzer { }; DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } - @Test (timeout=60000) - public void testMultiVersionInputFailureWithExit() throws Exception { + private List<StepCheck[]> testMultiVersionInputFailureWithExit() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); testConf.setBoolean(TestInput.getVertexConfName( @@ -426,11 +519,11 @@ public class TestAnalyzer { }; DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithExit", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } - @Test (timeout=60000) - public void testMultiVersionInputFailureWithoutExit() throws Exception { + private List<StepCheck[]> testMultiVersionInputFailureWithoutExit() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); testConf.setBoolean(TestInput.getVertexConfName( @@ -454,7 +547,8 @@ public class TestAnalyzer { }; DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } /** @@ -510,8 +604,7 @@ public class TestAnalyzer { * AM vertex succeeded order is v1, v2, v1, v2, v3. * @throws Exception */ - @Test (timeout=60000) - public void testCascadingInputFailureWithoutExitSuccess() throws Exception { + private List<StepCheck[]> testCascadingInputFailureWithoutExitSuccess() throws Exception { Configuration testConf = new Configuration(false); setCascadingInputFailureConfig(testConf, false, 1); @@ -527,7 +620,8 @@ public class TestAnalyzer { DAG dag = SimpleTestDAG3Vertices.createDAG( "testCascadingInputFailureWithoutExitSuccess", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } /** @@ -541,8 +635,7 @@ public class TestAnalyzer { * AM vertex succeeded order is v1, v2, v3, v1, v2, v3. * @throws Exception */ - @Test (timeout=60000) - public void testCascadingInputFailureWithExitSuccess() throws Exception { + private List<StepCheck[]> testCascadingInputFailureWithExitSuccess() throws Exception { Configuration testConf = new Configuration(false); setCascadingInputFailureConfig(testConf, true, 1); @@ -558,7 +651,8 @@ public class TestAnalyzer { DAG dag = SimpleTestDAG3Vertices.createDAG( "testCascadingInputFailureWithExitSuccess", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } /** @@ -572,8 +666,7 @@ public class TestAnalyzer { * Also v1 retry attempts note show that it caused preemption of v3 * @throws Exception */ - @Test (timeout=60000) - public void testInternalPreemption() throws Exception { + private List<StepCheck[]> testInternalPreemption() throws Exception { Configuration testConf = new Configuration(false); setCascadingInputFailureConfig(testConf, false, 2); @@ -591,7 +684,8 @@ public class TestAnalyzer { DAG dag = SimpleTestDAG3Vertices.createDAG( "testInternalPreemption", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } /** @@ -602,8 +696,7 @@ public class TestAnalyzer { * * @throws Exception */ - @Test (timeout=60000) - public void testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception { + private List<StepCheck[]> testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1); testConf.setBoolean(TestInput.getVertexConfName( @@ -635,7 +728,8 @@ public class TestAnalyzer { DAG dag = SimpleVTestDAG.createDAG( "testInputFailureCausesRerunOfTwoVerticesWithoutExit", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } /** @@ -647,8 +741,8 @@ public class TestAnalyzer { * * @throws Exception */ - @Test (timeout=60000) - public void testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() throws Exception { + private List<StepCheck[]> testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() + throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1); testConf.setBoolean(TestProcessor.getVertexConfName( @@ -668,7 +762,8 @@ public class TestAnalyzer { DAG dag = SimpleVTestDAG.createDAG( "testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, Collections.singletonList(check)); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } /** @@ -684,8 +779,7 @@ public class TestAnalyzer { * Also covers multiple consumer vertices report failure against same producer task. * @throws Exception */ - @Test (timeout=60000) - public void testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception { + private List<StepCheck[]> testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception { Configuration testConf = new Configuration(false); testConf.setInt(SimpleReverseVTestDAG.TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, 1); testConf.setBoolean(TestInput.getVertexConfName( @@ -714,18 +808,17 @@ public class TestAnalyzer { testConf.set(TestInput.getVertexConfName( TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "0"); - List<StepCheck[]> stepsOptions = Lists.newLinkedList(); - StepCheck[] check1 = { + StepCheck[] check = { // use regex for either vertices being possible on the path createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY), createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), createStep("v1 : 000000_1", CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY), createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY), }; - stepsOptions.add(check1); DAG dag = SimpleReverseVTestDAG.createDAG( "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf); - runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, stepsOptions); + runDAG(dag, DAGStatus.State.SUCCEEDED); + return Collections.singletonList(check); } } \ No newline at end of file
