http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index f688b57..e4612b6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -32,9 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.Utils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; @@ -112,7 +115,7 @@ public class TaskSchedulerManager extends AbstractService implements new AtomicBoolean(false); private final WebUIService webUI; private final NamedEntityDescriptor[] taskSchedulerDescriptors; - protected final TaskScheduler[]taskSchedulers; + protected final TaskSchedulerWrapper[] taskSchedulers; protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers; // Single executor service shared by all Schedulers for context callbacks @@ -134,6 +137,29 @@ public class TaskSchedulerManager extends AbstractService implements // Not tracking container / task to schedulerId. Instead relying on everything flowing through // the system and being propagated back via events. + @VisibleForTesting + @InterfaceAudience.Private + /** + * For Testing only + */ + public TaskSchedulerManager(TaskScheduler taskScheduler, AppContext appContext, + ContainerSignatureMatcher containerSignatureMatcher, + DAGClientServer clientService, ExecutorService appCallbackExecutor) { + super(TaskSchedulerManager.class.getName()); + this.appContext = appContext; + this.containerSignatureMatcher = containerSignatureMatcher; + this.clientService = clientService; + this.eventHandler = appContext.getEventHandler(); + this.appCallbackExecutor = appCallbackExecutor; + this.taskSchedulers = new TaskSchedulerWrapper[]{new TaskSchedulerWrapper(taskScheduler)}; + this.taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[]{ + new ServicePluginLifecycleAbstractService<>(taskScheduler)}; + this.taskSchedulerDescriptors = null; + this.webUI = null; + this.historyUrl = null; + this.isPureLocalMode = false; + } + /** * * @param appContext @@ -169,7 +195,7 @@ public class TaskSchedulerManager extends AbstractService implements this.taskSchedulerDescriptors = schedulerDescriptors.toArray(new NamedEntityDescriptor[schedulerDescriptors.size()]); - taskSchedulers = new TaskScheduler[this.taskSchedulerDescriptors.length]; + taskSchedulers = new TaskSchedulerWrapper[this.taskSchedulerDescriptors.length]; taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length]; } @@ -187,11 +213,33 @@ public class TaskSchedulerManager extends AbstractService implements } public Resource getAvailableResources(int schedulerId) { - return taskSchedulers[schedulerId].getAvailableResources(); + try { + return taskSchedulers[schedulerId].getAvailableResources(); + } catch (Exception e) { + String msg = "Error in TaskScheduler while getting available resources" + + ", schedule=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + throw new RuntimeException(e); + } } public Resource getTotalResources(int schedulerId) { - return taskSchedulers[schedulerId].getTotalResources(); + try { + return taskSchedulers[schedulerId].getTotalResources(); + } catch (Exception e) { + String msg = "Error in TaskScheduler while getting total resources" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + throw new RuntimeException(e); + } } private ExecutorService createAppCallbackExecutorService() { @@ -265,11 +313,27 @@ public class TaskSchedulerManager extends AbstractService implements } private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) { - if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) { - taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId()); - } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) { - taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId()); - } else { + boolean invalidEventType = false; + try { + if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) { + taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId()); + } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) { + taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId()); + } else { + invalidEventType = true; + } + } catch (Exception e) { + String msg = "Error in TaskScheduler for handling node blacklisting" + + ", eventType=" + event.getType() + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + return; + } + if (invalidEventType) { throw new TezUncheckedException("Invalid event type: " + event.getType()); } } @@ -280,7 +344,20 @@ public class TaskSchedulerManager extends AbstractService implements // TODO what happens to the task that was connected to this container? // current assumption is that it will eventually call handleTaStopRequest //TaskAttempt taskAttempt = (TaskAttempt) - taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId); + try { + taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId); + } catch (Exception e) { + String msg = "Error in TaskScheduler for handling Container De-allocation" + + ", eventType=" + event.getType() + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + + ", containerId=" + containerId; + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + return; + } // TODO does this container need to be stopped via C_STOP_REQUEST sendEvent(new AMContainerEventStopRequest(containerId)); } @@ -288,8 +365,22 @@ public class TaskSchedulerManager extends AbstractService implements private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) { TaskAttempt attempt = event.getAttempt(); // Propagate state and failure cause (if any) when informing the scheduler about the de-allocation. - boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()] - .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics()); + boolean wasContainerAllocated = false; + try { + wasContainerAllocated = taskSchedulers[event.getSchedulerId()] + .deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics()); + } catch (Exception e) { + String msg = "Error in TaskScheduler for handling Task De-allocation" + + ", eventType=" + event.getType() + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + + ", taskAttemptId=" + attempt.getID(); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + return; + } // use stored value of container id in case the scheduler has removed this // assignment because the task has been deallocated earlier. // retroactive case @@ -333,8 +424,24 @@ public class TaskSchedulerManager extends AbstractService implements event.getAttemptID())); } - boolean wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, + boolean wasContainerAllocated = false; + + try { + wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt, true, null, event.getDiagnostics()); + } catch (Exception e) { + String msg = "Error in TaskScheduler for handling Task De-allocation" + + ", eventType=" + event.getType() + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + + ", taskAttemptId=" + attempt.getID(); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + return; + } + if (!wasContainerAllocated) { LOG.error("De-allocated successful task: " + attempt.getID() + ", but TaskScheduler reported no container assigned to task"); @@ -359,12 +466,24 @@ public class TaskSchedulerManager extends AbstractService implements TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt(); if (affinityAttempt != null) { Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID()); - taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt, - event.getCapability(), - affinityAttempt.getAssignedContainerID(), - Priority.newInstance(event.getPriority()), - event.getContainerContext(), - event); + try { + taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt, + event.getCapability(), + affinityAttempt.getAssignedContainerID(), + Priority.newInstance(event.getPriority()), + event.getContainerContext(), + event); + } catch (Exception e) { + String msg = "Error in TaskScheduler for handling Task Allocation" + + ", eventType=" + event.getType() + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + + ", taskAttemptId=" + taskAttempt.getID(); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + } return; } LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt " @@ -379,21 +498,33 @@ public class TaskSchedulerManager extends AbstractService implements } } - taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt, - event.getCapability(), - hosts, - racks, - Priority.newInstance(event.getPriority()), - event.getContainerContext(), - event); + try { + taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt, + event.getCapability(), + hosts, + racks, + Priority.newInstance(event.getPriority()), + event.getContainerContext(), + event); + } catch (Exception e) { + String msg = "Error in TaskScheduler for handling Task Allocation" + + ", eventType=" + event.getType() + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext) + + ", taskAttemptId=" + taskAttempt.getID(); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + } } @VisibleForTesting TaskScheduler createTaskScheduler(String host, int port, String trackingUrl, - AppContext appContext, - NamedEntityDescriptor taskSchedulerDescriptor, - long customAppIdIdentifier, - int schedulerId) throws TezException { + AppContext appContext, + NamedEntityDescriptor taskSchedulerDescriptor, + long customAppIdIdentifier, + int schedulerId) throws TezException { TaskSchedulerContext rawContext = new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl, customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload()); @@ -452,9 +583,10 @@ public class TaskSchedulerManager extends AbstractService implements } else { customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT); } - taskSchedulers[i] = createTaskScheduler(host, port, - trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i); - taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]); + taskSchedulers[i] = new TaskSchedulerWrapper(createTaskScheduler(host, port, + trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i)); + taskSchedulerServiceWrappers[i] = + new ServicePluginLifecycleAbstractService<>(taskSchedulers[i].getTaskScheduler()); } } @@ -521,7 +653,13 @@ public class TaskSchedulerManager extends AbstractService implements public void initiateStop() { for (int i = 0 ; i < taskSchedulers.length ; i++) { - taskSchedulers[i].initiateStop(); + try { + taskSchedulers[i].getTaskScheduler().initiateStop(); + } catch (Exception e) { + // Ignore for now as scheduler stop invoked on shutdown + LOG.error("Failed to do a clean initiateStop for Scheduler: " + + Utils.getTaskSchedulerIdentifierString(i, appContext), e); + } } } @@ -686,7 +824,19 @@ public class TaskSchedulerManager extends AbstractService implements // Doubles as a mechanism to update node counts periodically. Hence schedulerId required. // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in. - int nodeCount = taskSchedulers[0].getClusterNodeCount(); + int nodeCount = 0; + try { + nodeCount = taskSchedulers[0].getClusterNodeCount(); + } catch (Exception e) { + String msg = "Error in TaskScheduler while getting node count" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + throw new RuntimeException(e); + } if (nodeCount != cachedNodeCount) { cachedNodeCount = nodeCount; sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId)); @@ -701,7 +851,17 @@ public class TaskSchedulerManager extends AbstractService implements public void dagCompleted() { for (int i = 0 ; i < taskSchedulers.length ; i++) { - taskSchedulers[i].dagComplete(); + try { + taskSchedulers[i].dagComplete(); + } catch (Exception e) { + String msg = "Error in TaskScheduler when notified for Dag Completion" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + } } } @@ -714,7 +874,18 @@ public class TaskSchedulerManager extends AbstractService implements // TODO Why is this making a call back into the scheduler, when the call is originating from there. // An AMContainer instance should already exist if an attempt is being made to preempt it AMContainer amContainer = appContext.getAllContainers().get(containerId); - taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId); + try { + taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId); + } catch (Exception e) { + String msg = "Error in TaskScheduler when preempting container" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(amContainer.getTaskSchedulerIdentifier(), appContext) + + ", containerId=" + containerId; + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + } // Inform the Containers about completion. sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID, "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION)); @@ -725,7 +896,17 @@ public class TaskSchedulerManager extends AbstractService implements this.shouldUnregisterFlag.set(true); for (int i = 0 ; i < taskSchedulers.length ; i++) { if (this.taskSchedulers[i] != null) { - this.taskSchedulers[i].setShouldUnregister(); + try { + this.taskSchedulers[i].setShouldUnregister(); + } catch (Exception e) { + String msg = "Error in TaskScheduler when setting Unregister Flag" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + } } } } @@ -737,7 +918,19 @@ public class TaskSchedulerManager extends AbstractService implements public boolean hasUnregistered() { boolean result = true; for (int i = 0 ; i < taskSchedulers.length ; i++) { - result = result & this.taskSchedulers[i].hasUnregistered(); + // Explicitly not catching any exceptions around this API + // No clear route to recover. Better to crash. + try { + result = result & this.taskSchedulers[i].hasUnregistered(); + } catch (Exception e) { + String msg = "Error in TaskScheduler when checking if a scheduler has unregistered" + + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext); + LOG.error(msg, e); + sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, + msg, e)); + } if (result == false) { return result; }
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java new file mode 100644 index 0000000..43cf045 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import javax.annotation.Nullable; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskScheduler; + +public class TaskSchedulerWrapper { + + private final TaskScheduler real; + + public TaskSchedulerWrapper(TaskScheduler real) { + this.real = real; + } + + public Resource getAvailableResources() throws Exception { + return real.getAvailableResources(); + } + + public Resource getTotalResources() throws Exception { + return real.getTotalResources(); + } + + public int getClusterNodeCount() throws Exception { + return real.getClusterNodeCount(); + } + + public void blacklistNode(NodeId nodeId) throws Exception { + real.blacklistNode(nodeId); + } + + public void unblacklistNode(NodeId nodeId) throws Exception { + real.unblacklistNode(nodeId); + } + + public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, + Priority priority, Object containerSignature, Object clientCookie) throws + Exception { + real.allocateTask(task, capability, hosts, racks, priority, containerSignature, clientCookie); + } + + public void allocateTask(Object task, Resource capability, ContainerId containerId, + Priority priority, Object containerSignature, Object clientCookie) throws + Exception { + real.allocateTask(task, capability, containerId, priority, containerSignature, clientCookie); + } + + public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, + @Nullable String diagnostics) throws Exception { + return real.deallocateTask(task, taskSucceeded, endReason, diagnostics); + } + + public Object deallocateContainer(ContainerId containerId) throws Exception { + return real.deallocateContainer(containerId); + } + + public void setShouldUnregister() throws Exception { + real.setShouldUnregister(); + } + + public boolean hasUnregistered() throws Exception { + return real.hasUnregistered(); + } + + public void dagComplete() throws Exception { + real.dagComplete(); + } + + public TaskScheduler getTaskScheduler() { + return real; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index d37d106..e4302aa 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -18,6 +18,7 @@ package org.apache.tez.dag.app.rm.container; +import java.net.InetSocketAddress; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; @@ -27,7 +28,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.tez.Utils; import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.serviceplugins.api.ContainerEndReason; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; @@ -461,6 +465,29 @@ public class AMContainerImpl implements AMContainer { dagId = container.appContext.getCurrentDAG().getID(); dagLocalResources = container.appContext.getCurrentDAG().getLocalResources(); } + + // TODO TEZ-2625 This should ideally be handled inside of user code. Will change once + // CLC construction moves into user code. For now, generating a user code error here + InetSocketAddress cAddress = null; + try { + cAddress = + container.taskCommunicatorManagerInterface.getTaskCommunicator(container.taskCommId).getAddress(); + } catch (Exception e) { + String msg = "Error in TaskCommunicator when getting address" + + ", communicator=" + Utils.getTaskCommIdentifierString(container.taskCommId, container.appContext) + + ", containerId=" + container.containerId; + LOG.error(msg, e); + container.sendEvent( + new DAGAppMasterEventUserServiceFatalError( + DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, + msg, e)); + // We have not registered with any of the listeners etc yet. Send out a deallocateContainer + // message and return. The AM will shutdown shortly. + container.inError = true; + container.deAllocate(); + return; + } + ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext( dagId, dagLocalResources, container.appContext.getApplicationACLs(), @@ -468,7 +495,8 @@ public class AMContainerImpl implements AMContainer { containerContext.getLocalResources(), containerContext.getEnvironment(), containerContext.getJavaOpts(), - container.taskCommunicatorManagerInterface.getTaskCommunicator(container.taskCommId).getAddress(), containerContext.getCredentials(), + cAddress, + containerContext.getCredentials(), container.appContext, container.container.getResource(), container.appContext.getAMConf()); http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index b322e05..08f81fb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -193,7 +193,7 @@ public class MockDAGAppMaster extends DAGAppMaster { @Override public void start() throws Exception { taskCommunicatorManager = (TaskCommunicatorManager) getTaskCommunicatorManager(); - taskCommunicator = (TezTaskCommunicatorImpl) taskCommunicatorManager.getTaskCommunicator(0); + taskCommunicator = (TezTaskCommunicatorImpl) taskCommunicatorManager.getTaskCommunicator(0).getTaskCommunicator(); eventHandlingThread = new Thread(this); eventHandlingThread.start(); ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency, http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java new file mode 100644 index 0000000..fb6faa1 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java @@ -0,0 +1,149 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Set; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PluginWrapperTestHelpers { + + private static final Logger LOG = LoggerFactory.getLogger(PluginWrapperTestHelpers.class); + + public static void testDelegation(Class<?> delegateClass, Class<?> rawClass, + Set<String> skipMethods) throws Exception { + TrackingAnswer answer = new TrackingAnswer(); + Object mock = mock(rawClass, answer); + Constructor ctor = delegateClass.getConstructor(rawClass); + Object wrapper = ctor.newInstance(mock); + + // Run through all the methods on the wrapper, and invoke the methods. Constructs + // arguments and return types for each of them. + Method[] methods = delegateClass.getMethods(); + for (Method method : methods) { + if (method.getDeclaringClass().equals(delegateClass) && + !skipMethods.contains(method.getName())) { + + assertTrue(method.getExceptionTypes().length == 1); + assertEquals(Exception.class, method.getExceptionTypes()[0]); + + LOG.info("Checking method [{}] with parameterTypes [{}]", method.getName(), Arrays.toString(method.getParameterTypes())); + + Object[] params = constructMethodArgs(method); + Object result = method.invoke(wrapper, params); + + // Validate the correct arguments are forwarded, and the real instance is invoked. + assertEquals(method.getName(), answer.lastMethodName); + assertArrayEquals(params, answer.lastArgs); + + // Validate the results. + // Handle auto-boxing + if (answer.compareAsPrimitive) { + assertEquals(answer.lastRetValue, result); + } else { + assertTrue("Expected: " + System.identityHashCode(answer.lastRetValue) + ", actual=" + + System.identityHashCode(result), answer.lastRetValue == result); + } + } + } + + + } + + public static Object[] constructMethodArgs(Method method) throws IllegalAccessException, + InstantiationException { + Class<?>[] paramTypes = method.getParameterTypes(); + Object[] params = new Object[paramTypes.length]; + for (int i = 0; i < paramTypes.length; i++) { + params[i] = constructSingleArg(paramTypes[i]); + } + return params; + } + + private static Object constructSingleArg(Class<?> clazz) { + if (clazz.isPrimitive() || clazz.equals(String.class)) { + return getValueForPrimitiveOrString(clazz); + } else if (clazz.isEnum()) { + if (clazz.getEnumConstants().length == 0) { + return null; + } else { + return clazz.getEnumConstants()[0]; + } + } else if (clazz.isArray() && + (clazz.getComponentType().isPrimitive() || clazz.getComponentType().equals(String.class))) { + // Cannot mock. For now using null. Also does not handle deeply nested arrays. + return null; + } else { + return mock(clazz); + } + } + + private static Object getValueForPrimitiveOrString(Class<?> clazz) { + if (clazz.equals(String.class)) { + return "teststring"; + } else if (clazz.equals(byte.class)) { + return 'b'; + } else if (clazz.equals(short.class)) { + return 2; + } else if (clazz.equals(int.class)) { + return 224; + } else if (clazz.equals(long.class)) { + return 445l; + } else if (clazz.equals(float.class)) { + return 2.24f; + } else if (clazz.equals(double.class)) { + return 4.57d; + } else if (clazz.equals(boolean.class)) { + return true; + } else if (clazz.equals(char.class)) { + return 'c'; + } else if (clazz.equals(void.class)) { + return null; + } else { + throw new RuntimeException("Unrecognized type: " + clazz.getName()); + } + } + + public static class TrackingAnswer implements Answer { + + public String lastMethodName; + public Object[] lastArgs; + public Object lastRetValue; + boolean compareAsPrimitive; + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + lastArgs = invocation.getArguments(); + lastMethodName = invocation.getMethod().getName(); + Class<?> retType = invocation.getMethod().getReturnType(); + lastRetValue = constructSingleArg(retType); + compareAsPrimitive = retType.isPrimitive() || retType.isEnum() || retType.equals(String.class); + + return lastRetValue; + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java index d1fd4f3..d76a5b3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java @@ -21,12 +21,15 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashSet; @@ -42,6 +45,8 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TaskCommunicator; @@ -50,6 +55,9 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.serviceplugins.api.ContainerEndReason; @@ -57,6 +65,9 @@ import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestTaskCommunicatorManager { @@ -215,8 +226,90 @@ public class TestTaskCommunicatorManager { } finally { tcm.stop(); - verify(tcm.getTaskCommunicator(0)).shutdown(); - verify(tcm.getTaskCommunicator(1)).shutdown(); + verify(tcm.getTaskCommunicator(0).getTaskCommunicator()).shutdown(); + verify(tcm.getTaskCommunicator(1).getTaskCommunicator()).shutdown(); + } + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testTaskCommunicatorUserError() { + TaskCommunicatorContextImpl taskCommContext = mock(TaskCommunicatorContextImpl.class); + TaskCommunicator taskCommunicator = mock(TaskCommunicator.class, new ExceptionAnswer()); + doReturn(taskCommContext).when(taskCommunicator).getContext(); + + EventHandler eventHandler = mock(EventHandler.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + + when(appContext.getEventHandler()).thenReturn(eventHandler); + doReturn("testTaskCommunicator").when(appContext).getTaskCommunicatorName(0); + String expectedId = "[0:testTaskCommunicator]"; + + Configuration conf = new Configuration(false); + + TaskCommunicatorManager taskCommunicatorManager = + new TaskCommunicatorManager(taskCommunicator, appContext, mock(TaskHeartbeatHandler.class), + mock(ContainerHeartbeatHandler.class)); + try { + taskCommunicatorManager.init(conf); + taskCommunicatorManager.start(); + + // Invoking a couple of random methods. + + DAG mockDag = mock(DAG.class, RETURNS_DEEP_STUBS); + when(mockDag.getID().getId()).thenReturn(1); + + taskCommunicatorManager.dagComplete(mockDag); + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + + Event rawEvent = argumentCaptor.getValue(); + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + DAGAppMasterEventUserServiceFatalError event = + (DAGAppMasterEventUserServiceFatalError) rawEvent; + + assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, event.getType()); + assertTrue(event.getError().getMessage().contains("TestException_" + "dagComplete")); + assertTrue(event.getDiagnosticInfo().contains("DAG completion")); + assertTrue(event.getDiagnosticInfo().contains(expectedId)); + + + when(appContext.getAllContainers().get(any(ContainerId.class)).getContainer().getNodeId()) + .thenReturn(mock(NodeId.class)); + + taskCommunicatorManager.registerRunningContainer(mock(ContainerId.class), 0); + argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(argumentCaptor.capture()); + + rawEvent = argumentCaptor.getAllValues().get(1); + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + event = (DAGAppMasterEventUserServiceFatalError) rawEvent; + + assertEquals(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, event.getType()); + assertTrue( + event.getError().getMessage().contains("TestException_" + "registerRunningContainer")); + assertTrue(event.getDiagnosticInfo().contains("registering running Container")); + assertTrue(event.getDiagnosticInfo().contains(expectedId)); + + + } finally { + taskCommunicatorManager.stop(); + } + + } + + private static class ExceptionAnswer implements Answer { + + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Method method = invocation.getMethod(); + if (method.getDeclaringClass().equals(TaskCommunicator.class) && + !method.getName().equals("getContext") && !method.getName().equals("initialize") && + !method.getName().equals("start") && !method.getName().equals("shutdown")) { + throw new RuntimeException("TestException_" + method.getName()); + } else { + return invocation.callRealMethod(); + } } } @@ -353,7 +446,7 @@ public class TestTaskCommunicatorManager { } @Override - public void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception { + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java index 3f80928..2921a22 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java @@ -169,7 +169,8 @@ public class TestTaskCommunicatorManager1 { @Test(timeout = 5000) public void testGetTask() throws IOException { - TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0); + TezTaskCommunicatorImpl taskCommunicator = + (TezTaskCommunicatorImpl) taskAttemptListener.getTaskCommunicator(0).getTaskCommunicator(); TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); ContainerId containerId1 = createContainerId(appId, 1); @@ -216,7 +217,8 @@ public class TestTaskCommunicatorManager1 { @Test(timeout = 5000) public void testGetTaskMultiplePulls() throws IOException { - TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0); + TezTaskCommunicatorImpl taskCommunicator = + (TezTaskCommunicatorImpl) taskAttemptListener.getTaskCommunicator(0).getTaskCommunicator(); TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical(); ContainerId containerId1 = createContainerId(appId, 1); http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java new file mode 100644 index 0000000..212bca4 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app; + +import com.google.common.collect.Sets; +import org.apache.tez.dag.api.TaskCommunicator; +import org.junit.Test; + +public class TestTaskCommunicatorWrapper { + + @Test(timeout = 5000) + public void testDelegation() throws Exception { + PluginWrapperTestHelpers.testDelegation(TaskCommunicatorWrapper.class, TaskCommunicator.class, + Sets.newHashSet("getTaskCommunicator")); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 6dd578f..4772492 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -69,6 +69,7 @@ import org.apache.tez.dag.app.ClusterInfo; import org.apache.tez.dag.app.ContainerContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.dag.app.TaskCommunicatorWrapper; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; @@ -108,6 +109,7 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; +import org.apache.tez.serviceplugins.api.ServicePluginException; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -1469,7 +1471,7 @@ public class TestTaskAttempt { @SuppressWarnings("deprecation") @Test(timeout = 5000) - public void testKilledInNew() { + public void testKilledInNew() throws ServicePluginException { ApplicationId appId = ApplicationId.newInstance(1, 2); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( appId, 0); @@ -1609,11 +1611,12 @@ public class TestTaskAttempt { new Credentials(), new HashMap<String, String>(), ""); } - private TaskCommunicatorManagerInterface createMockTaskAttemptListener() { + private TaskCommunicatorManagerInterface createMockTaskAttemptListener() throws + ServicePluginException { TaskCommunicatorManagerInterface taListener = mock(TaskCommunicatorManagerInterface.class); TaskCommunicator taskComm = mock(TaskCommunicator.class); doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + doReturn(new TaskCommunicatorWrapper(taskComm)).when(taListener).getTaskCommunicator(0); return taListener; } } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java index a8af808..1f75afb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java @@ -19,8 +19,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.io.IOException; @@ -35,20 +40,27 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.TezReflectionException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent; +import org.apache.tez.dag.app.rm.ContainerLauncherStopRequestEvent; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; import org.apache.tez.serviceplugins.api.ContainerLauncher; import org.apache.tez.serviceplugins.api.ContainerLauncherContext; import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.apache.tez.serviceplugins.api.ServicePluginException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -58,7 +70,7 @@ public class TestContainerLauncherManager { @Before @After - public void reset() { + public void resetTest() { ContainerLaucherRouterForMultipleLauncherTest.reset(); } @@ -230,6 +242,73 @@ public class TestContainerLauncherManager { } } + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testContainerLauncherUserError() throws ServicePluginException { + + ContainerLauncher containerLauncher = mock(ContainerLauncher.class); + + EventHandler eventHandler = mock(EventHandler.class); + AppContext appContext = mock(AppContext.class); + doReturn(eventHandler).when(appContext).getEventHandler(); + doReturn("testlauncher").when(appContext).getContainerLauncherName(0); + + Configuration conf = new Configuration(false); + + ContainerLauncherManager containerLauncherManager = + new ContainerLauncherManager(containerLauncher, appContext); + try { + containerLauncherManager.init(conf); + containerLauncherManager.start(); + + // launch container + doThrow(new RuntimeException("testexception")).when(containerLauncher) + .launchContainer(any(ContainerLaunchRequest.class)); + ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class); + Container container1 = mock(Container.class); + ContainerLauncherLaunchRequestEvent launchRequestEvent = + new ContainerLauncherLaunchRequestEvent(clc1, container1, 0, 0, 0); + + + containerLauncherManager.handle(launchRequestEvent); + + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + + Event rawEvent = argumentCaptor.getValue(); + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + DAGAppMasterEventUserServiceFatalError event = + (DAGAppMasterEventUserServiceFatalError) rawEvent; + assertEquals(DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR, event.getType()); + assertTrue(event.getError().getMessage().contains("testexception")); + assertTrue(event.getDiagnosticInfo().contains("launching container")); + assertTrue(event.getDiagnosticInfo().contains("[0:testlauncher]")); + + reset(eventHandler); + // stop container + + doThrow(new RuntimeException("teststopexception")).when(containerLauncher) + .stopContainer(any(ContainerStopRequest.class)); + ContainerId containerId2 = mock(ContainerId.class); + NodeId nodeId2 = mock(NodeId.class); + ContainerLauncherStopRequestEvent stopRequestEvent = + new ContainerLauncherStopRequestEvent(containerId2, nodeId2, null, 0, 0, 0); + + argumentCaptor = ArgumentCaptor.forClass(Event.class); + + containerLauncherManager.handle(stopRequestEvent); + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + rawEvent = argumentCaptor.getValue(); + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + event = (DAGAppMasterEventUserServiceFatalError) rawEvent; + assertTrue(event.getError().getMessage().contains("teststopexception")); + assertTrue(event.getDiagnosticInfo().contains("stopping container")); + assertTrue(event.getDiagnosticInfo().contains("[0:testlauncher]")); + } finally { + containerLauncherManager.stop(); + } + } + private static class ContainerLaucherRouterForMultipleLauncherTest extends ContainerLauncherManager { http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java new file mode 100644 index 0000000..d786bf9 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherWrapper.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import com.google.common.collect.Sets; +import org.apache.tez.dag.app.PluginWrapperTestHelpers; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.junit.Test; + +public class TestContainerLauncherWrapper { + + @Test(timeout = 5000) + public void testDelegation() throws Exception { + PluginWrapperTestHelpers.testDelegation(ContainerLauncherWrapper.class, ContainerLauncher.class, + Sets.newHashSet("getContainerLauncher")); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index 0e90681..78dc8fd 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -1407,8 +1407,12 @@ public class TestContainerReuse { private void verifyDeAllocateTask(TaskScheduler taskScheduler, Object ta, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagContains) { ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class); - verify(taskScheduler) - .deallocateTask(eq(ta), eq(taskSucceeded), eq(endReason), argumentCaptor.capture()); + try { + verify(taskScheduler) + .deallocateTask(eq(ta), eq(taskSucceeded), eq(endReason), argumentCaptor.capture()); + } catch (Exception e) { + throw new RuntimeException(e); + } assertEquals(1, argumentCaptor.getAllValues().size()); if (diagContains == null) { assertNull(argumentCaptor.getValue()); http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 8b489ea..b54d024 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -157,13 +157,15 @@ class TestTaskSchedulerHelpers { new TaskSchedulerContextImplWrapper(taskSchedulerContext, new CountingExecutorService(appCallbackExecutor)); TaskSchedulerContextDrainable drainable = new TaskSchedulerContextDrainable(wrapper); - taskSchedulers[0] = - new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync); - taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]); + + taskSchedulers[0] = new TaskSchedulerWrapper( + new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync)); + taskSchedulerServiceWrappers[0] = + new ServicePluginLifecycleAbstractService(taskSchedulers[0].getTaskScheduler()); } public TaskScheduler getSpyTaskScheduler() { - return taskSchedulers[0]; + return taskSchedulers[0].getTaskScheduler(); } @Override @@ -172,7 +174,9 @@ class TestTaskSchedulerHelpers { // Init the service so that reuse configuration is picked up. ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig()); ((AbstractService)taskSchedulerServiceWrappers[0]).start(); - taskSchedulers[0] = spy(taskSchedulers[0]); + // For some reason, the spy needs to be setup after sertvice startup. + taskSchedulers[0] = new TaskSchedulerWrapper(spy(taskSchedulers[0].getTaskScheduler())); + } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index 8e4e4f0..c649870 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -34,6 +34,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; @@ -41,6 +42,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -60,6 +62,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; +import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; @@ -68,8 +71,13 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; +import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; +import org.apache.tez.dag.app.TaskCommunicatorManager; +import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; +import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError; import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl; import org.apache.tez.dag.app.dag.impl.TaskImpl; import org.apache.tez.dag.app.dag.impl.VertexImpl; @@ -86,6 +94,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -94,6 +103,9 @@ import org.junit.Before; import org.junit.Test; import com.google.common.collect.Lists; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; @SuppressWarnings("rawtypes") public class TestTaskSchedulerManager { @@ -121,8 +133,9 @@ public class TestTaskSchedulerManager { @Override protected void instantiateSchedulers(String host, int port, String trackingUrl, AppContext appContext) { - taskSchedulers[0] = mockTaskScheduler; - taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]); + taskSchedulers[0] = new TaskSchedulerWrapper(mockTaskScheduler); + taskSchedulerServiceWrappers[0] = + new ServicePluginLifecycleAbstractService<>(taskSchedulers[0].getTaskScheduler()); } @Override @@ -272,7 +285,7 @@ public class TestTaskSchedulerManager { } @Test (timeout = 5000) - public void testContainerInternalPreempted() throws IOException { + public void testContainerInternalPreempted() throws IOException, ServicePluginException { Configuration conf = new Configuration(false); schedulerHandler.init(conf); schedulerHandler.start(); @@ -533,6 +546,93 @@ public class TestTaskSchedulerManager { eq(launchRequest2)); } + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testTaskSchedulerUserError() { + TaskScheduler taskScheduler = mock(TaskScheduler.class, new ExceptionAnswer()); + + EventHandler eventHandler = mock(EventHandler.class); + AppContext appContext = mock(AppContext.class, RETURNS_DEEP_STUBS); + + when(appContext.getEventHandler()).thenReturn(eventHandler); + doReturn("testTaskScheduler").when(appContext).getTaskSchedulerName(0); + String expectedId = "[0:testTaskScheduler]"; + + Configuration conf = new Configuration(false); + + InetSocketAddress address = new InetSocketAddress(15222); + DAGClientServer mockClientService = mock(DAGClientServer.class); + doReturn(address).when(mockClientService).getBindAddress(); + TaskSchedulerManager taskSchedulerManager = + new TaskSchedulerManager(taskScheduler, appContext, mock(ContainerSignatureMatcher.class), + mockClientService, + Executors.newFixedThreadPool(1)) { + @Override + protected void instantiateSchedulers(String host, int port, String trackingUrl, + AppContext appContext) throws TezException { + // Stubbed out since these are setup up front in the constructor used for testing + } + }; + + try { + taskSchedulerManager.init(conf); + taskSchedulerManager.start(); + + // Invoking a couple of random methods + + AMSchedulerEventTALaunchRequest launchRequest = + new AMSchedulerEventTALaunchRequest(mock(TezTaskAttemptID.class), mock(Resource.class), + mock(TaskSpec.class), mock(TaskAttempt.class), mock(TaskLocationHint.class), 0, + mock(ContainerContext.class), 0, 0, 0); + taskSchedulerManager.handleEvent(launchRequest); + + ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class); + + verify(eventHandler, times(1)).handle(argumentCaptor.capture()); + + Event rawEvent = argumentCaptor.getValue(); + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + DAGAppMasterEventUserServiceFatalError event = + (DAGAppMasterEventUserServiceFatalError) rawEvent; + + assertEquals(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, event.getType()); + assertTrue(event.getError().getMessage().contains("TestException_" + "allocateTask")); + assertTrue(event.getDiagnosticInfo().contains("Task Allocation")); + assertTrue(event.getDiagnosticInfo().contains(expectedId)); + + + taskSchedulerManager.dagCompleted(); + argumentCaptor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(argumentCaptor.capture()); + + rawEvent = argumentCaptor.getAllValues().get(1); + assertTrue(rawEvent instanceof DAGAppMasterEventUserServiceFatalError); + event = (DAGAppMasterEventUserServiceFatalError) rawEvent; + + assertEquals(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, event.getType()); + assertTrue(event.getError().getMessage().contains("TestException_" + "dagComplete")); + assertTrue(event.getDiagnosticInfo().contains("Dag Completion")); + assertTrue(event.getDiagnosticInfo().contains(expectedId)); + + } finally { + taskSchedulerManager.stop(); + } + } + + private static class ExceptionAnswer implements Answer { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Method method = invocation.getMethod(); + if (method.getDeclaringClass().equals(TaskScheduler.class) && + !method.getName().equals("getContext") && !method.getName().equals("initialize") && + !method.getName().equals("start") && !method.getName().equals("shutdown")) { + throw new RuntimeException("TestException_" + method.getName()); + } else { + return invocation.callRealMethod(); + } + } + } + public static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager { private final TaskScheduler yarnTaskScheduler; http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerWrapper.java new file mode 100644 index 0000000..cd8a496 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerWrapper.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import com.google.common.collect.Sets; +import org.apache.tez.dag.app.PluginWrapperTestHelpers; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.junit.Test; + +public class TestTaskSchedulerWrapper { + + @Test(timeout = 5000) + public void testDelegation() throws Exception { + PluginWrapperTestHelpers.testDelegation(TaskSchedulerWrapper.class, TaskScheduler.class, + Sets.newHashSet("getTaskScheduler")); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index cc88f0d..8b8b6d7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -63,7 +63,9 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; +import org.apache.tez.dag.app.TaskCommunicatorWrapper; import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; @@ -1228,8 +1230,12 @@ public class TestAMContainer { tal = mock(TaskCommunicatorManagerInterface.class); TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(tal).getTaskCommunicator(0); + try { + doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); + } catch (ServicePluginException e) { + throw new RuntimeException(e); + } + doReturn(new TaskCommunicatorWrapper(taskComm)).when(tal).getTaskCommunicator(0); dagID = TezDAGID.getInstance(applicationID, 1); vertexID = TezVertexID.getInstance(dagID, 1); http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java index 31d756c..e21dda1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java @@ -35,6 +35,7 @@ import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; +import org.apache.tez.serviceplugins.api.ServicePluginException; public class TestAMContainerMap { @@ -42,7 +43,7 @@ public class TestAMContainerMap { return mock(ContainerHeartbeatHandler.class); } - private TaskCommunicatorManagerInterface mockTaskAttemptListener() { + private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException { TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); TaskCommunicator taskComm = mock(TaskCommunicator.class); doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress(); http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java index 186bacd..f9358bf 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java @@ -21,6 +21,7 @@ package org.apache.tez.examples; import java.io.IOException; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +135,8 @@ public class JoinValidate extends TezExampleBase { return 0; } - private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions) + @VisibleForTesting + DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions) throws IOException { DAG dag = DAG.create(getDagName()); if (getDefaultExecutionContext() != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java new file mode 100644 index 0000000..d489cca --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncherWithErrors.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; + +public class TezTestServiceContainerLauncherWithErrors extends ContainerLauncher { + public TezTestServiceContainerLauncherWithErrors( + ContainerLauncherContext containerLauncherContext) { + super(containerLauncherContext); + } + + @Override + public void launchContainer(ContainerLaunchRequest launchRequest) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void stopContainer(ContainerStopRequest stopRequest) { + throw new RuntimeException("Simulated Error"); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java new file mode 100644 index 0000000..1705eac --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerServiceWithErrors.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import javax.annotation.Nullable; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; + +public class TezTestServiceTaskSchedulerServiceWithErrors extends TaskScheduler { + public TezTestServiceTaskSchedulerServiceWithErrors( + TaskSchedulerContext taskSchedulerContext) { + super(taskSchedulerContext); + } + + @Override + public Resource getAvailableResources() { + return Resource.newInstance(2048, 2); + } + + @Override + public Resource getTotalResources() { + return Resource.newInstance(2048, 2); + } + + @Override + public int getClusterNodeCount() { + return 1; + } + + @Override + public void blacklistNode(NodeId nodeId) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void unblacklistNode(NodeId nodeId) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks, + Priority priority, Object containerSignature, Object clientCookie) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void allocateTask(Object task, Resource capability, ContainerId containerId, + Priority priority, Object containerSignature, Object clientCookie) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, + @Nullable String diagnostics) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public Object deallocateContainer(ContainerId containerId) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void setShouldUnregister() { + } + + @Override + public boolean hasUnregistered() { + return false; + } + + @Override + public void dagComplete() { + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java new file mode 100644 index 0000000..0a3d8d4 --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.taskcomm; + +import javax.annotation.Nullable; +import java.net.InetSocketAddress; +import java.util.Map; + +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.api.TaskCommunicator; +import org.apache.tez.dag.api.TaskCommunicatorContext; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; + +public class TezTestServiceTaskCommunicatorWithErrors extends TaskCommunicator { + public TezTestServiceTaskCommunicatorWithErrors( + TaskCommunicatorContext taskCommunicatorContext) { + super(taskCommunicatorContext); + } + + @Override + public void registerRunningContainer(ContainerId containerId, String hostname, int port) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, + @Nullable String diagnostics) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec, + Map<String, LocalResource> additionalResources, + Credentials credentials, boolean credentialsChanged, + int priority) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, + TaskAttemptEndReason endReason, + @Nullable String diagnostics) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public InetSocketAddress getAddress() { + return NetUtils.createSocketAddrForHost("localhost", 0); + } + + @Override + public void onVertexStateUpdated(VertexStateUpdate stateUpdate) { + throw new RuntimeException("Simulated Error"); + } + + @Override + public void dagComplete(int dagIdentifier) { + } + + @Override + public Object getMetaInfo() { + throw new RuntimeException("Simulated Error"); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java index f31476f..64b9063 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java @@ -15,6 +15,11 @@ package org.apache.tez.examples; +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; public class JoinValidateConfigured extends JoinValidate { @@ -60,4 +65,9 @@ public class JoinValidateConfigured extends JoinValidate { protected String getDagName() { return "JoinValidate_" + dagNameSuffix; } + + public DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions) + throws IOException { + return super.createDag(tezConf, lhs, rhs, numPartitions); + } }
