Repository: tez
Updated Branches:
refs/heads/branch-0.7 33351895e -> ea23646de
TEZ-3224. User payload is not initialized before creating vertex manager
plugin. (Zhiyuan Yang via hitesh)
(cherry picked from commit 014d7c4ab4fca8d6119ea6ea993de3632b9f7609)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ea23646d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ea23646d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ea23646d
Branch: refs/heads/branch-0.7
Commit: ea23646de6de6656a437ffee3a855abb66093953
Parents: 3335189
Author: Hitesh Shah <[email protected]>
Authored: Fri Apr 22 15:32:06 2016 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Fri Apr 22 15:46:45 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/VertexManager.java | 4 +--
.../tez/dag/app/dag/impl/TestVertexManager.java | 34 ++++++++++++++++++++
3 files changed, 37 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ea23646d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d5f35d8..cc4e5ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-3224. User payload is not initialized before creating vertex manager
plugin.
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor
initialization relative to Inputs/Outputs
TEZ-3202. Reduce the memory need for jobs with high number of segments
TEZ-3188. Move tez.submit.hosts out of TezConfiguration to
TezConfigurationConstants.
http://git-wip-us.apache.org/repos/asf/tez/blob/ea23646d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index f9cce0e..8890513 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -387,13 +387,13 @@ public class VertexManager {
pluginContext = new VertexManagerPluginContextImpl();
Preconditions.checkArgument(pluginDesc != null);
+ payload = pluginDesc.getUserPayload();
+ pluginFailed = new AtomicBoolean(false);
plugin = ReflectionUtils.createClazzInstance(pluginDesc.getClassName(),
new Class[] { VertexManagerPluginContext.class }, new Object[] {
pluginContext });
- payload = pluginDesc.getUserPayload();
execService = appContext.getExecService();
eventQueue = new LinkedBlockingQueue<VertexManagerEvent>();
eventInFlight = new AtomicBoolean(false);
- pluginFailed = new AtomicBoolean(false);
}
public VertexManagerPlugin getPlugin() {
http://git-wip-us.apache.org/repos/asf/tez/blob/ea23646d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
index 9c16f5e..b93b298 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -19,6 +19,7 @@
package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
@@ -27,6 +28,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -38,6 +41,8 @@ import java.util.concurrent.Callable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
@@ -95,6 +100,35 @@ public class TestVertexManager {
requestCaptor =
ArgumentCaptor.forClass(VertexEventInputDataInformation.class);
}
+
+ public static class CheckUserPayloadVertexManagerPlugin extends
VertexManagerPlugin {
+ public CheckUserPayloadVertexManagerPlugin(VertexManagerPluginContext
context) {
+ super(context);
+ assertNotNull(context.getUserPayload());
+ }
+
+ @Override
+ public void initialize() throws Exception {}
+
+ @Override
+ public void onVertexManagerEventReceived(VertexManagerEvent vmEvent)
throws Exception {}
+
+ @Override
+ public void onRootVertexInitialized(String inputName, InputDescriptor
inputDescriptor,
+ List<Event> events) throws Exception {}
+ }
+
+ @Test(timeout = 5000)
+ public void testVertexManagerPluginCtorAccessUserPayload() throws
IOException, TezException {
+ byte[] randomUserPayload = {1,2,3};
+ UserPayload userPayload =
UserPayload.create(ByteBuffer.wrap(randomUserPayload));
+ VertexManager vm =
+ new VertexManager(
+
VertexManagerPluginDescriptor.create(CheckUserPayloadVertexManagerPlugin.class
+ .getName()).setUserPayload(userPayload),
UserGroupInformation.getCurrentUser(),
+ mockVertex, mockAppContext, mock(StateChangeNotifier.class));
+ }
+
@Test(timeout = 5000)
public void testOnRootVertexInitialized() throws Exception {