Repository: tez Updated Branches: refs/heads/branch-0.6 3875a0409 -> 6751a67f2
TEZ-2855. Fix a potential NPE while routing VertexManager events. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6751a67f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6751a67f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6751a67f Branch: refs/heads/branch-0.6 Commit: 6751a67f2051f44932c8fb5b80f0729e37dae7b1 Parents: 3875a04 Author: Siddharth Seth <[email protected]> Authored: Wed Sep 30 17:09:01 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Sep 30 17:09:01 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 24 ++- .../tez/dag/app/dag/impl/TestVertexImpl.java | 181 ++++++++++++++++++- 3 files changed, 202 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6751a67f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 768db48..4fdcb66 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2855. Fix a potential NPE while routing VertexManager events. TEZ-2716. DefaultSorter.isRleNeeded not thread safe. TEZ-2758. Remove append API in RecoveryService after TEZ-1909. TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. http://git-wip-us.apache.org/repos/asf/tez/blob/6751a67f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index ca91bb9..737b508 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -256,6 +256,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, @VisibleForTesting final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>(); + + @VisibleForTesting + final List<VertexManagerEvent> pendingVmEvents = new LinkedList<VertexManagerEvent>(); LegacySpeculator speculator; @@ -643,8 +646,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private final ProcessorDescriptor processorDescriptor; private boolean vertexToBeReconfiguredByManager = false; - AtomicBoolean vmIsInitialized = new AtomicBoolean(false); - AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false); + final AtomicBoolean vmIsInitialized = new AtomicBoolean(false); + final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false); @VisibleForTesting Map<Vertex, Edge> sourceVertices; @@ -2114,6 +2117,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, try { vertexManager.initialize(); vmIsInitialized.set(true); + if (!pendingVmEvents.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex: " + + logIdentifier); + } + for (VertexManagerEvent vmEvent : pendingVmEvents) { + vertexManager.onVertexManagerEventReceived(vmEvent); + } + pendingVmEvents.clear(); + } } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier; LOG.error(msg, e); @@ -3820,7 +3833,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, Preconditions.checkArgument(target != null, "Event sent to unkown vertex: " + vmEvent.getTargetVertexName()); if (target == vertex) { - vertex.vertexManager.onVertexManagerEventReceived(vmEvent); + if (!vertex.vmIsInitialized.get()) { + // The VM hasn't been setup yet, defer event consumption + vertex.pendingVmEvents.add(vmEvent); + } else { + vertex.vertexManager.onVertexManagerEventReceived(vmEvent); + } } else { checkEventSourceMetadata(vertex, sourceMeta); vertex.eventHandler.handle(new VertexEventRouteEvent(target http://git-wip-us.apache.org/repos/asf/tez/blob/6751a67f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 66a18b3..befcac2 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl; import java.nio.ByteBuffer; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -40,6 +41,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -406,7 +408,92 @@ public class TestVertexImpl { .build(); return dag; } - + + // Simple dag with a CountingVM on v3 (which has v1, v2 as inputs) + // v1, v2 -> v3 + private DAGPlan createDAGPlanWithCountingVM() { + LOG.info("Setting up dag plan with coutning VertexManager"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("dagWithCountingVM") + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .addOutEdgeId("e1") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex2") + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x2.y2") + .build() + ) + .addOutEdgeId("e2") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex3") + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x3.y3") + .build() + ) + .addInEdgeId("e1") + .addInEdgeId("e2") + .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() + .setClassName(InvocationCountingVertexManager.class.getName())) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")) + .setInputVertexName("vertex1") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2")) + .setOutputVertexName("vertex3") + .setDataMovementType(PlanEdgeDataMovementType.BROADCAST) + .setId("e1") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")) + .setInputVertexName("vertex2") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2")) + .setOutputVertexName("vertex3") + .setDataMovementType(PlanEdgeDataMovementType.BROADCAST) + .setId("e2") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .build(); + return dag; + } + /** * v1 -> v2 */ @@ -5218,6 +5305,64 @@ public class TestVertexImpl { @SuppressWarnings("unchecked") @Test(timeout = 5000) + public void testVMEventBeforeVertexInitialized() throws Exception { + useCustomInitializer = true; + setupPreDagCreation(); + dagPlan = createDAGPlanWithCountingVM(); + setupPostDagCreation(); + + VertexImpl v1 = vertices.get("vertex1"); + VertexImpl v2 = vertices.get("vertex2"); + VertexImpl v3 = vertices.get("vertex3"); + dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), + VertexEventType.V_INIT)); + dispatcher.await(); + assertEquals(VertexState.INITED, v1.getState()); + dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START)); + dispatcher.await(); + assertEquals(VertexState.RUNNING, v1.getState()); + + assertEquals(VertexState.NEW, v3.getState()); + // Generate a VM event for v1, targeted at v3 + VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0])); + TezEvent tezVmEvent = new TezEvent(vmEvent, + new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null, + TezTaskAttemptID.getInstance( + TezTaskID.getInstance(v1.getVertexId(), 1), 1))); + dispatcher.getEventHandler() + .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent))); + dispatcher.await(); + + assertEquals(1, v3.pendingVmEvents.size()); + assertEquals(0, InvocationCountingVertexManager.numVmEventsReceived.get()); + + // Initialize v2, which will trigger initialization of v3 + dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(), + VertexEventType.V_INIT)); + dispatcher.await(); + + assertEquals(VertexState.INITED, v3.getState()); + + // The VM event should have been processed. + assertEquals(0, v3.pendingVmEvents.size()); + assertEquals(1, InvocationCountingVertexManager.numVmEventsReceived.get()); + + // Send another VM event - make sure it's processed without additional events. + vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0])); + tezVmEvent = new TezEvent(vmEvent, + new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null, + TezTaskAttemptID.getInstance( + TezTaskID.getInstance(v1.getVertexId(), 1), 2))); + dispatcher.getEventHandler() + .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent))); + dispatcher.await(); + + assertEquals(0, v3.pendingVmEvents.size()); + assertEquals(2, InvocationCountingVertexManager.numVmEventsReceived.get()); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) public void testExceptionFromVM_Initialize() throws AMUserCodeException { useCustomInitializer = true; setupPreDagCreation(); @@ -5693,6 +5838,40 @@ public class TestVertexImpl { } } + public static class InvocationCountingVertexManager extends VertexManagerPlugin { + + static final AtomicInteger numVmEventsReceived = new AtomicInteger(0); + static final AtomicInteger numInitializedInputs = new AtomicInteger(0); + + public InvocationCountingVertexManager(VertexManagerPluginContext context) { + super(context); + } + + @Override + public void initialize() throws Exception { + } + + @Override + public void onVertexStarted(Map<String, List<Integer>> completions) throws Exception { + } + + @Override + public void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception { + + } + + @Override + public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception { + numVmEventsReceived.incrementAndGet(); + } + + @Override + public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, + List<Event> events) throws Exception { + numInitializedInputs.incrementAndGet(); + } + } + @InterfaceAudience.Private public static class VertexManagerWithException extends RootInputVertexManager{
