TEZ-3224. User payload is not initialized before creating vertex manager plugin. (Zhiyuan Yang via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/014d7c4a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/014d7c4a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/014d7c4a

Branch: refs/heads/master
Commit: 014d7c4ab4fca8d6119ea6ea993de3632b9f7609
Parents: 8c59eb8
Author: Hitesh Shah <[email protected]>
Authored: Fri Apr 22 15:32:06 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Fri Apr 22 15:32:06 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/VertexManager.java     |  4 +--
 .../tez/dag/app/dag/impl/TestVertexManager.java | 34 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/014d7c4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 049f90c..3ef01ba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.4: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3224. User payload is not initialized before creating vertex manager plugin.
   TEZ-3226. Tez UI 2: All DAGs UX improvements.
   TEZ-3077. TezClient.waitTillReady should support timeout.
   TEZ-3202. Reduce the memory need for jobs with high number of segments

http://git-wip-us.apache.org/repos/asf/tez/blob/014d7c4a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 388d3c7..45f72bd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -384,13 +384,13 @@ public class VertexManager {
 
     pluginContext = new VertexManagerPluginContextImpl();
     Preconditions.checkArgument(pluginDesc != null);
+    payload = pluginDesc.getUserPayload();
+    pluginFailed = new AtomicBoolean(false);
     plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
         new Class[] { VertexManagerPluginContext.class }, new Object[] { pluginContext });
-    payload = pluginDesc.getUserPayload();
     execService = appContext.getExecService();
     eventQueue = new LinkedBlockingQueue<VertexManagerEvent>();
     eventInFlight = new AtomicBoolean(false);
-    pluginFailed = new AtomicBoolean(false);
   }
 
   public VertexManagerPlugin getPlugin() {

http://git-wip-us.apache.org/repos/asf/tez/blob/014d7c4a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index 9c16f5e..b93b298 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
@@ -27,6 +28,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -38,6 +41,8 @@ import java.util.concurrent.Callable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
@@ -95,6 +100,35 @@ public class TestVertexManager {
     requestCaptor = ArgumentCaptor.forClass(VertexEventInputDataInformation.class);
   }
+
+  public static class CheckUserPayloadVertexManagerPlugin extends VertexManagerPlugin {
+    public CheckUserPayloadVertexManagerPlugin(VertexManagerPluginContext context) {
+      super(context);
+      assertNotNull(context.getUserPayload());
+    }
+
+    @Override
+    public void initialize() throws Exception {}
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {}
+
+    @Override
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+        List<Event> events) throws Exception {}
+  }
+
+  @Test(timeout = 5000)
+  public void testVertexManagerPluginCtorAccessUserPayload() throws IOException, TezException {
+    byte[] randomUserPayload = {1,2,3};
+    UserPayload userPayload = UserPayload.create(ByteBuffer.wrap(randomUserPayload));
+    VertexManager vm =
+        new VertexManager(
+            VertexManagerPluginDescriptor.create(CheckUserPayloadVertexManagerPlugin.class
+                .getName()).setUserPayload(userPayload), UserGroupInformation.getCurrentUser(),
+            mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+  }
+
   @Test(timeout = 5000)
   public void testOnRootVertexInitialized() throws Exception {
