Repository: tez
Updated Branches:
  refs/heads/master 6af43ce40 -> 3be9c53b2


TEZ-2832. Support tests for both SimpleHistory logging and ATS logging (bikaS)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3be9c53b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3be9c53b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3be9c53b

Branch: refs/heads/master
Commit: 3be9c53b296ec75b672a14c38603c4b67485dce9
Parents: 6af43ce
Author: Bikas Saha <[email protected]>
Authored: Thu Sep 17 23:04:27 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Thu Sep 17 23:04:27 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../analyzer/plugins/CriticalPathAnalyzer.java  |  16 +-
 .../org/apache/tez/analyzer/utils/SVGUtils.java |   9 +-
 .../org/apache/tez/analyzer/TestAnalyzer.java   | 265 +++++++++++++------
 4 files changed, 197 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3be9c53b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eb646f2..180a26d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2832. Support tests for both SimpleHistory logging and ATS logging
   TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
   TEZ-2827. Increase timeout for TestFetcher testInputAttemptIdentifierMap
   TEZ-2774. Improvements and cleanup of logging for the AM and parts of the 
runtme.

http://git-wip-us.apache.org/repos/asf/tez/blob/3be9c53b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
index 3eb7701..4062142 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java
@@ -190,21 +190,31 @@ public class CriticalPathAnalyzer extends TezAnalyzerBase 
implements Analyzer {
           Collections.sort(attemptsList, 
TaskAttemptInfo.orderingOnAllocationTime());
           // walk the list to record allocation time before the current attempt
           long containerPreviousAllocatedTime = 0;
+          int wavesForVertex = 1;
           for (TaskAttemptInfo containerAttempt : attemptsList) {
             if 
(containerAttempt.getTaskAttemptId().equals(attempt.getTaskAttemptId())) {
               break;
             }
+            if 
(containerAttempt.getTaskInfo().getVertexInfo().getVertexId().equals(
+                attempt.getTaskInfo().getVertexInfo().getVertexId())) {
+              // another task from the same vertex ran in this container. So 
there are multiple 
+              // waves for this vertex on this container.
+              wavesForVertex++;
+            }
             System.out.println("Container: " + container.getId() + " running 
att: " + 
             containerAttempt.getTaskAttemptId() + " wait att: " + 
attempt.getTaskAttemptId());
             containerPreviousAllocatedTime += 
containerAttempt.getAllocationToEndTimeInterval();
           }
+          if (wavesForVertex > 1) {
+            step.notes.add("Container ran multiple waves for this vertex.");
+          }
           if (containerPreviousAllocatedTime == 0) {
-            step.notes.add("Container " + container.getId() + " newly 
allocated.");
+            step.notes.add("Container newly allocated.");
           } else {
             if (containerPreviousAllocatedTime >= 
attempt.getCreationToAllocationTimeInterval()) {
-              step.notes.add("Container " + container.getId() + " was fully 
allocated");
+              step.notes.add("Container was fully allocated");
             } else {
-              step.notes.add("Container " + container.getId() + " allocated 
for " + 
+              step.notes.add("Container in use for " + 
               SVGUtils.getTimeStr(containerPreviousAllocatedTime) + " out of " 
+
                   
SVGUtils.getTimeStr(attempt.getCreationToAllocationTimeInterval()) + 
                   " of allocation wait time");

http://git-wip-us.apache.org/repos/asf/tez/blob/3be9c53b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
index 88a2105..2e94ec0 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/SVGUtils.java
@@ -50,7 +50,6 @@ public class SVGUtils {
   private static final int STEP_GAP = 50;
   private static final int TEXT_SIZE = 20;
   private static final String RUNTIME_COLOR = "LightGreen";
-  private static final String ERROR_COLOR = "Tomato";
   private static final String ALLOCATION_OVERHEAD_COLOR = "GoldenRod";
   private static final String LAUNCH_OVERHEAD_COLOR = "DarkSalmon";
   private static final String BORDER_COLOR = "Sienna";
@@ -203,14 +202,14 @@ public class SVGUtils {
           addRectStr(launchTimeInterval, finishTimeInterval - 
launchTimeInterval, yOffset * STEP_GAP,
               STEP_GAP, RUNTIME_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);
         } else {
-          // no launch - so allocate to finish drawn
+          // no launch - so allocate to finish drawn - ended while launching
           addRectStr(allocationTimeInterval, finishTimeInterval - 
allocationTimeInterval, yOffset * STEP_GAP,
-              STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);    
    
+              STEP_GAP, LAUNCH_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, 
titleStr);        
         }
       } else {
-        // no allocation - so create to finish drawn
+        // no allocation - so create to finish drawn - ended while allocating
         addRectStr(creationTimeInterval, finishTimeInterval - 
creationTimeInterval, yOffset * STEP_GAP,
-            STEP_GAP, ERROR_COLOR, BORDER_COLOR, RECT_OPACITY, titleStr);      
  
+            STEP_GAP, ALLOCATION_OVERHEAD_COLOR, BORDER_COLOR, RECT_OPACITY, 
titleStr);        
       }
 
       addTextStr((finishTimeInterval + creationTimeInterval) / 2,

http://git-wip-us.apache.org/repos/asf/tez/blob/3be9c53b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
 
b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index f54a15a..ca9250d 100644
--- 
a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++ 
b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.analyzer.plugins.CriticalPathAnalyzer;
 import 
org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathDependency;
@@ -39,15 +41,17 @@ import 
org.apache.tez.analyzer.plugins.CriticalPathAnalyzer.CriticalPathStep.Ent
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.history.ATSImportTool;
 import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.SimpleHistoryParser;
 import org.apache.tez.history.parser.datamodel.DagInfo;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.test.SimpleTestDAG;
 import org.apache.tez.test.SimpleTestDAG3Vertices;
 import org.apache.tez.test.TestInput;
@@ -72,6 +76,7 @@ public class TestAnalyzer {
       "target" + Path.SEPARATOR + TestAnalyzer.class.getName() + "-tmpDir";
   private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + 
"download";
   private final static String SIMPLE_HISTORY_DIR = "/tmp/simplehistory/";
+  private final static String HISTORY_TXT = "history.txt";
 
   private static MiniDFSCluster dfsCluster;
   private static MiniTezClusterWithTimeline miniTezCluster;
@@ -81,7 +86,8 @@ public class TestAnalyzer {
   
   private static TezClient tezSession = null;
   
-  private static int numDAGs = 0;
+  private boolean usingATS = true;
+  private boolean downloadedSimpleHistoryFile = false;
 
   @BeforeClass
   public static void setupClass() throws Exception {
@@ -95,15 +101,11 @@ public class TestAnalyzer {
     conf.set("fs.defaultFS", fs.getUri().toString());
 
     setupTezCluster();
-    numDAGs = 0;
   }
   
   @AfterClass
   public static void tearDownClass() throws Exception {
     LOG.info("Stopping mini clusters");
-    if (tezSession != null) {
-      tezSession.stop();
-    }
     if (miniTezCluster != null) {
       miniTezCluster.stop();
       miniTezCluster = null;
@@ -113,8 +115,8 @@ public class TestAnalyzer {
       dfsCluster = null;
     }
   }
-    
-  public CriticalPathAnalyzer setupCPAnalyzer() {
+  
+  private CriticalPathAnalyzer setupCPAnalyzer() {
     Configuration analyzerConf = new Configuration(false);
     analyzerConf.setBoolean(CriticalPathAnalyzer.DRAW_SVG, false);
     CriticalPathAnalyzer cp = new CriticalPathAnalyzer();
@@ -122,27 +124,37 @@ public class TestAnalyzer {
     return cp;
   }
 
-  public static void setupTezCluster() throws Exception {
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 
* 1000);
-    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 
1000);
-    
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 
2);
-
-    //Enable per edge counters
-    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
-    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, 
true);
+  private static void setupTezCluster() throws Exception {
+    // make the test run faster by speeding heartbeat frequency
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, 
true);
     conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, 
ATSHistoryLoggingService
         .class.getName());
 
-    conf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, 
SIMPLE_HISTORY_DIR);
-
     miniTezCluster =
         new MiniTezClusterWithTimeline(TestAnalyzer.class.getName(), 1, 1, 1, 
true);
 
     miniTezCluster.init(conf);
     miniTezCluster.start();
+  }
 
+  private TezConfiguration createCommonTezLog() throws Exception {
     TezConfiguration tezConf = new 
TezConfiguration(miniTezCluster.getConfig());
+    
+    tezConf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, 100);
+    Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new 
Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+    
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, 
false);
+    
+    return tezConf;
+  }
+
+  private void createTezSessionATS() throws Exception {
+    TezConfiguration tezConf = createCommonTezLog();
     tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, 
"0.0.0.0:8188");
     
tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, 
true);
@@ -158,19 +170,36 @@ public class TestAnalyzer {
 
     tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
     tezSession.start();
+  }
+  
+  private void createTezSessionSimpleHistory() throws Exception {
+    TezConfiguration tezConf = createCommonTezLog();
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        SimpleHistoryLoggingService.class.getName());
+
+    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, 
SIMPLE_HISTORY_DIR);
+
+    Path remoteStagingDir = dfsCluster.getFileSystem().makeQualified(new 
Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+    
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, 
false);
 
+    tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
+    tezSession.start();
   }
   
