Repository: tez
Updated Branches:
  refs/heads/master f46997a7c -> c6e400e2d


TEZ-2292. Add e2e test for error reporting when vertex manager invokes plugin 
APIs (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6e400e2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6e400e2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6e400e2

Branch: refs/heads/master
Commit: c6e400e2d1d484bc5bc33ec27324b065de40e465
Parents: f46997a
Author: Bikas Saha <[email protected]>
Authored: Tue Apr 21 16:48:54 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Tue Apr 21 16:48:54 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../tez/dag/api/VertexManagerPluginContext.java |  3 +-
 .../dag/app/dag/impl/AMUserCodeException.java   |  3 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |  7 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 98 +++++++++-----------
 .../tez/test/VertexManagerPluginForTest.java    | 70 +++++++++++++-
 .../vertexmanager/InputReadyVertexManager.java  |  8 +-
 .../vertexmanager/ShuffleVertexManager.java     |  9 +-
 .../vertexmanager/TestShuffleVertexManager.java | 23 ++---
 9 files changed, 129 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5293120..0c83c08 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2292. Add e2e test for error reporting when vertex manager invokes
+  plugin APIs
   TEZ-2308. Add set/get of record counts in task/vertex statistics
   TEZ-2344. Tez UI: Equip basic-ember-table's cell level loading for all use 
cases in all DAGs table
   TEZ-2313. Regression in handling obsolete events in ShuffleScheduler.

http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java 
b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index afcdb88..8b0e89e 100644
--- 
a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ 
b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -172,11 +172,10 @@ public interface VertexManagerPluginContext {
    *          Map with Key=name of {@link Edge} to be updated and Value=
    *          {@link EdgeProperty}. The name of the Edge will be the 
    *          corresponding source vertex name.
-   * @throws TezException Exception to indicate errors
    */
   public void reconfigureVertex(int parallelism,
       @Nullable VertexLocationHint locationHint,
-      @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws 
TezException;
+      @Nullable Map<String, EdgeProperty> sourceEdgeProperties);
 
   /**
    * Allows a VertexManagerPlugin to assign Events for Root Inputs

http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java
 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java
index b4e1849..22b1211 100644
--- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java
+++ 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.dag.api.TezException;
 
 
 /**
@@ -28,7 +29,7 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Private;
  * <li>InputInitializer</li>
  */
 @Private
-public class AMUserCodeException extends Exception {
+public class AMUserCodeException extends TezException {
 
   private static final long serialVersionUID = -3642816091492797520L;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index bcea22c..2ac1acf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -44,7 +44,6 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -171,7 +170,6 @@ public class VertexManager {
         managedVertex.setParallelism(parallelism, vertexLocationHint, 
sourceEdgeManagers,
             rootInputSpecUpdate, true);
       } catch (AMUserCodeException e) {
-        // workaround: convert it to TezUncheckedException which would be 
caught in VM
         throw new TezUncheckedException(e);
       }
     }
@@ -179,13 +177,12 @@ public class VertexManager {
     @Override
     public synchronized void reconfigureVertex(int parallelism,
         @Nullable VertexLocationHint locationHint,
-        @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws 
TezException {
+        @Nullable Map<String, EdgeProperty> sourceEdgeProperties) {
       checkAndThrowIfDone();
       try {
         managedVertex.reconfigureVertex(parallelism, locationHint, 
sourceEdgeProperties);
       } catch (AMUserCodeException e) {
-        // convert it to TezException which would be caught in VM
-        throw new TezException(e);
+        throw new TezUncheckedException(e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index c752965..2403599 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -170,6 +170,7 @@ import 
org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.EdgeManagerForTest;
 import org.apache.tez.test.VertexManagerPluginForTest;
+import 
org.apache.tez.test.VertexManagerPluginForTest.VertexManagerPluginForTestConfig;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
@@ -184,6 +185,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.internal.util.collections.Sets;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -192,6 +194,7 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+@SuppressWarnings("unchecked")
 public class TestVertexImpl {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestVertexImpl.class);
@@ -2273,7 +2276,6 @@ public class TestVertexImpl {
     }
   }
 
-  @SuppressWarnings("unchecked")
   private void initVertex(VertexImpl v) {
     Assert.assertEquals(VertexState.NEW, v.getState());
     dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(),
@@ -2285,7 +2287,6 @@ public class TestVertexImpl {
     startVertex(v, true);
   }
 
-  @SuppressWarnings("unchecked")
   private void killVertex(VertexImpl v) {
     dispatcher.getEventHandler().handle(
         new VertexEventTermination(v.getVertexId(), 
VertexTerminationCause.DAG_KILL));
@@ -2294,7 +2295,6 @@ public class TestVertexImpl {
     Assert.assertEquals(v.getTerminationCause(), 
VertexTerminationCause.DAG_KILL);
   }
 
-  @SuppressWarnings("unchecked")
   private void startVertex(VertexImpl v,
       boolean checkRunningState) {
     Assert.assertEquals(VertexState.INITED, v.getState());
@@ -2379,7 +2379,6 @@ public class TestVertexImpl {
     updateTracker.unregisterForVertexUpdates("vertex3", listener);
   }
   
-  @SuppressWarnings("unchecked")
   @Test (timeout=5000)
   public void testVertexConfigureEventWithReconfigure() throws Exception {
     useCustomInitializer = true;
@@ -2626,7 +2625,6 @@ public class TestVertexImpl {
     }
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexPendingTaskEvents() {
     initAllVertices(VertexState.INITED);
@@ -2701,7 +2699,6 @@ public class TestVertexImpl {
         ((EdgeManagerForTest) 
modifiedEdgeManager).getUserPayload().deepCopyAsArray()));
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testBasicVertexCompletion() {
     initAllVertices(VertexState.INITED);
@@ -2725,7 +2722,6 @@ public class TestVertexImpl {
     Assert.assertEquals(2, v.getCompletedTasks());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   @Ignore // FIXME fix verteximpl for this test to work
   public void testDuplicateTaskCompletion() {
@@ -2753,8 +2749,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
   }
 
-
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexFailure() {
     initAllVertices(VertexState.INITED);
@@ -2801,7 +2795,6 @@ public class TestVertexImpl {
         "vertex received kill while in running state"));
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexKillPending() {
     initAllVertices(VertexState.INITED);
@@ -2827,7 +2820,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.KILLED, v.getState());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexKill() {
     initAllVertices(VertexState.INITED);
@@ -2853,7 +2845,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.KILLED, v.getState());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testKilledTasksHandling() {
     initAllVertices(VertexState.INITED);
@@ -2895,7 +2886,6 @@ public class TestVertexImpl {
         instanceof ShuffleVertexManager);
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexTaskFailure() {
     initAllVertices(VertexState.INITED);
@@ -2926,7 +2916,6 @@ public class TestVertexImpl {
     Assert.assertEquals(1, committer.abortCounter);
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexTaskAttemptProcessorFailure() throws Exception {
     initAllVertices(VertexState.INITED);
@@ -2962,7 +2951,6 @@ public class TestVertexImpl {
     Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, 
ta.getTerminationCause());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexTaskAttemptInputFailure() throws Exception {
     initAllVertices(VertexState.INITED);
@@ -2999,7 +2987,6 @@ public class TestVertexImpl {
   }
 
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexTaskAttemptOutputFailure() throws Exception {
     initAllVertices(VertexState.INITED);
@@ -3098,7 +3085,6 @@ public class TestVertexImpl {
 
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testDAGEventGeneration() {
     initAllVertices(VertexState.INITED);
@@ -3120,7 +3106,6 @@ public class TestVertexImpl {
             DAGEventType.DAG_VERTEX_COMPLETED).intValue());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testTaskReschedule() {
     // For downstream failures
@@ -3150,8 +3135,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
 
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Test(timeout = 5000)
   public void testVertexSuccessToRunningAfterTaskScheduler() {
     // For downstream failures
@@ -3191,8 +3175,7 @@ public class TestVertexImpl {
         dagEventDispatcher.eventCount.get(
             DAGEventType.DAG_VERTEX_COMPLETED).intValue());
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Test(timeout = 5000)
   public void testVertexSuccessToFailedAfterTaskScheduler() throws Exception {
     // For downstream failures
@@ -3240,7 +3223,6 @@ public class TestVertexImpl {
             DAGEventType.DAG_VERTEX_COMPLETED).intValue());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexCommit() {
     initAllVertices(VertexState.INITED);
@@ -3267,7 +3249,6 @@ public class TestVertexImpl {
     Assert.assertEquals(1, committer.setupCounter);
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testTaskFailedAfterVertexSuccess() {
     initAllVertices(VertexState.INITED);
@@ -3303,7 +3284,6 @@ public class TestVertexImpl {
     
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testBadCommitter() throws Exception {
     VertexImpl v = vertices.get("vertex2");
@@ -3348,7 +3328,6 @@ public class TestVertexImpl {
     Assert.assertEquals(1, committer.setupCounter);
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testBadCommitter2() throws Exception {
     VertexImpl v = vertices.get("vertex2");
@@ -3391,8 +3370,7 @@ public class TestVertexImpl {
     Assert.assertEquals(1, committer.initCounter);
     Assert.assertEquals(1, committer.setupCounter);
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Test(timeout = 5000)
   public void testVertexInitWithCustomVertexManager() throws Exception {
     setupPreDagCreation();
@@ -3612,7 +3590,39 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, 
vertices.get("vertex4").getState());
   }
 
-  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexVMErrorReport() throws Exception {
+    int numTasks = 5;
+    // create a diamond shaped dag with 1-1 edges. 
+    setupPreDagCreation();
+    dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false);
+    setupPostDagCreation();
+    VertexImpl v1 = vertices.get("vertex1");
+    initAllVertices(VertexState.INITED);
+    
+    // fudge vertex manager so that tasks don't start running
+    // it is not calling vertexReconfigurationPlanned() but will call 
reconfigureVertex().
+    // the vertex is already fully configured. this causes an exception; 
verify that
+    // it's caught and reported.
+    VertexManagerPluginForTestConfig config = new 
VertexManagerPluginForTestConfig();
+    config.setReconfigureOnStart(true);
+    v1.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create(
+        VertexManagerPluginForTest.class.getName()).setUserPayload(
+        UserPayload.create(config.getPayload())), 
UserGroupInformation.getCurrentUser(), v1,
+        appContext, mock(StateChangeNotifier.class));
+    v1.vertexManager.initialize();
+
+    startVertex(v1, false);
+    dispatcher.await();
+
+    // failed due to exception
+    Assert.assertEquals(VertexState.FAILED, 
vertices.get("vertex1").getState());
+    Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, 
vertices.get("vertex1")
+        .getTerminationCause());
+    
Assert.assertTrue(Joiner.on(":").join(vertices.get("vertex1").getDiagnostics()).contains(
+        "context.vertexReconfigurationPlanned() before re-configuring"));
+  }
+
   @Test(timeout = 5000)
   public void testInvalidEvent() {
     VertexImpl v = vertices.get("vertex2");
@@ -3625,7 +3635,6 @@ public class TestVertexImpl {
             DAGEventType.INTERNAL_ERROR).intValue());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexWithInitializerFailure() throws Exception {
     useCustomInitializer = true;
@@ -4125,7 +4134,6 @@ public class TestVertexImpl {
         initializer.stateUpdates.get(2).getVertexState());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 10000)
   public void testInputInitializerEvents() throws Exception {
     useCustomInitializer = true;
@@ -4202,7 +4210,6 @@ public class TestVertexImpl {
     Assert.assertEquals(0, 
initializerWrapper.getPendingEvents().get(v1.getName()).size());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   /**
    * Ref: TEZ-1494
@@ -4563,7 +4570,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITED, v1.getState());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 500000)
   public void testVertexWithInitializerSuccess() throws Exception {
     useCustomInitializer = true;
@@ -4637,8 +4643,7 @@ public class TestVertexImpl {
       Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount());
     }
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Test(timeout = 5000)
   public void testVertexWithInputDistributor() throws Exception {
     useCustomInitializer = true;
@@ -4674,7 +4679,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.INITED, v2.getState());
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexRootInputSpecUpdateAll() throws Exception {
     useCustomInitializer = true;
@@ -4704,8 +4708,7 @@ public class TestVertexImpl {
       Assert.assertEquals(4, inputSpecs.get(0).getPhysicalEdgeCount());
     }
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Test(timeout = 5000)
   public void testVertexRootInputSpecUpdatePerTask() throws Exception {
     useCustomInitializer = true;
@@ -4983,7 +4986,6 @@ public class TestVertexImpl {
     }
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout=5000)
   public void testVertexGroupInput() throws AMUserCodeException {
     setupPreDagCreation();
@@ -5010,8 +5012,7 @@ public class TestVertexImpl {
     assertTrue(groupInSpec.get(0).getGroupVertices().contains("B"));
     
groupInSpec.get(0).getMergedInputDescriptor().getClassName().equals("Group.class");
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Test(timeout = 5000)
   public void testStartWithUninitializedCustomEdge() throws Exception {
     // Race when a source vertex manages to start before the target vertex has
@@ -5060,8 +5061,7 @@ public class TestVertexImpl {
     Assert.assertNotNull(vB.getTask(0));
     Assert.assertNotNull(vC.getTask(0));
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Test(timeout = 5000)
   public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception 
{
     // Race when a source vertex manages to start before the target vertex has
@@ -5136,7 +5136,6 @@ public class TestVertexImpl {
     Assert.assertNotNull(vC.getTask(0));
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testInitStartRace() throws AMUserCodeException {
     // Race when a source vertex manages to start before the target vertex has
@@ -5160,7 +5159,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, vC.getState());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testInitStartRace2() throws AMUserCodeException {
     // Race when a source vertex manages to start before the target vertex has
@@ -5188,7 +5186,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexState.RUNNING, vC.getState());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromVM_Initialize() throws AMUserCodeException {
     useCustomInitializer = true;
@@ -5208,7 +5205,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, 
v1.getTerminationCause());
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromVM_OnRootVertexInitialized() throws Exception {
     useCustomInitializer = true;
@@ -5233,7 +5229,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, 
v1.getTerminationCause());
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromVM_OnVertexStarted() throws Exception {
     useCustomInitializer = true;
@@ -5261,7 +5256,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, 
v1.getTerminationCause());
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromVM_OnSourceTaskCompleted() throws Exception {
     useCustomInitializer = true;
@@ -5298,7 +5292,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, 
v2.getTerminationCause());
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromVM_OnVertexManagerEventReceived() throws 
Exception {
     useCustomInitializer = true;
@@ -5325,7 +5318,6 @@ public class TestVertexImpl {
     
Assert.assertTrue(diagnostics.contains(VMExceptionLocation.OnVertexManagerEventReceived.name()));
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws 
Exception {
     useCustomInitializer = true;
@@ -5376,7 +5368,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, 
v1.getTerminationCause());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromII_InitFailedAfterInitialized() throws 
Exception {
     useCustomInitializer = true;
@@ -5401,7 +5392,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, 
v1.getTerminationCause());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromII_InitFailedAfterRunning() throws Exception {
     useCustomInitializer = true;
@@ -5427,7 +5417,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, 
v1.getTerminationCause());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromII_HandleInputInitializerEvent() throws 
Exception {
     useCustomInitializer = true;
@@ -5507,7 +5496,6 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, 
v2.getTerminationCause());
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testExceptionFromII_InitSucceededAfterInitFailure() throws 
AMUserCodeException, InterruptedException {
     useCustomInitializer = true;

http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java 
b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
index 422d785..84e060b 100644
--- a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
+++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java
@@ -18,26 +18,92 @@
 
 package org.apache.tez.test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 public class VertexManagerPluginForTest extends VertexManagerPlugin {
+  VertexManagerPluginForTestConfig pluginConfig = new 
VertexManagerPluginForTestConfig();
+  
+  public static class VertexManagerPluginForTestConfig {
+    Configuration conf = new Configuration(false);
 
+    static final String RECONFIGURE_ON_START = "reconfigureOnStart";
+    static final String NUM_TASKS = "numTasks";
+    
+    public void setReconfigureOnStart(boolean value) {
+      conf.setBoolean(RECONFIGURE_ON_START, value);
+    }
+    
+    public void setNumTasks(int value) {
+      conf.setInt(NUM_TASKS, value);
+    }
+    
+    boolean getReconfigureOnStart() {
+      return conf.getBoolean(RECONFIGURE_ON_START, false);
+    }
+    
+    int getNumTasks() {
+      return conf.getInt(NUM_TASKS, 1);
+    }
+    
+    public ByteBuffer getPayload() {
+      ByteArrayOutputStream b = new ByteArrayOutputStream();
+      DataOutputStream out = new DataOutputStream(b);
+      try {
+        conf.write(out);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+      return ByteBuffer.wrap(b.toByteArray());
+    }
+    
+    void initialize(ByteBuffer buff) {
+      ByteBuffer copy = ByteBuffer.allocate(buff.capacity());
+      copy.put(buff);
+      copy.flip();
+      ByteArrayInputStream b = new ByteArrayInputStream(copy.array());
+      DataInputStream in = new DataInputStream(b);
+      try {
+        conf.readFields(in);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+  }
   public VertexManagerPluginForTest(VertexManagerPluginContext context) {
     super(context);
   }
+  
 
   @Override
-  public void initialize() {}
+  public void initialize() {
+    UserPayload payload = getContext().getUserPayload();
+    if (payload != null && payload.getPayload() != null) {
+      pluginConfig.initialize(getContext().getUserPayload().getPayload());
+    }
+  }
 
   @Override
-  public void onVertexStarted(Map<String, List<Integer>> completions) {}
+  public void onVertexStarted(Map<String, List<Integer>> completions) {
+    if (pluginConfig.getReconfigureOnStart()) {
+      getContext().reconfigureVertex(pluginConfig.getNumTasks(), null, null);
+    }
+  }
 
   @Override
   public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {}

http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 8671161..30e3e81 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -30,7 +30,6 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
@@ -124,12 +123,7 @@ public class InputReadyVertexManager extends 
VertexManagerPlugin {
         // must change parallelism to make them the same
         LOG.info("Update parallelism of vertex: " + 
getContext().getVertexName() + 
             " to " + oneToOneSrcTaskCount + " to match source 1-1 vertices.");
-        try {
-          getContext().reconfigureVertex(oneToOneSrcTaskCount, null, null);
-        } catch (TezException e) {
-          // TODO fail vertex - TEZ-2292
-          LOG.warn("Failed to change parallelism in: " + 
getContext().getVertexName(), e);
-        }
+        getContext().reconfigureVertex(oneToOneSrcTaskCount, null, null);
       }
       oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount];
       oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];

http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 9be9986..2ea0299 100644
--- 
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ 
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -39,7 +39,6 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPlugin;
@@ -533,12 +532,8 @@ public class ShuffleVertexManager extends 
VertexManagerPlugin {
         edgeProperties.put(vertex, newEdgeProp);
       }
       
-      try {
-        getContext().reconfigureVertex(finalTaskParallelism, null, 
edgeProperties);
-      } catch (TezException e) {
-        // TODO fail vertex - TEZ-2292
-        LOG.warn("Failed to change parallelism in: " + 
getContext().getVertexName(), e);
-      }
+      getContext().reconfigureVertex(finalTaskParallelism, null, 
edgeProperties);
+      
       updatePendingTasks();
     }
     return true;

http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 27cd292..8807674 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -30,7 +30,6 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -66,9 +65,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+@SuppressWarnings({ "unchecked", "rawtypes" })
 public class TestShuffleVertexManager {
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test(timeout = 5000)
   public void testShuffleVertexManagerAutoParallelism() throws Exception {
     Configuration conf = new Configuration();
@@ -452,7 +451,6 @@ public class TestShuffleVertexManager {
     }
   }
   
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test(timeout = 5000)
   public void testShuffleVertexManagerSlowStart() {
     Configuration conf = new Configuration();
@@ -779,18 +777,13 @@ public class TestShuffleVertexManager {
     Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
 
     //Ensure that setVertexParallelism is not called for R2.
-    try {
-      verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), 
any(VertexLocationHint.class),
-          anyMap());
-      // complete configuration of r1 triggers the scheduling
-      manager.onVertexStateUpdated(new VertexStateUpdate(r1, 
VertexState.CONFIGURED));
-      verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), 
any(VertexLocationHint.class),
-          anyMap());
-    } catch (TezException e) {
-      e.printStackTrace();
-      Assert.fail(); // should not happen
-    }
-
+    verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), 
any(VertexLocationHint.class),
+        anyMap());
+    // complete configuration of r1 triggers the scheduling
+    manager.onVertexStateUpdated(new VertexStateUpdate(r1, 
VertexState.CONFIGURED));
+    verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), 
any(VertexLocationHint.class),
+        anyMap());
+  
     Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
     Assert.assertTrue(scheduledTasks.size() == 3);
 

Reply via email to