Repository: tez Updated Branches: refs/heads/branch-0.7 98a72e5aa -> a7b7ed7fe
TEZ-2855. Fix a potential NPE while routing VertexManager events. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a7b7ed7f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a7b7ed7f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a7b7ed7f Branch: refs/heads/branch-0.7 Commit: a7b7ed7fe2f125d231458da7bcc1de6bed3efd61 Parents: 98a72e5 Author: Siddharth Seth <[email protected]> Authored: Wed Sep 30 17:08:15 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Sep 30 17:08:15 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 24 ++- .../tez/dag/app/dag/impl/TestVertexImpl.java | 180 ++++++++++++++++++- 3 files changed, 202 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a7b7ed7f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c55f4bf..614f67c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2855. Fix a potential NPE while routing VertexManager events. TEZ-2758. Remove append API in RecoveryService after TEZ-1909. TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. TEZ-2858. Stop using System.currentTimeMillis in TestInputReadyTracker. @@ -283,6 +284,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2855. Fix a potential NPE while routing VertexManager events. TEZ-2716. DefaultSorter.isRleNeeded not thread safe TEZ-2758. Remove append API in RecoveryService after TEZ-1909. TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. http://git-wip-us.apache.org/repos/asf/tez/blob/a7b7ed7f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 4a8309e..2f81a50 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -277,6 +277,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl @VisibleForTesting final List<TezEvent> pendingInitializerEvents = new LinkedList<TezEvent>(); + + @VisibleForTesting + final List<VertexManagerEvent> pendingVmEvents = new LinkedList<VertexManagerEvent>(); LegacySpeculator speculator; @@ -710,8 +713,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private final ProcessorDescriptor processorDescriptor; private boolean vertexToBeReconfiguredByManager = false; - AtomicBoolean vmIsInitialized = new AtomicBoolean(false); - AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false); + final AtomicBoolean vmIsInitialized = new AtomicBoolean(false); + final AtomicBoolean completelyConfiguredSent = new AtomicBoolean(false); @VisibleForTesting Map<Vertex, Edge> sourceVertices; @@ -2429,6 +2432,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl try { vertexManager.initialize(); vmIsInitialized.set(true); + if (!pendingVmEvents.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex: " + + logIdentifier); + } + for (VertexManagerEvent vmEvent : pendingVmEvents) { + vertexManager.onVertexManagerEventReceived(vmEvent); + } + pendingVmEvents.clear(); + } } catch (AMUserCodeException e) { String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier; LOG.error(msg, e); @@ -4356,7 +4369,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl Preconditions.checkArgument(target != null, "Event sent to unkown vertex: " + vmEvent.getTargetVertexName()); if (target == this) { - vertexManager.onVertexManagerEventReceived(vmEvent); + if (!vmIsInitialized.get()) { + // The VM hasn't been setup yet, defer event consumption + pendingVmEvents.add(vmEvent); + } else { + vertexManager.onVertexManagerEventReceived(vmEvent); + } } else { checkEventSourceMetadata(this, sourceMeta); eventHandler.handle(new VertexEventRouteEvent(target http://git-wip-us.apache.org/repos/asf/tez/blob/a7b7ed7f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index f65ecab..e6387b3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.app.dag.impl; import java.nio.ByteBuffer; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; @@ -41,6 +42,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -423,7 +425,92 @@ public class TestVertexImpl { .build(); return dag; } - + + // Simple dag with a CountingVM on v3 (which has v1, v2 as inputs) + // v1, v2 -> v3 + private DAGPlan createDAGPlanWithCountingVM() { + LOG.info("Setting up dag plan with coutning VertexManager"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("dagWithCountingVM") + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .addOutEdgeId("e1") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex2") + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x2.y2") + .build() + ) + .addOutEdgeId("e2") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex3") + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x3.y3") + .build() + ) + .addInEdgeId("e1") + .addInEdgeId("e2") + .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() + .setClassName(InvocationCountingVertexManager.class.getName())) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")) + .setInputVertexName("vertex1") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2")) + .setOutputVertexName("vertex3") + .setDataMovementType(PlanEdgeDataMovementType.BROADCAST) + .setId("e1") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")) + .setInputVertexName("vertex2") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2")) + .setOutputVertexName("vertex3") + .setDataMovementType(PlanEdgeDataMovementType.BROADCAST) + .setId("e2") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .build(); + return dag; + } + /** * v1 -> v2 */ @@ -5607,6 +5694,63 @@ public class TestVertexImpl { } @Test(timeout = 5000) + public void testVMEventBeforeVertexInitialized() throws Exception { + useCustomInitializer = true; + setupPreDagCreation(); + dagPlan = createDAGPlanWithCountingVM(); + setupPostDagCreation(); + + VertexImpl v1 = vertices.get("vertex1"); + VertexImpl v2 = vertices.get("vertex2"); + VertexImpl v3 = vertices.get("vertex3"); + dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), + VertexEventType.V_INIT)); + dispatcher.await(); + assertEquals(VertexState.INITED, v1.getState()); + dispatcher.getEventHandler().handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_START)); + dispatcher.await(); + assertEquals(VertexState.RUNNING, v1.getState()); + + assertEquals(VertexState.NEW, v3.getState()); + // Generate a VM event for v1, targeted at v3 + VertexManagerEvent vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0])); + TezEvent tezVmEvent = new TezEvent(vmEvent, + new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null, + TezTaskAttemptID.getInstance( + TezTaskID.getInstance(v1.getVertexId(), 1), 1))); + dispatcher.getEventHandler() + .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent))); + dispatcher.await(); + + assertEquals(1, v3.pendingVmEvents.size()); + assertEquals(0, InvocationCountingVertexManager.numVmEventsReceived.get()); + + // Initialize v2, which will trigger initialization of v3 + dispatcher.getEventHandler().handle(new VertexEvent(v2.getVertexId(), + VertexEventType.V_INIT)); + dispatcher.await(); + + assertEquals(VertexState.INITED, v3.getState()); + + // The VM event should have been processed. + assertEquals(0, v3.pendingVmEvents.size()); + assertEquals(1, InvocationCountingVertexManager.numVmEventsReceived.get()); + + // Send another VM event - make sure it's processed without additional events. + vmEvent = VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0])); + tezVmEvent = new TezEvent(vmEvent, + new EventMetaData(EventProducerConsumerType.OUTPUT, "vertex1", null, + TezTaskAttemptID.getInstance( + TezTaskID.getInstance(v1.getVertexId(), 1), 2))); + dispatcher.getEventHandler() + .handle(new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent))); + dispatcher.await(); + + assertEquals(0, v3.pendingVmEvents.size()); + assertEquals(2, InvocationCountingVertexManager.numVmEventsReceived.get()); + } + + @Test(timeout = 5000) public void testExceptionFromVM_Initialize() throws TezException { useCustomInitializer = true; setupPreDagCreation(); @@ -6075,6 +6219,40 @@ public class TestVertexImpl { } } + public static class InvocationCountingVertexManager extends VertexManagerPlugin { + + static final AtomicInteger numVmEventsReceived = new AtomicInteger(0); + static final AtomicInteger numInitializedInputs = new AtomicInteger(0); + + public InvocationCountingVertexManager(VertexManagerPluginContext context) { + super(context); + } + + @Override + public void initialize() throws Exception { + } + + @Override + public void onVertexStarted(Map<String, List<Integer>> completions) throws Exception { + } + + @Override + public void onSourceTaskCompleted(String srcVertexName, Integer taskId) throws Exception { + + } + + @Override + public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception { + numVmEventsReceived.incrementAndGet(); + } + + @Override + public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, + List<Event> events) throws Exception { + numInitializedInputs.incrementAndGet(); + } + } + @InterfaceAudience.Private public static class VertexManagerWithException extends RootInputVertexManager{
