Repository: tez Updated Branches: refs/heads/master f46997a7c -> c6e400e2d
TEZ-2292. Add e2e test for error reporting when vertex manager invokes plugin APIs (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c6e400e2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c6e400e2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c6e400e2 Branch: refs/heads/master Commit: c6e400e2d1d484bc5bc33ec27324b065de40e465 Parents: f46997a Author: Bikas Saha <[email protected]> Authored: Tue Apr 21 16:48:54 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue Apr 21 16:48:54 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../tez/dag/api/VertexManagerPluginContext.java | 3 +- .../dag/app/dag/impl/AMUserCodeException.java | 3 +- .../tez/dag/app/dag/impl/VertexManager.java | 7 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 98 +++++++++----------- .../tez/test/VertexManagerPluginForTest.java | 70 +++++++++++++- .../vertexmanager/InputReadyVertexManager.java | 8 +- .../vertexmanager/ShuffleVertexManager.java | 9 +- .../vertexmanager/TestShuffleVertexManager.java | 23 ++--- 9 files changed, 129 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5293120..0c83c08 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly ALL CHANGES: + TEZ-2292. Add e2e test for error reporting when vertex manager invokes + plugin APIs TEZ-2308. Add set/get of record counts in task/vertex statistics TEZ-2344. Tez UI: Equip basic-ember-table's cell level loading for all use cases in all DAGs table TEZ-2313. Regression in handling obsolete events in ShuffleScheduler. http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index afcdb88..8b0e89e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -172,11 +172,10 @@ public interface VertexManagerPluginContext { * Map with Key=name of {@link Edge} to be updated and Value= * {@link EdgeProperty}. The name of the Edge will be the * corresponding source vertex name. - * @throws TezException Exception to indicate errors */ public void reconfigureVertex(int parallelism, @Nullable VertexLocationHint locationHint, - @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws TezException; + @Nullable Map<String, EdgeProperty> sourceEdgeProperties); /** * Allows a VertexManagerPlugin to assign Events for Root Inputs http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java index b4e1849..22b1211 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/AMUserCodeException.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.app.dag.impl; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.tez.dag.api.TezException; /** @@ -28,7 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; * <li>InputInitializer</li> */ @Private -public class AMUserCodeException extends Exception { +public class AMUserCodeException extends TezException { private static final long serialVersionUID = -3642816091492797520L; http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index bcea22c..2ac1acf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -44,7 +44,6 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; -import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; @@ -171,7 +170,6 @@ public class VertexManager { managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeManagers, rootInputSpecUpdate, true); } catch (AMUserCodeException e) { - // workaround: convert it to TezUncheckedException which would be caught in VM throw new TezUncheckedException(e); } } @@ -179,13 +177,12 @@ public class VertexManager { @Override public synchronized void reconfigureVertex(int parallelism, @Nullable VertexLocationHint locationHint, - @Nullable Map<String, EdgeProperty> sourceEdgeProperties) throws TezException { + @Nullable Map<String, EdgeProperty> sourceEdgeProperties) { checkAndThrowIfDone(); try { managedVertex.reconfigureVertex(parallelism, locationHint, sourceEdgeProperties); } catch (AMUserCodeException e) { - // convert it to TezException which would be caught in VM - throw new TezException(e); + throw new TezUncheckedException(e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index c752965..2403599 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -170,6 +170,7 @@ import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.test.EdgeManagerForTest; import org.apache.tez.test.VertexManagerPluginForTest; +import org.apache.tez.test.VertexManagerPluginForTest.VertexManagerPluginForTestConfig; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.api.impl.GroupInputSpec; @@ -184,6 +185,7 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.internal.util.collections.Sets; +import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; @@ -192,6 +194,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +@SuppressWarnings("unchecked") public class TestVertexImpl { private static final Logger LOG = LoggerFactory.getLogger(TestVertexImpl.class); @@ -2273,7 +2276,6 @@ public class TestVertexImpl { } } - @SuppressWarnings("unchecked") private void initVertex(VertexImpl v) { Assert.assertEquals(VertexState.NEW, v.getState()); dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(), @@ -2285,7 +2287,6 @@ public class TestVertexImpl { startVertex(v, true); } - @SuppressWarnings("unchecked") private void killVertex(VertexImpl v) { dispatcher.getEventHandler().handle( new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_KILL)); @@ -2294,7 +2295,6 @@ public class TestVertexImpl { Assert.assertEquals(v.getTerminationCause(), VertexTerminationCause.DAG_KILL); } - @SuppressWarnings("unchecked") private void startVertex(VertexImpl v, boolean checkRunningState) { Assert.assertEquals(VertexState.INITED, v.getState()); @@ -2379,7 +2379,6 @@ public class TestVertexImpl { updateTracker.unregisterForVertexUpdates("vertex3", listener); } - @SuppressWarnings("unchecked") @Test (timeout=5000) public void testVertexConfigureEventWithReconfigure() throws Exception { useCustomInitializer = true; @@ -2626,7 +2625,6 @@ public class TestVertexImpl { } } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexPendingTaskEvents() { initAllVertices(VertexState.INITED); @@ -2701,7 +2699,6 @@ public class TestVertexImpl { ((EdgeManagerForTest) modifiedEdgeManager).getUserPayload().deepCopyAsArray())); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testBasicVertexCompletion() { initAllVertices(VertexState.INITED); @@ -2725,7 +2722,6 @@ public class TestVertexImpl { Assert.assertEquals(2, v.getCompletedTasks()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) @Ignore // FIXME fix verteximpl for this test to work public void testDuplicateTaskCompletion() { @@ -2753,8 +2749,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); } - - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexFailure() { initAllVertices(VertexState.INITED); @@ -2801,7 +2795,6 @@ public class TestVertexImpl { "vertex received kill while in running state")); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexKillPending() { initAllVertices(VertexState.INITED); @@ -2827,7 +2820,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.KILLED, v.getState()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexKill() { initAllVertices(VertexState.INITED); @@ -2853,7 +2845,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.KILLED, v.getState()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testKilledTasksHandling() { initAllVertices(VertexState.INITED); @@ -2895,7 +2886,6 @@ public class TestVertexImpl { instanceof ShuffleVertexManager); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexTaskFailure() { initAllVertices(VertexState.INITED); @@ -2926,7 +2916,6 @@ public class TestVertexImpl { Assert.assertEquals(1, committer.abortCounter); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexTaskAttemptProcessorFailure() throws Exception { initAllVertices(VertexState.INITED); @@ -2962,7 +2951,6 @@ public class TestVertexImpl { Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexTaskAttemptInputFailure() throws Exception { initAllVertices(VertexState.INITED); @@ -2999,7 +2987,6 @@ public class TestVertexImpl { } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexTaskAttemptOutputFailure() throws Exception { initAllVertices(VertexState.INITED); @@ -3098,7 +3085,6 @@ public class TestVertexImpl { } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testDAGEventGeneration() { initAllVertices(VertexState.INITED); @@ -3120,7 +3106,6 @@ public class TestVertexImpl { DAGEventType.DAG_VERTEX_COMPLETED).intValue()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testTaskReschedule() { // For downstream failures @@ -3150,8 +3135,7 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); } - - @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexSuccessToRunningAfterTaskScheduler() { // For downstream failures @@ -3191,8 +3175,7 @@ public class TestVertexImpl { dagEventDispatcher.eventCount.get( DAGEventType.DAG_VERTEX_COMPLETED).intValue()); } - - @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexSuccessToFailedAfterTaskScheduler() throws Exception { // For downstream failures @@ -3240,7 +3223,6 @@ public class TestVertexImpl { DAGEventType.DAG_VERTEX_COMPLETED).intValue()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexCommit() { initAllVertices(VertexState.INITED); @@ -3267,7 +3249,6 @@ public class TestVertexImpl { Assert.assertEquals(1, committer.setupCounter); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testTaskFailedAfterVertexSuccess() { initAllVertices(VertexState.INITED); @@ -3303,7 +3284,6 @@ public class TestVertexImpl { } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testBadCommitter() throws Exception { VertexImpl v = vertices.get("vertex2"); @@ -3348,7 +3328,6 @@ public class TestVertexImpl { Assert.assertEquals(1, committer.setupCounter); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testBadCommitter2() throws Exception { VertexImpl v = vertices.get("vertex2"); @@ -3391,8 +3370,7 @@ public class TestVertexImpl { Assert.assertEquals(1, committer.initCounter); Assert.assertEquals(1, committer.setupCounter); } - - @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexInitWithCustomVertexManager() throws Exception { setupPreDagCreation(); @@ -3612,7 +3590,39 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.RUNNING, vertices.get("vertex4").getState()); } - @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testVertexVMErrorReport() throws Exception { + int numTasks = 5; + // create a diamond shaped dag with 1-1 edges. + setupPreDagCreation(); + dagPlan = createDAGPlanForOneToOneSplit(null, numTasks, false); + setupPostDagCreation(); + VertexImpl v1 = vertices.get("vertex1"); + initAllVertices(VertexState.INITED); + + // fudge vertex manager so that tasks dont start running + // it is not calling reconfigurtionPlanned() but will call reconfigureVertex(). + // the vertex is already fully configured. this causes exception and verify that + // its caught and reported. + VertexManagerPluginForTestConfig config = new VertexManagerPluginForTestConfig(); + config.setReconfigureOnStart(true); + v1.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create( + VertexManagerPluginForTest.class.getName()).setUserPayload( + UserPayload.create(config.getPayload())), UserGroupInformation.getCurrentUser(), v1, + appContext, mock(StateChangeNotifier.class)); + v1.vertexManager.initialize(); + + startVertex(v1, false); + dispatcher.await(); + + // failed due to exception + Assert.assertEquals(VertexState.FAILED, vertices.get("vertex1").getState()); + Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertices.get("vertex1") + .getTerminationCause()); + Assert.assertTrue(Joiner.on(":").join(vertices.get("vertex1").getDiagnostics()).contains( + "context.vertexReconfigurationPlanned() before re-configuring")); + } + @Test(timeout = 5000) public void testInvalidEvent() { VertexImpl v = vertices.get("vertex2"); @@ -3625,7 +3635,6 @@ public class TestVertexImpl { DAGEventType.INTERNAL_ERROR).intValue()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexWithInitializerFailure() throws Exception { useCustomInitializer = true; @@ -4125,7 +4134,6 @@ public class TestVertexImpl { initializer.stateUpdates.get(2).getVertexState()); } - @SuppressWarnings("unchecked") @Test(timeout = 10000) public void testInputInitializerEvents() throws Exception { useCustomInitializer = true; @@ -4202,7 +4210,6 @@ public class TestVertexImpl { Assert.assertEquals(0, initializerWrapper.getPendingEvents().get(v1.getName()).size()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) /** * Ref: TEZ-1494 @@ -4563,7 +4570,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITED, v1.getState()); } - @SuppressWarnings("unchecked") @Test(timeout = 500000) public void testVertexWithInitializerSuccess() throws Exception { useCustomInitializer = true; @@ -4637,8 +4643,7 @@ public class TestVertexImpl { Assert.assertEquals(1, inputSpecs.get(0).getPhysicalEdgeCount()); } } - - @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexWithInputDistributor() throws Exception { useCustomInitializer = true; @@ -4674,7 +4679,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.INITED, v2.getState()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexRootInputSpecUpdateAll() throws Exception { useCustomInitializer = true; @@ -4704,8 +4708,7 @@ public class TestVertexImpl { Assert.assertEquals(4, inputSpecs.get(0).getPhysicalEdgeCount()); } } - - @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexRootInputSpecUpdatePerTask() throws Exception { useCustomInitializer = true; @@ -4983,7 +4986,6 @@ public class TestVertexImpl { } } - @SuppressWarnings("unchecked") @Test(timeout=5000) public void testVertexGroupInput() throws AMUserCodeException { setupPreDagCreation(); @@ -5010,8 +5012,7 @@ public class TestVertexImpl { assertTrue(groupInSpec.get(0).getGroupVertices().contains("B")); groupInSpec.get(0).getMergedInputDescriptor().getClassName().equals("Group.class"); } - - @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testStartWithUninitializedCustomEdge() throws Exception { // Race when a source vertex manages to start before the target vertex has @@ -5060,8 +5061,7 @@ public class TestVertexImpl { Assert.assertNotNull(vB.getTask(0)); Assert.assertNotNull(vC.getTask(0)); } - - @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception { // Race when a source vertex manages to start before the target vertex has @@ -5136,7 +5136,6 @@ public class TestVertexImpl { Assert.assertNotNull(vC.getTask(0)); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testInitStartRace() throws AMUserCodeException { // Race when a source vertex manages to start before the target vertex has @@ -5160,7 +5159,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.RUNNING, vC.getState()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testInitStartRace2() throws AMUserCodeException { // Race when a source vertex manages to start before the target vertex has @@ -5188,7 +5186,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexState.RUNNING, vC.getState()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromVM_Initialize() throws AMUserCodeException { useCustomInitializer = true; @@ -5208,7 +5205,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, v1.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromVM_OnRootVertexInitialized() throws Exception { useCustomInitializer = true; @@ -5233,7 +5229,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, v1.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromVM_OnVertexStarted() throws Exception { useCustomInitializer = true; @@ -5261,7 +5256,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, v1.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromVM_OnSourceTaskCompleted() throws Exception { useCustomInitializer = true; @@ -5298,7 +5292,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, v2.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromVM_OnVertexManagerEventReceived() throws Exception { useCustomInitializer = true; @@ -5325,7 +5318,6 @@ public class TestVertexImpl { Assert.assertTrue(diagnostics.contains(VMExceptionLocation.OnVertexManagerEventReceived.name())); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws Exception { useCustomInitializer = true; @@ -5376,7 +5368,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromII_InitFailedAfterInitialized() throws Exception { useCustomInitializer = true; @@ -5401,7 +5392,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromII_InitFailedAfterRunning() throws Exception { useCustomInitializer = true; @@ -5427,7 +5417,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v1.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromII_HandleInputInitializerEvent() throws Exception { useCustomInitializer = true; @@ -5507,7 +5496,6 @@ public class TestVertexImpl { Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, v2.getTerminationCause()); } - @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testExceptionFromII_InitSucceededAfterInitFailure() throws AMUserCodeException, InterruptedException { useCustomInitializer = true; http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java index 422d785..84e060b 100644 --- a/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java +++ b/tez-dag/src/test/java/org/apache/tez/test/VertexManagerPluginForTest.java @@ -18,26 +18,92 @@ package org.apache.tez.test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.VertexManagerEvent; public class VertexManagerPluginForTest extends VertexManagerPlugin { + VertexManagerPluginForTestConfig pluginConfig = new VertexManagerPluginForTestConfig(); + + public static class VertexManagerPluginForTestConfig { + Configuration conf = new Configuration(false); + static final String RECONFIGURE_ON_START = "reconfigureOnStart"; + static final String NUM_TASKS = "numTasks"; + + public void setReconfigureOnStart(boolean value) { + conf.setBoolean(RECONFIGURE_ON_START, value); + } + + public void setNumTasks(int value) { + conf.setInt(NUM_TASKS, value); + } + + boolean getReconfigureOnStart() { + return conf.getBoolean(RECONFIGURE_ON_START, false); + } + + int getNumTasks() { + return conf.getInt(NUM_TASKS, 1); + } + + public ByteBuffer getPayload() { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(b); + try { + conf.write(out); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + return ByteBuffer.wrap(b.toByteArray()); + } + + void initialize(ByteBuffer buff) { + ByteBuffer copy = ByteBuffer.allocate(buff.capacity()); + copy.put(buff); + copy.flip(); + ByteArrayInputStream b = new ByteArrayInputStream(copy.array()); + DataInputStream in = new DataInputStream(b); + try { + conf.readFields(in); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + } + } public VertexManagerPluginForTest(VertexManagerPluginContext context) { super(context); } + @Override - public void initialize() {} + public void initialize() { + UserPayload payload = getContext().getUserPayload(); + if (payload != null && payload.getPayload() != null) { + pluginConfig.initialize(getContext().getUserPayload().getPayload()); + } + } @Override - public void onVertexStarted(Map<String, List<Integer>> completions) {} + public void onVertexStarted(Map<String, List<Integer>> completions) { + if (pluginConfig.getReconfigureOnStart()) { + getContext().reconfigureVertex(pluginConfig.getNumTasks(), null, null); + } + } @Override public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {} http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java index 8671161..30e3e81 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java @@ -30,7 +30,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; -import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.VertexManagerPlugin; @@ -124,12 +123,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin { // must change parallelism to make them the same LOG.info("Update parallelism of vertex: " + getContext().getVertexName() + " to " + oneToOneSrcTaskCount + " to match source 1-1 vertices."); - try { - getContext().reconfigureVertex(oneToOneSrcTaskCount, null, null); - } catch (TezException e) { - // TODO fail vertex - TEZ-2292 - LOG.warn("Failed to change parallelism in: " + getContext().getVertexName(), e); - } + getContext().reconfigureVertex(oneToOneSrcTaskCount, null, null); } oneToOneSrcTasksDoneCount = new int[oneToOneSrcTaskCount]; oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount]; http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 9be9986..2ea0299 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -39,7 +39,6 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; -import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexManagerPlugin; @@ -533,12 +532,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin { edgeProperties.put(vertex, newEdgeProp); } - try { - getContext().reconfigureVertex(finalTaskParallelism, null, edgeProperties); - } catch (TezException e) { - // TODO fail vertex - TEZ-2292 - LOG.warn("Failed to change parallelism in: " + getContext().getVertexName(), e); - } + getContext().reconfigureVertex(finalTaskParallelism, null, edgeProperties); + updatePendingTasks(); } return true; http://git-wip-us.apache.org/repos/asf/tez/blob/c6e400e2/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 27cd292..8807674 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -30,7 +30,6 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; -import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; @@ -66,9 +65,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@SuppressWarnings({ "unchecked", "rawtypes" }) public class TestShuffleVertexManager { - @SuppressWarnings({ "unchecked", "rawtypes" }) @Test(timeout = 5000) public void testShuffleVertexManagerAutoParallelism() throws Exception { Configuration conf = new Configuration(); @@ -452,7 +451,6 @@ public class TestShuffleVertexManager { } } - @SuppressWarnings({ "unchecked", "rawtypes" }) @Test(timeout = 5000) public void testShuffleVertexManagerSlowStart() { Configuration conf = new Configuration(); @@ -779,18 +777,13 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); //Ensure that setVertexParallelism is not called for R2. - try { - verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), - anyMap()); - // complete configuration of r1 triggers the scheduling - manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class), - anyMap()); - } catch (TezException e) { - e.printStackTrace(); - Assert.fail(); // should not happen - } - + verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), + anyMap()); + // complete configuration of r1 triggers the scheduling + manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); + verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class), + anyMap()); + Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3);
