Repository: tez Updated Branches: refs/heads/TEZ-2003 85cbf1695 -> ded95e59e (forced update)
TEZ-2125. Create a task communicator for local mode. Allow tasks to run in the AM. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/899a310b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/899a310b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/899a310b Branch: refs/heads/TEZ-2003 Commit: 899a310b61123c7fa6757935eaf63733d04d7699 Parents: 699634f Author: Siddharth Seth <[email protected]> Authored: Fri Feb 20 16:12:52 2015 -0800 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 14 13:46:42 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 25 ++++--- .../dag/app/TaskAttemptListenerImpTezDag.java | 18 +++-- .../dag/app/TezLocalTaskCommunicatorImpl.java | 46 +++++++++++++ .../tez/dag/app/TezTaskCommunicatorImpl.java | 71 ++++++++------------ .../app/launcher/ContainerLauncherRouter.java | 17 +++-- .../app/launcher/LocalContainerLauncher.java | 31 ++++++--- .../dag/app/rm/TaskSchedulerEventHandler.java | 2 + .../apache/tez/dag/app/MockDAGAppMaster.java | 3 +- .../app/TestTaskAttemptListenerImplTezDag.java | 2 +- .../tez/service/impl/ContainerRunnerImpl.java | 2 +- .../tez/tests/TestExternalTezServices.java | 57 +++++++++++++--- .../org/apache/tez/runtime/task/TezChild.java | 34 +++++----- 13 files changed, 206 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 1a2264c..76496c9 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -5,5 +5,6 @@ ALL CHANGES: TEZ-2117. Add a manager for ContainerLaunchers running in the AM. TEZ-2122. Setup pluggable components at AM/Vertex level. TEZ-2123. Fix component managers to use pluggable components. (Enable hybrid mode) + TEZ-2125. Create a task communicator for local mode. Allow tasks to run in the AM. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index a304b37..43f8794 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -469,7 +469,7 @@ public class DAGAppMaster extends AbstractService { //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context, - taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers); + taskHeartbeatHandler, containerHeartbeatHandler, taskCommunicatorClassIdentifiers, isLocal); addIfService(taskAttemptListener, true); containerSignatureMatcher = createContainerSignatureMatcher(); @@ -535,7 +535,7 @@ public class DAGAppMaster extends AbstractService { taskSchedulerEventHandler); addIfServiceDependency(taskSchedulerEventHandler, clientRpcServer); - this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers); + this.containerLauncherRouter = createContainerLauncherRouter(conf, containerLauncherClassIdentifiers, isLocal); addIfService(containerLauncherRouter, true); dispatcher.register(NMCommunicatorEventType.class, containerLauncherRouter); @@ -1043,9 +1043,13 @@ public class DAGAppMaster extends AbstractService { } protected TaskAttemptListener createTaskAttemptListener(AppContext context, - TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, String[] taskCommunicatorClasses) { + TaskHeartbeatHandler thh, + ContainerHeartbeatHandler chh, + String[] taskCommunicatorClasses, + boolean isLocal) { TaskAttemptListener lis = - new TaskAttemptListenerImpTezDag(context, thh, chh,jobTokenSecretManager, taskCommunicatorClasses); + new TaskAttemptListenerImpTezDag(context, thh, chh, jobTokenSecretManager, + taskCommunicatorClasses, isLocal); return lis; } @@ -1066,10 +1070,12 @@ public class DAGAppMaster extends AbstractService { return chh; } - protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, String []containerLauncherClasses) throws + protected ContainerLauncherRouter createContainerLauncherRouter(Configuration conf, + String[] containerLauncherClasses, + boolean isLocal) throws UnknownHostException { - return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, containerLauncherClasses); - + return new ContainerLauncherRouter(conf, context, taskAttemptListener, workingDirectory, + containerLauncherClasses, isLocal); } public ApplicationId getAppID() { @@ -2389,9 +2395,8 @@ public class DAGAppMaster extends AbstractService { StringBuilder sb = new StringBuilder(); sb.append("AM Level configured ").append(component).append(": "); for (int i = 0; i < classIdentifiers.length; i++) { - sb.append("[").append(i).append(":").append(map.inverse().get(i)).append(":") - .append(taskSchedulers.inverse().get(i)).append( - "]"); + sb.append("[").append(i).append(":").append(map.inverse().get(i)) + .append(":").append(classIdentifiers[i]).append("]"); if (i != classIdentifiers.length - 1) { sb.append(","); } http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 05c4623..c48601c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -98,13 +98,20 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager. JobTokenSecretManager jobTokenSecretManager, - String [] taskCommunicatorClassIdentifiers) { + String [] taskCommunicatorClassIdentifiers, + boolean isPureLocalMode) { super(TaskAttemptListenerImpTezDag.class.getName()); this.context = context; this.taskHeartbeatHandler = thh; this.containerHeartbeatHandler = chh; if (taskCommunicatorClassIdentifiers == null || taskCommunicatorClassIdentifiers.length == 0) { - taskCommunicatorClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + if (isPureLocalMode) { + taskCommunicatorClassIdentifiers = + new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT}; + } else { + taskCommunicatorClassIdentifiers = + new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + } } this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length]; for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) { @@ -130,11 +137,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements } private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier) { - if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT) || - taskCommClassIdentifier - .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { + if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Using Default Task Communicator"); return new TezTaskCommunicatorImpl(this); + } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { + LOG.info("Using Default Local Task Communicator"); + return new TezLocalTaskCommunicatorImpl(this); } else { LOG.info("Using TaskCommunicator: " + taskCommClassIdentifier); Class<? extends TaskCommunicator> taskCommClazz = (Class<? extends TaskCommunicator>) ReflectionUtils http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java new file mode 100644 index 0000000..3704cc4 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.api.TezUncheckedException; + +public class TezLocalTaskCommunicatorImpl extends TezTaskCommunicatorImpl { + + private static final Log LOG = LogFactory.getLog(TezLocalTaskCommunicatorImpl.class); + + public TezLocalTaskCommunicatorImpl( + TaskCommunicatorContext taskCommunicatorContext) { + super(taskCommunicatorContext); + } + + @Override + protected void startRpcServer() { + try { + this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0); + } catch (UnknownHostException e) { + throw new TezUncheckedException(e); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Not starting TaskAttemptListener RPC in LocalMode"); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 77d2e39..ac8da40 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -15,10 +15,8 @@ package org.apache.tez.dag.app; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -76,7 +74,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { private final TezTaskUmbilicalProtocol taskUmbilical; private final String tokenIdentifier; private final Token<JobTokenIdentifier> sessionToken; - private InetSocketAddress address; + protected InetSocketAddress address; private Server server; public static final class ContainerInfo { @@ -120,10 +118,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials()); } - @Override public void serviceStart() { - startRpcServer(); } @@ -134,45 +130,34 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { protected void startRpcServer() { Configuration conf = getConfig(); - if (!conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT)) { - try { - JobTokenSecretManager jobTokenSecretManager = - new JobTokenSecretManager(); - jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken); - - server = new RPC.Builder(conf) - .setProtocol(TezTaskUmbilicalProtocol.class) - .setBindAddress("0.0.0.0") - .setPort(0) - .setInstance(taskUmbilical) - .setNumHandlers( - conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, - TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) - .setPortRangeConfig(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE) - .setSecretManager(jobTokenSecretManager).build(); - - // Enable service authorization? - if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, - false)) { - refreshServiceAcls(conf, new TezAMPolicyProvider()); - } - - server.start(); - this.address = NetUtils.getConnectAddress(server); - LOG.info("Instantiated TezTaskCommunicator RPC at " + this.address); - } catch (IOException e) { - throw new TezUncheckedException(e); - } - } else { - try { - this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0); - } catch (UnknownHostException e) { - throw new TezUncheckedException(e); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Not starting TaskAttemptListener RPC in LocalMode"); + try { + JobTokenSecretManager jobTokenSecretManager = + new JobTokenSecretManager(); + jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken); + + server = new RPC.Builder(conf) + .setProtocol(TezTaskUmbilicalProtocol.class) + .setBindAddress("0.0.0.0") + .setPort(0) + .setInstance(taskUmbilical) + .setNumHandlers( + conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT, + TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT)) + .setPortRangeConfig(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE) + .setSecretManager(jobTokenSecretManager).build(); + + // Enable service authorization? + if (conf.getBoolean( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + false)) { + refreshServiceAcls(conf, new TezAMPolicyProvider()); } + + server.start(); + this.address = NetUtils.getConnectAddress(server); + LOG.info("Instantiated TezTaskCommunicator RPC at " + this.address); + } catch (IOException e) { + throw new TezUncheckedException(e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java index 4f9b5bf..70b0cbc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java @@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.app.AppContext; @@ -49,17 +48,24 @@ public class ContainerLauncherRouter extends AbstractService public ContainerLauncherRouter(Configuration conf, AppContext context, TaskAttemptListener taskAttemptListener, String workingDirectory, - String[] containerLauncherClassIdentifiers) throws UnknownHostException { + String[] containerLauncherClassIdentifiers, + boolean isPureLocalMode) throws UnknownHostException { super(ContainerLauncherRouter.class.getName()); if (containerLauncherClassIdentifiers == null || containerLauncherClassIdentifiers.length == 0) { - containerLauncherClassIdentifiers = new String[] {TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + if (isPureLocalMode) { + containerLauncherClassIdentifiers = + new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT}; + } else { + containerLauncherClassIdentifiers = + new String[]{TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT}; + } } containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length]; for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) { containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context, - taskAttemptListener, workingDirectory, conf); + taskAttemptListener, workingDirectory, isPureLocalMode, conf); } } @@ -67,6 +73,7 @@ public class ContainerLauncherRouter extends AbstractService AppContext context, TaskAttemptListener taskAttemptListener, String workingDirectory, + boolean isPureLocalMode, Configuration conf) throws UnknownHostException { if (containerLauncherClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { @@ -76,7 +83,7 @@ public class ContainerLauncherRouter extends AbstractService .equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Creating LocalContainerLauncher"); return - new LocalContainerLauncher(context, taskAttemptListener, workingDirectory); + new LocalContainerLauncher(context, taskAttemptListener, workingDirectory, isPureLocalMode); } else { LOG.info("Creating container launcher : " + containerLauncherClassIdentifier); Class<? extends ContainerLauncher> containerLauncherClazz = http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index 3c27678..7dbf937 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -36,6 +36,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -90,9 +91,10 @@ public class LocalContainerLauncher extends AbstractService implements private final AtomicBoolean serviceStopped = new AtomicBoolean(false); private final String workingDirectory; private final TaskAttemptListener tal; - private final Map<String, String> localEnv = new HashMap<String, String>(); + private final Map<String, String> localEnv; private final ExecutionContext executionContext; private int numExecutors; + private final boolean isPureLocalMode; private final ConcurrentHashMap<ContainerId, RunningTaskCallback> runningContainers = @@ -112,16 +114,26 @@ public class LocalContainerLauncher extends AbstractService implements public LocalContainerLauncher(AppContext context, TaskAttemptListener taskAttemptListener, - String workingDirectory) throws UnknownHostException { + String workingDirectory, + boolean isPureLocalMode) throws UnknownHostException { super(LocalContainerLauncher.class.getName()); this.context = context; this.tal = taskAttemptListener; this.workingDirectory = workingDirectory; - AuxiliaryServiceHelper.setServiceDataIntoEnv( - ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv); - executionContext = new ExecutionContextImpl(InetAddress.getLocalHost().getHostName()); - // User cannot be set here since it isn't available till a DAG is running. + this.isPureLocalMode = isPureLocalMode; + if (isPureLocalMode) { + localEnv = Maps.newHashMap(); + AuxiliaryServiceHelper.setServiceDataIntoEnv( + ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv); + } else { + localEnv = System.getenv(); + } + + // Check if the hostname is set in the environment before overriding it. + String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() : + System.getenv(Environment.NM_HOST.name()); + executionContext = new ExecutionContextImpl(host); } @Override @@ -349,7 +361,9 @@ public class LocalContainerLauncher extends AbstractService implements InterruptedException, TezException, IOException { Map<String, String> containerEnv = new HashMap<String, String>(); containerEnv.putAll(localEnv); - containerEnv.put(Environment.USER.name(), context.getUser()); + // Use the user from env if it's available. + String user = isPureLocalMode ? System.getenv(Environment.USER.name()) : context.getUser(); + containerEnv.put(Environment.USER.name(), user); long memAvailable; synchronized (this) { // needed to fix findbugs Inconsistent synchronization warning @@ -358,8 +372,7 @@ public class LocalContainerLauncher extends AbstractService implements TezChild tezChild = TezChild.newTezChild(defaultConf, null, 0, containerId.toString(), tokenIdentifier, attemptNumber, localDirs, workingDirectory, containerEnv, "", executionContext, credentials, - memAvailable, context.getUser()); - tezChild.setUmbilical(tezTaskUmbilicalProtocol); + memAvailable, context.getUser(), tezTaskUmbilicalProtocol); return tezChild; } http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index ae83730..ba46a67 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -434,6 +434,8 @@ public class TaskSchedulerEventHandler extends AbstractService } else { customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT); } + LOG.info("ClusterIdentifier for TaskScheduler [" + i + ":" + taskSchedulerClasses[i] + "]=" + + customAppIdIdentifier); taskSchedulers[i] = createTaskScheduler(host, port, trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier); } http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index b70b9ea..7f45ee6 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -516,7 +516,8 @@ public class MockDAGAppMaster extends DAGAppMaster { // use mock container launcher for tests @Override protected ContainerLauncherRouter createContainerLauncherRouter(final Configuration conf, - String[] containerLaunchers) + String[] containerLaunchers, + boolean isLocal) throws UnknownHostException { return new ContainerLauncherRouter(containerLauncher); } http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java index 200e737..1572c8b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java @@ -125,7 +125,7 @@ public class TestTaskAttemptListenerImplTezDag { doReturn(container).when(amContainer).getContainer(); taskAttemptListener = new TaskAttemptListenerImplForTest(appContext, - mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null); + mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false); TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(); TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java index 4a6ce33..25d6030 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java @@ -282,7 +282,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun request.getContainerIdString(), request.getTokenIdentifier(), request.getAppAttemptNumber(), workingDir, localDirs, envMap, objectRegistry, pid, - executionContext, credentials, memoryAvailable, request.getUser()); + executionContext, credentials, memoryAvailable, request.getUser(), null); ContainerExecutionResult result = tezChild.run(); LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + sw.stop().elapsedMillis()); http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index 9c149c6..01c2080 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -40,6 +40,7 @@ import org.apache.tez.service.MiniTezTestServiceCluster; import org.apache.tez.test.MiniTezCluster; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; public class TestExternalTezServices { @@ -120,26 +121,23 @@ public class TestExternalTezServices { confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS, TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, -// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName()); confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS, TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, -// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName()); confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS, TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT, -// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, + TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT, EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName()); // Default all jobs to run via the service. Individual tests override this on a per vertex/dag level. - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); - confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, - TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME); + confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME); // Setup various executor sets PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, @@ -232,18 +230,55 @@ public class TestExternalTezServices { @Test(timeout = 60000) public void testMixed1() throws Exception { // M-ExtService, R-containers - int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 3 for num reducers. + int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers. runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS); } @Test(timeout = 60000) public void testMixed2() throws Exception { // M-Containers, R-ExtService - int expectedExternalSubmissions = 0 + 3; //4 for 4 src files, 3 for num reducers. + int expectedExternalSubmissions = 0 + 3; // 3 for num reducers. runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH); } + @Test(timeout = 60000) + public void testMixed3() throws Exception { // M - service, R-AM + int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for num reducers (in-AM). + runJoinValidate("Mixed3", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH, + PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM); + } + + @Test(timeout = 60000) + public void testMixed4() throws Exception { // M - containers, R-AM + int expectedExternalSubmissions = 0 + 0; // Nothing in external service. + runJoinValidate("Mixed4", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, + PROPS_REGULAR_CONTAINERS, PROPS_IN_AM); + } + + @Test(timeout = 60000) + public void testMixed5() throws Exception { // M1 - containers, M2-extservice, R-AM + int expectedExternalSubmissions = 2 + 0; // 2 for M2 + runJoinValidate("Mixed5", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS, + PROPS_EXT_SERVICE_PUSH, PROPS_IN_AM); + } + + + @Ignore // Re-activate this after the AM registers the shuffle token with the launcher. + @Test(timeout = 60000) + public void testMixed6() throws Exception { // M - AM, R - Service + int expectedExternalSubmissions = 0 + 3; // 3 for R in service + runJoinValidate("Mixed6", expectedExternalSubmissions, PROPS_IN_AM, + PROPS_IN_AM, PROPS_EXT_SERVICE_PUSH); + } + + @Test(timeout = 60000) + public void testMixed7() throws Exception { // M - AM, R - Containers + int expectedExternalSubmissions = 0; // Nothing in ext service + runJoinValidate("Mixed7", expectedExternalSubmissions, PROPS_IN_AM, + PROPS_IN_AM, PROPS_REGULAR_CONTAINERS); + } + private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps, Map<String, String> rhsProps, http://git-wip-us.apache.org/repos/asf/tez/blob/899a310b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index e6ef5e2..32da8fb 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -67,6 +67,7 @@ import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.runtime.api.impl.TezUmbilical; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +97,6 @@ public class TezChild { private final int amHeartbeatInterval; private final long sendCounterInterval; private final int maxEventsToGet; - private final boolean isLocal; private final String workingDir; private final ListeningExecutorService executor; @@ -111,9 +111,10 @@ public class TezChild { private final String user; private Multimap<String, String> startedInputsMap = HashMultimap.create(); + private final boolean ownUmbilical; + private final TezTaskUmbilicalProtocol umbilical; private TaskReporter taskReporter; - private TezTaskUmbilicalProtocol umbilical; private int taskCount = 0; private TezVertexID lastVertexID; @@ -122,7 +123,7 @@ public class TezChild { Map<String, String> serviceProviderEnvMap, ObjectRegistryImpl objectRegistry, String pid, ExecutionContext executionContext, - Credentials credentials, long memAvailable, String user) + Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical) throws IOException, InterruptedException { this.defaultConf = conf; this.containerIdString = containerIdentifier; @@ -136,6 +137,8 @@ public class TezChild { this.memAvailable = memAvailable; this.user = user; + LOG.info("TezChild created with umbilical: " + umbilical); + getTaskMaxSleepTime = defaultConf.getInt( TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX, TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT); @@ -164,25 +167,27 @@ public class TezChild { } } - this.isLocal = defaultConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, - TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier); Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, TezCommonUtils.convertJobTokenToBytes(jobToken)); - if (!isLocal) { + if (umbilical == null) { final InetSocketAddress address = NetUtils.createSocketAddrForHost(host, port); SecurityUtil.setTokenService(jobToken, address); taskOwner.addToken(jobToken); - umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { + this.umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() { @Override public TezTaskUmbilicalProtocol run() throws Exception { return RPC.getProxy(TezTaskUmbilicalProtocol.class, TezTaskUmbilicalProtocol.versionID, address, defaultConf); } }); + ownUmbilical = true; + } else { + this.umbilical = umbilical; + ownUmbilical = false; } } @@ -368,7 +373,7 @@ public class TezChild { if (taskReporter != null) { taskReporter.shutdown(); } - if (!isLocal) { + if (ownUmbilical) { RPC.stopProxy(umbilical); // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible. // LogManager.shutdown(); @@ -376,12 +381,6 @@ public class TezChild { } } - public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol){ - if(tezTaskUmbilicalProtocol != null){ - this.umbilical = tezTaskUmbilicalProtocol; - } - } - public static class ContainerExecutionResult { public static enum ExitStatus { SUCCESS(0), @@ -436,7 +435,8 @@ public class TezChild { public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier, String tokenIdentifier, int attemptNumber, String[] localDirs, String workingDirectory, Map<String, String> serviceProviderEnvMap, @Nullable String pid, - ExecutionContext executionContext, Credentials credentials, long memAvailable, String user) + ExecutionContext executionContext, Credentials credentials, long memAvailable, String user, + TezTaskUmbilicalProtocol tezUmbilical) throws IOException, InterruptedException, TezException { // Pull in configuration specified for the session. @@ -449,7 +449,7 @@ public class TezChild { return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier, attemptNumber, workingDirectory, localDirs, serviceProviderEnvMap, objectRegistry, pid, - executionContext, credentials, memAvailable, user); + executionContext, credentials, memAvailable, user, tezUmbilical); } public static void main(String[] args) throws IOException, InterruptedException, TezException { @@ -483,7 +483,7 @@ public class TezChild { tokenIdentifier, attemptNumber, localDirs, System.getenv(Environment.PWD.name()), System.getenv(), pid, new ExecutionContextImpl(System.getenv(Environment.NM_HOST.name())), credentials, Runtime.getRuntime().maxMemory(), System - .getenv(ApplicationConstants.Environment.USER.toString())); + .getenv(ApplicationConstants.Environment.USER.toString()), null); tezChild.run(); }