-  StepCheck createStep(String attempt, CriticalPathDependency reason) {
+  private StepCheck createStep(String attempt, CriticalPathDependency reason) {
     return createStep(attempt, reason, null, null);
   }
   
-  StepCheck createStep(String attempt, CriticalPathDependency reason, 
+  private StepCheck createStep(String attempt, CriticalPathDependency reason, 
       TaskAttemptTerminationCause errCause, List<String> notes) {
     return new StepCheck(attempt, reason, errCause, notes);
   }
   
-  class StepCheck {
+  private class StepCheck {
     String attempt; // attempt is the TaskAttemptInfo short name with regex
     CriticalPathDependency reason;
     TaskAttemptTerminationCause errCause;
@@ -196,10 +225,9 @@ public class TestAnalyzer {
       return notesStr;
     }
   }
-
-  DagInfo runDAGAndVerify(DAG dag, DAGStatus.State finalState, 
List<StepCheck[]> steps) throws Exception {
+  
+  private void runDAG(DAG dag, DAGStatus.State finalState) throws Exception {
     tezSession.waitTillReady();
-    numDAGs++;
     LOG.info("ABC Running DAG name: " + dag.getName());
     DAGClient dagClient = tezSession.submitDAG(dag);
     DAGStatus dagStatus = dagClient.getDAGStatus(null);
@@ -213,35 +241,57 @@ public class TestAnalyzer {
     }
 
     Assert.assertEquals(finalState, dagStatus.getState());
-    
-    String dagId = 
TezDAGID.getInstance(tezSession.getAppMasterApplicationId(), 
numDAGs).toString();
+  }
+  
+  private void verify(ApplicationId appId, int dagNum, List<StepCheck[]> 
steps) throws Exception {
+    String dagId = TezDAGID.getInstance(appId, dagNum).toString();
     DagInfo dagInfo = getDagInfo(dagId);
     
     verifyCriticalPath(dagInfo, steps);
-    return dagInfo;
   }
   
-  DagInfo getDagInfo(String dagId) throws Exception {
+  private DagInfo getDagInfo(String dagId) throws Exception {
     // sleep for a bit to let ATS events be sent from AM
-    Thread.sleep(1000);
-    //Export the data from ATS
-    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
-
-    int result = ATSImportTool.process(args);
-    assertTrue(result == 0);
-
-    //Parse ATS data and verify results
-    //Parse downloaded contents
-    File downloadedFile = new File(DOWNLOAD_DIR
-        + Path.SEPARATOR + dagId
-        + Path.SEPARATOR + dagId + ".zip");
-    ATSFileParser parser = new ATSFileParser(downloadedFile);
-    DagInfo dagInfo = parser.getDAGData(dagId);
-    assertTrue(dagInfo.getDagId().equals(dagId));
+    DagInfo dagInfo = null;
+    if (usingATS) {
+      //Export the data from ATS
+      String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+  
+      int result = ATSImportTool.process(args);
+      assertTrue(result == 0);
+  
+      //Parse ATS data and verify results
+      //Parse downloaded contents
+      File downloadedFile = new File(DOWNLOAD_DIR
+          + Path.SEPARATOR + dagId
+          + Path.SEPARATOR + dagId + ".zip");
+      ATSFileParser parser = new ATSFileParser(downloadedFile);
+      dagInfo = parser.getDAGData(dagId);
+      assertTrue(dagInfo.getDagId().equals(dagId));
+    } else {
+      if (!downloadedSimpleHistoryFile) {
+        downloadedSimpleHistoryFile = true;
+        TezDAGID tezDAGID = TezDAGID.fromString(dagId);
+        ApplicationAttemptId applicationAttemptId = 
ApplicationAttemptId.newInstance(tezDAGID
+            .getApplicationId(), 1);
+        Path historyPath = new 
Path(miniTezCluster.getConfig().get("fs.defaultFS")
+            + SIMPLE_HISTORY_DIR + HISTORY_TXT + "."
+            + applicationAttemptId);
+        FileSystem fs = historyPath.getFileSystem(miniTezCluster.getConfig());
+  
+        Path localPath = new Path(DOWNLOAD_DIR, HISTORY_TXT);
+        fs.copyToLocalFile(historyPath, localPath);
+      }
+      //Now parse via SimpleHistory
+      File localFile = new File(DOWNLOAD_DIR, HISTORY_TXT);
+      SimpleHistoryParser parser = new SimpleHistoryParser(localFile);
+      dagInfo = parser.getDAGData(dagId);
+      assertTrue(dagInfo.getDagId().equals(dagId));
+    }
     return dagInfo;
   }
   
-  void verifyCriticalPath(DagInfo dagInfo, List<StepCheck[]> stepsOptions) 
throws Exception {
+  private void verifyCriticalPath(DagInfo dagInfo, List<StepCheck[]> 
stepsOptions) throws Exception {
     CriticalPathAnalyzer cp = setupCPAnalyzer();
     cp.analyze(dagInfo);
     
@@ -295,8 +345,51 @@ public class TestAnalyzer {
     
   }
   
-  @Test (timeout=60000)
-  public void testBasicSuccessScatterGather() throws Exception {
+  @Test (timeout=300000)
+  public void testWithATS() throws Exception {
+    usingATS = true;
+    createTezSessionATS();
+    runTests();
+  }
+  
+  @Test (timeout=300000)
+  public void testWithSimpleHistory() throws Exception {
+    usingATS = false;
+    createTezSessionSimpleHistory();
+    runTests();
+  }
+  
+  private void runTests() throws Exception {
+    ApplicationId appId = tezSession.getAppMasterApplicationId();
+    List<List<StepCheck[]>> stepsOptions = Lists.newArrayList();
+    // run all test dags
+    
stepsOptions.add(testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure());
+    testInputFailureCausesRerunOfTwoVerticesWithoutExit();
+    testMultiVersionInputFailureWithoutExit();
+    testCascadingInputFailureWithoutExitSuccess();
+    testTaskMultipleFailures();
+    testBasicInputFailureWithoutExit();
+    testBasicTaskFailure();
+    testBasicSuccessScatterGather();
+    testMultiVersionInputFailureWithExit();
+    testBasicInputFailureWithExit();
+    testInputFailureRerunCanSendOutputToTwoDownstreamVertices();
+    testCascadingInputFailureWithExitSuccess();
+    testInternalPreemption();
+    
+    // close session to flush
+    if (tezSession != null) {
+      tezSession.stop();
+    }
+    Thread.sleep((TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT*3)/2);
+    
+    // verify all dags
+    for (int i=0; i<stepsOptions.size(); ++i) {
+      verify(appId, i+1, stepsOptions.get(i));
+    }    
+  }
+  
+  private List<StepCheck[]> testBasicSuccessScatterGather() throws Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
     StepCheck[] check = { 
@@ -304,11 +397,11 @@ public class TestAnalyzer {
         createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY)
         };
     DAG dag = SimpleTestDAG.createDAG("testBasicSuccessScatterGather", 
testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
-  @Test (timeout=60000)
-  public void testBasicTaskFailure() throws Exception {
+  private List<StepCheck[]> testBasicTaskFailure() throws Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
     testConf.setBoolean(TestProcessor.getVertexConfName(
@@ -324,11 +417,11 @@ public class TestAnalyzer {
         createStep("v2 : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
     };
     DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
-  @Test (timeout=60000)
-  public void testTaskMultipleFailures() throws Exception {
+  private List<StepCheck[]> testTaskMultipleFailures() throws Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
     testConf.setBoolean(TestProcessor.getVertexConfName(
@@ -346,11 +439,11 @@ public class TestAnalyzer {
     };    
     
     DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
-  @Test (timeout=60000)
-  public void testBasicInputFailureWithExit() throws Exception {
+  private List<StepCheck[]> testBasicInputFailureWithExit() throws Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
     testConf.setBoolean(TestInput.getVertexConfName(
@@ -372,11 +465,11 @@ public class TestAnalyzer {
       };
     
     DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", 
testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
-  @Test (timeout=60000)
-  public void testBasicInputFailureWithoutExit() throws Exception {
+  private List<StepCheck[]> testBasicInputFailureWithoutExit() throws 
Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
     testConf.setBoolean(TestInput.getVertexConfName(
@@ -396,11 +489,11 @@ public class TestAnalyzer {
       };
 
     DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", 
testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
 
-  @Test (timeout=60000)
-  public void testMultiVersionInputFailureWithExit() throws Exception {
+  private List<StepCheck[]> testMultiVersionInputFailureWithExit() throws 
Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
     testConf.setBoolean(TestInput.getVertexConfName(
@@ -426,11 +519,11 @@ public class TestAnalyzer {
       };
 
     DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithExit", 
testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
 
-  @Test (timeout=60000)
-  public void testMultiVersionInputFailureWithoutExit() throws Exception {
+  private List<StepCheck[]> testMultiVersionInputFailureWithoutExit() throws 
Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
     testConf.setBoolean(TestInput.getVertexConfName(
@@ -454,7 +547,8 @@ public class TestAnalyzer {
       };
 
     DAG dag = 
SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
   /**
@@ -510,8 +604,7 @@ public class TestAnalyzer {
    * AM vertex succeeded order is v1, v2, v1, v2, v3.
    * @throws Exception
    */
-  @Test (timeout=60000)
-  public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
+  private List<StepCheck[]> testCascadingInputFailureWithoutExitSuccess() 
throws Exception {
     Configuration testConf = new Configuration(false);
     setCascadingInputFailureConfig(testConf, false, 1);
 
@@ -527,7 +620,8 @@ public class TestAnalyzer {
     
     DAG dag = SimpleTestDAG3Vertices.createDAG(
               "testCascadingInputFailureWithoutExitSuccess", testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
   /**
@@ -541,8 +635,7 @@ public class TestAnalyzer {
    * AM vertex succeeded order is v1, v2, v3, v1, v2, v3.
    * @throws Exception
    */
-  @Test (timeout=60000)
-  public void testCascadingInputFailureWithExitSuccess() throws Exception {
+  private List<StepCheck[]> testCascadingInputFailureWithExitSuccess() throws 
Exception {
     Configuration testConf = new Configuration(false);
     setCascadingInputFailureConfig(testConf, true, 1);
     
@@ -558,7 +651,8 @@ public class TestAnalyzer {
 
     DAG dag = SimpleTestDAG3Vertices.createDAG(
               "testCascadingInputFailureWithExitSuccess", testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
   /**
@@ -572,8 +666,7 @@ public class TestAnalyzer {
    * Also v1 retry attempts note show that it caused preemption of v3
    * @throws Exception
    */
-  @Test (timeout=60000)
-  public void testInternalPreemption() throws Exception {
+  private List<StepCheck[]> testInternalPreemption() throws Exception {
     Configuration testConf = new Configuration(false);
     setCascadingInputFailureConfig(testConf, false, 2);
 
@@ -591,7 +684,8 @@ public class TestAnalyzer {
     
     DAG dag = SimpleTestDAG3Vertices.createDAG(
               "testInternalPreemption", testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
   /**
@@ -602,8 +696,7 @@ public class TestAnalyzer {
    * 
    * @throws Exception
    */
-  @Test (timeout=60000)
-  public void testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws 
Exception {
+  private List<StepCheck[]> 
testInputFailureCausesRerunOfTwoVerticesWithoutExit() throws Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1);
     testConf.setBoolean(TestInput.getVertexConfName(
@@ -635,7 +728,8 @@ public class TestAnalyzer {
 
     DAG dag = SimpleVTestDAG.createDAG(
             "testInputFailureCausesRerunOfTwoVerticesWithoutExit", testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
   /**
@@ -647,8 +741,8 @@ public class TestAnalyzer {
    * 
    * @throws Exception
    */
-  @Test (timeout=60000)
-  public void 
testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() throws 
Exception {
+  private List<StepCheck[]> 
testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure() 
+      throws Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleVTestDAG.TEZ_SIMPLE_V_DAG_NUM_TASKS, 1);
     testConf.setBoolean(TestProcessor.getVertexConfName(
@@ -668,7 +762,8 @@ public class TestAnalyzer {
 
     DAG dag = SimpleVTestDAG.createDAG(
             
"testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", 
testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, 
Collections.singletonList(check));
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
     
   /**
@@ -684,8 +779,7 @@ public class TestAnalyzer {
    * Also covers multiple consumer vertices report failure against same 
producer task.
    * @throws Exception
    */
-  @Test (timeout=60000)
-  public void testInputFailureRerunCanSendOutputToTwoDownstreamVertices() 
throws Exception {
+  private List<StepCheck[]> 
testInputFailureRerunCanSendOutputToTwoDownstreamVertices() throws Exception {
     Configuration testConf = new Configuration(false);
     testConf.setInt(SimpleReverseVTestDAG.TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS, 
1);
     testConf.setBoolean(TestInput.getVertexConfName(
@@ -714,18 +808,17 @@ public class TestAnalyzer {
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "0");
     
-    List<StepCheck[]> stepsOptions = Lists.newLinkedList();
-    StepCheck[] check1 = {
+    StepCheck[] check = {
         // use regex for either vertices being possible on the path
       createStep("v1 : 000000_0", CriticalPathDependency.INIT_DEPENDENCY),
       createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
       createStep("v1 : 000000_1", 
CriticalPathDependency.OUTPUT_RECREATE_DEPENDENCY),
       createStep("v[23] : 000000_0", CriticalPathDependency.DATA_DEPENDENCY),
     };
-    stepsOptions.add(check1);
     DAG dag = SimpleReverseVTestDAG.createDAG(
             "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", 
testConf);
-    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED, stepsOptions);
+    runDAG(dag, DAGStatus.State.SUCCEEDED);
+    return Collections.singletonList(check);
   }
   
 }
\ No newline at end of file

Reply via email to