http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index d4cf317..1e76dc9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -37,6 +37,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AMState; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.math3.random.RandomDataGenerator; @@ -59,10 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.DAGAppMasterState; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus; -import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.common.ContainerSignatureMatcher; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -80,17 +81,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount)); } */ -public class YarnTaskSchedulerService extends TaskSchedulerService +public class YarnTaskSchedulerService extends TaskScheduler implements AMRMClientAsync.CallbackHandler { private static final Logger LOG = LoggerFactory.getLogger(YarnTaskSchedulerService.class); final TezAMRMClientAsync<CookieContainerRequest> amRmClient; - final TaskSchedulerAppCallback realAppClient; - final TaskSchedulerAppCallback appClientDelegate; final ContainerSignatureMatcher containerSignatureMatcher; - ExecutorService appCallbackExecutor; // Container Re-Use configuration private boolean shouldReuseContainers; @@ -131,7 +129,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService final String appHostName; final int appHostPort; final String appTrackingUrl; - final AppContext appContext; private AtomicBoolean hasUnregistered = new AtomicBoolean(false); AtomicBoolean isStopped = new AtomicBoolean(false); @@ -150,6 +147,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService Set<ContainerId> sessionMinHeldContainers = Sets.newHashSet(); RandomDataGenerator random = new RandomDataGenerator(); + private final Configuration conf; @VisibleForTesting protected AtomicBoolean shouldUnregister = new AtomicBoolean(false); @@ -213,51 +211,29 @@ public class YarnTaskSchedulerService extends TaskSchedulerService } } - public YarnTaskSchedulerService(TaskSchedulerAppCallback appClient, - ContainerSignatureMatcher containerSignatureMatcher, - String appHostName, - int appHostPort, - String appTrackingUrl, - AppContext appContext) { - super(YarnTaskSchedulerService.class.getName()); - this.realAppClient = appClient; - this.appCallbackExecutor = createAppCallbackExecutorService(); - this.containerSignatureMatcher = containerSignatureMatcher; - this.appClientDelegate = createAppCallbackDelegate(appClient); + public YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { + super(taskSchedulerContext); + this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher(); this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this); - this.appHostName = appHostName; - this.appHostPort = appHostPort; - this.appTrackingUrl = appTrackingUrl; - this.appContext = appContext; + this.appHostName = taskSchedulerContext.getAppHostName(); + this.appHostPort = taskSchedulerContext.getAppClientPort(); + this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl(); + this.conf = taskSchedulerContext.getInitialConfiguration(); } @Private @VisibleForTesting - YarnTaskSchedulerService(TaskSchedulerAppCallback appClient, - ContainerSignatureMatcher containerSignatureMatcher, - String appHostName, - int appHostPort, - String appTrackingUrl, - TezAMRMClientAsync<CookieContainerRequest> client, - AppContext appContext) { - super(YarnTaskSchedulerService.class.getName()); - this.realAppClient = appClient; - this.appCallbackExecutor = createAppCallbackExecutorService(); - this.containerSignatureMatcher = containerSignatureMatcher; - this.appClientDelegate = createAppCallbackDelegate(appClient); + YarnTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, + TezAMRMClientAsync<CookieContainerRequest> client) { + super(taskSchedulerContext); + this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher(); this.amRmClient = client; - this.appHostName = appHostName; - this.appHostPort = appHostPort; - this.appTrackingUrl = appTrackingUrl; - this.appContext = appContext; + this.appHostName = taskSchedulerContext.getAppHostName(); + this.appHostPort = taskSchedulerContext.getAppClientPort(); + this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl(); + this.conf = taskSchedulerContext.getInitialConfiguration(); } - @VisibleForTesting - ExecutorService createAppCallbackExecutorService() { - return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build()); - } - @Override public Resource getAvailableResources() { return amRmClient.getAvailableResources(); @@ -269,12 +245,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService return amRmClient.getClusterNodeCount(); } - TaskSchedulerAppCallback createAppCallbackDelegate( - TaskSchedulerAppCallback realAppClient) { - return new TaskSchedulerAppCallbackWrapper(realAppClient, - appCallbackExecutor); - } - @Override public void setShouldUnregister() { this.shouldUnregister.set(true); @@ -287,8 +257,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // AbstractService methods @Override - public synchronized void serviceInit(Configuration conf) { + public synchronized void initialize() { + // TODO Post TEZ-2003. Make all of these final fields. amRmClient.init(conf); int heartbeatIntervalMax = conf.getInt( TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, @@ -361,7 +332,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService } @Override - public void serviceStart() { + public void start() { try { RegisterApplicationMasterResponse response; synchronized (this) { @@ -371,7 +342,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService appTrackingUrl); } // upcall to app outside locks - appClientDelegate.setApplicationRegistrationData( + getContext().setApplicationRegistrationData( response.getMaximumResourceCapability(), response.getApplicationACLs(), response.getClientToAMTokenMasterKey()); @@ -387,7 +358,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService } @Override - public void serviceStop() throws InterruptedException { + public void shutdown() throws InterruptedException { // upcall to app outside of locks try { delayedContainerManager.shutdown(); @@ -396,7 +367,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService synchronized (this) { isStopped.set(true); if (shouldUnregister.get()) { - AppFinalStatus status = appClientDelegate.getFinalAppStatus(); + AppFinalStatus status = getContext().getFinalAppStatus(); LOG.info("Unregistering application from RM" + ", exitStatus=" + status.exitStatus + ", exitMessage=" + status.exitMessage @@ -413,8 +384,6 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // operation and at the same time the callback operation might be trying // to get our lock. amRmClient.stop(); - appCallbackExecutor.shutdown(); - appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS); } catch (YarnException e) { LOG.error("Yarn Exception while unregistering ", e); throw new TezUncheckedException(e); @@ -478,7 +447,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // upcall to app must be outside locks for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) { - appClientDelegate.containerCompleted(entry.getKey(), entry.getValue()); + getContext().containerCompleted(entry.getKey(), entry.getValue()); } } @@ -528,7 +497,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService private synchronized Map<CookieContainerRequest, Container> assignNewlyAllocatedContainers(Iterable<Container> containers) { - boolean amInCompletionState = appContext.isAMInCompletionState(); + boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED); Map<CookieContainerRequest, Container> assignedContainers = new HashMap<CookieContainerRequest, Container>(); @@ -550,7 +519,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService private synchronized Map<CookieContainerRequest, Container> tryAssignReUsedContainers(Iterable<Container> containers) { - boolean amInCompletionState = appContext.isAMInCompletionState(); + boolean amInCompletionState = (getContext().getAMState() == AMState.COMPLETED); Map<CookieContainerRequest, Container> assignedContainers = new HashMap<CookieContainerRequest, Container>(); @@ -590,7 +559,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService private synchronized Map<CookieContainerRequest, Container> assignDelayedContainer(HeldContainer heldContainer) { - DAGAppMasterState state = appContext.getAMState(); + AMState state = getContext().getAMState(); boolean isNew = heldContainer.isNew(); if (LOG.isDebugEnabled()) { @@ -606,13 +575,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService + ", isNew=" + isNew); } - if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) { + if (state.equals(AMState.IDLE) || taskRequests.isEmpty()) { // reset locality level on held container // if sessionDelay defined, push back into delayed queue if not already // done so // Compute min held containers. - if (appContext.isSession() && sessionNumMinHeldContainers > 0 && + if (getContext().isSession() && sessionNumMinHeldContainers > 0 && sessionMinHeldContainers.isEmpty()) { // session mode and need to hold onto containers and not done so already determineMinHeldContainers(); @@ -626,7 +595,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService && idleContainerTimeoutMin != -1)) { // container idle timeout has expired or is a new unused container. // new container is possibly a spurious race condition allocation. - if (appContext.isSession() + if (getContext().isSession() && sessionMinHeldContainers.contains(heldContainer.getContainer().getId())) { // There are no outstanding requests. So its safe to hold new containers. // We may have received more containers than necessary and some are unused @@ -667,7 +636,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService heldContainer.getContainer(), currentTime + localitySchedulingDelay); } - } else if (state.equals(DAGAppMasterState.RUNNING)) { + } else if (state.equals(AMState.RUNNING_APP)) { // clear min held containers since we need to allocate to tasks if (!sessionMinHeldContainers.isEmpty()) { // update the expire time of min held containers so that they are @@ -806,12 +775,12 @@ public class YarnTaskSchedulerService extends TaskSchedulerService // Are there any pending requests at any priority? // release if there are tasks or this is not a session if (safeToRelease && - (!taskRequests.isEmpty() || !appContext.isSession())) { + (!taskRequests.isEmpty() || !getContext().isSession())) { LOG.info("Releasing held container as either there are pending but " + " unmatched requests or this is not a session" + ", containerId=" + heldContainer.container.getId() + ", pendingTasks=" + taskRequests.size() - + ", isSession=" + appContext.isSession() + + ", isSession=" + getContext().isSession() + ". isNew=" + isNew); releaseUnassignedContainers( Lists.newArrayList(heldContainer.container)); @@ -862,7 +831,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService return; } // upcall to app must be outside locks - appClientDelegate.appShutdownRequested(); + getContext().appShutdownRequested(); } @Override @@ -872,7 +841,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService } // ignore bad nodes for now // upcall to app must be outside locks - appClientDelegate.nodesUpdated(updatedNodes); + getContext().nodesUpdated(updatedNodes); } @Override @@ -894,7 +863,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService numHeartbeats++; preemptIfNeeded(); - return appClientDelegate.getProgress(); + return getContext().getProgress(); } @Override @@ -902,7 +871,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService if (isStopped.get()) { return; } - appClientDelegate.onError(t); + getContext().onError(t); } @Override @@ -1289,7 +1258,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService ContainerId cId = preemptedContainers[i]; if (cId != null) { LOG.info("Preempting container: " + cId + " currently allocated to a task."); - appClientDelegate.preemptContainer(cId); + getContext().preemptContainer(cId); } } } @@ -1422,7 +1391,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService Object assignedTask = containerAssignments.remove(containerId); if (assignedTask != null) { // A task was assigned to this container at some point. Inform the app. - appClientDelegate.containerBeingReleased(containerId); + getContext().containerBeingReleased(containerId); } HeldContainer delayedContainer = heldContainers.remove(containerId); if (delayedContainer != null) { @@ -1626,7 +1595,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService private void informAppAboutAssignment(CookieContainerRequest assigned, Container container) { - appClientDelegate.taskAllocated(getTask(assigned), + getContext().taskAllocated(getTask(assigned), assigned.getCookie().getAppCookie(), container); }
http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java index 5cff766..aeacf84 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java index 938096d..fcb9eaf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.common.ContainerSignatureMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.service.AbstractService; http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java index 211c537..436f098 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerContextMatcher.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.dag.app.ContainerContext; import com.google.common.base.Preconditions; +import org.apache.tez.common.ContainerSignatureMatcher; public class ContainerContextMatcher implements ContainerSignatureMatcher { http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java deleted file mode 100644 index 0f9c2d6..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/ContainerSignatureMatcher.java +++ /dev/null @@ -1,60 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.rm.container; - -import java.util.Map; - -import org.apache.hadoop.yarn.api.records.LocalResource; - -public interface ContainerSignatureMatcher { - /** - * Checks the compatibility between the specified container signatures. - * - * @return true if the first signature is a super set of the second - * signature. - */ - public boolean isSuperSet(Object cs1, Object cs2); - - /** - * Checks if the container signatures match exactly - * @return true if exact match - */ - public boolean isExactMatch(Object cs1, Object cs2); - - /** - * Gets additional resources specified in lr2, which are not present for lr1 - * - * @param lr1 - * @param lr2 - * @return additional resources specified in lr2, which are not present for lr1 - */ - public Map<String, LocalResource> getAdditionalResources(Map<String, LocalResource> lr1, - Map<String, LocalResource> lr2); - - - /** - * Do a union of 2 signatures - * Pre-condition. This function should only be invoked iff cs1 is compatible with cs2. - * i.e. isSuperSet should not return false. - * @param cs1 Signature 1 Original signature - * @param cs2 Signature 2 New signature - * @return Union of 2 signatures - */ - public Object union(Object cs1, Object cs2); - -} http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java index e37ab4a..88f6066 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -46,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -68,16 +66,14 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler; import org.apache.tez.dag.app.DAGAppMasterState; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.dag.TaskAttempt; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.CapturingEventHandler; -import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest; -import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext; import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA; import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest; import org.apache.tez.dag.app.rm.container.AMContainerMap; @@ -116,14 +112,13 @@ public class TestContainerReuse { conf.setBoolean( TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); conf.setBoolean( - TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false); + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false); conf.setBoolean( - TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false); + TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false); conf.setLong( TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 3000l); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); @@ -132,12 +127,6 @@ public class TestContainerReuse { AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(conf).when(appContext).getAMConf(); @@ -161,11 +150,11 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(conf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = - (TaskSchedulerWithDrainableAppCallback) + TaskSchedulerWithDrainableContext taskScheduler = + (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); @@ -251,8 +240,7 @@ public class TestContainerReuse { } } assertTrue("containerHost2 was not released", exception == null); - taskScheduler.stop(); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } @@ -267,7 +255,6 @@ public class TestContainerReuse { conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1000l); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); @@ -276,12 +263,6 @@ public class TestContainerReuse { AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); @@ -304,11 +285,11 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(conf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = - (TaskSchedulerWithDrainableAppCallback) + TaskSchedulerWithDrainableContext taskScheduler = + (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); @@ -366,8 +347,7 @@ public class TestContainerReuse { eq(containerHost2.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); - taskScheduler.stop(); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } @@ -380,19 +360,12 @@ public class TestContainerReuse { tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); @@ -410,9 +383,9 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -504,7 +477,7 @@ public class TestContainerReuse { eventHandler.verifyInvocation(AMContainerEventStopRequest.class); eventHandler.reset(); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } @@ -522,19 +495,11 @@ public class TestContainerReuse { tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS, "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__"); TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(tezConf); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); @@ -554,10 +519,10 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = - (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + TaskSchedulerWithDrainableContext taskScheduler = + (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -705,7 +670,7 @@ public class TestContainerReuse { verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3)); eventHandler.reset(); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } @@ -721,20 +686,12 @@ public class TestContainerReuse { tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 1000l); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 1000l); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); @@ -758,11 +715,11 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = - (TaskSchedulerWithDrainableAppCallback) + TaskSchedulerWithDrainableContext taskScheduler = + (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -836,7 +793,7 @@ public class TestContainerReuse { verify(rmClient).releaseAssignedContainer(eq(container1.getId())); eventHandler.verifyInvocation(AMContainerEventStopRequest.class); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } @@ -853,20 +810,12 @@ public class TestContainerReuse { tezConf.setInt( TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); @@ -890,11 +839,11 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = - (TaskSchedulerWithDrainableAppCallback) + TaskSchedulerWithDrainableContext taskScheduler = + (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -965,7 +914,7 @@ public class TestContainerReuse { // container should not get released due to min held containers verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } @@ -979,19 +928,11 @@ public class TestContainerReuse { tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0); AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); @@ -1011,9 +952,9 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -1129,7 +1070,7 @@ public class TestContainerReuse { assertEquals(2, assignEvent.getRemoteTaskLocalResources().size()); eventHandler.reset(); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } @@ -1143,19 +1084,11 @@ public class TestContainerReuse { tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0); AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); @@ -1177,9 +1110,9 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -1291,7 +1224,7 @@ public class TestContainerReuse { verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2)); eventHandler.reset(); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } @@ -1305,19 +1238,12 @@ public class TestContainerReuse { tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); CapturingEventHandler eventHandler = new CapturingEventHandler(); TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); - String appUrl = "url"; - String appMsg = "success"; - AppFinalStatus finalStatus = - new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - - doReturn(finalStatus).when(mockApp).getFinalAppStatus(); AppContext appContext = mock(AppContext.class); doReturn(new Configuration(false)).when(appContext).getAMConf(); @@ -1326,7 +1252,7 @@ public class TestContainerReuse { AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); doReturn(amContainerMap).when(appContext).getAllContainers(); doReturn(amNodeTracker).when(appContext).getNodeTracker(); - doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); + doReturn(DAGAppMasterState.SUCCEEDED).when(appContext).getAMState(); doReturn(true).when(appContext).isAMInCompletionState(); doReturn(dagID).when(appContext).getCurrentDAGID(); doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); @@ -1338,10 +1264,10 @@ public class TestContainerReuse { taskSchedulerEventHandler.init(tezConf); taskSchedulerEventHandler.start(); - TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) + TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler) .getSpyTaskScheduler(); - TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); + TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); AtomicBoolean drainNotifier = new AtomicBoolean(false); taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; @@ -1369,7 +1295,7 @@ public class TestContainerReuse { drainableAppCallback.drain(); verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1)); - taskScheduler.close(); + taskScheduler.shutdown(); taskSchedulerEventHandler.close(); } http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java index 12390b2..2ada2f1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java @@ -18,12 +18,12 @@ package org.apache.tez.dag.app.rm; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.concurrent.PriorityBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.junit.Assert; import org.junit.Test; @@ -33,24 +33,12 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.AsyncDelegateRequestHandler; import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.LocalContainerFactory; import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.TaskRequest; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; public class TestLocalTaskScheduler { - public AppContext createMockAppContext() { - - ApplicationId appId = ApplicationId.newInstance(2000, 1); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - - AppContext appContext = mock(AppContext.class); - doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); - - return appContext; - } @Test(timeout = 5000) public void maxTasksAllocationsCannotBeExceeded() { @@ -59,17 +47,24 @@ public class TestLocalTaskScheduler { TezConfiguration tezConf = new TezConfiguration(); tezConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, MAX_TASKS); - LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext(), 1000); + ApplicationId appId = ApplicationId.newInstance(2000, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + + TaskSchedulerContext + mockContext = TestTaskSchedulerHelpers.setupMockTaskSchedulerContext("", 0, "", true, + appAttemptId, 1000l, null, new Configuration()); + + LocalContainerFactory containerFactory = new LocalContainerFactory(appAttemptId, 1000); + HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>(); PriorityBlockingQueue<TaskRequest> taskRequestQueue = new PriorityBlockingQueue<TaskRequest>(); - TaskSchedulerAppCallback appClientDelegate = mock(TaskSchedulerAppCallback.class); // Object under test AsyncDelegateRequestHandler requestHandler = new AsyncDelegateRequestHandler(taskRequestQueue, containerFactory, taskAllocations, - appClientDelegate, + mockContext, tezConf); // Allocate up to max tasks http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java index b555c62..c637f5f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java @@ -27,11 +27,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.Task; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; import org.apache.tez.dag.app.rm.TestLocalTaskSchedulerService.MockLocalTaskSchedulerSerivce.MockAsyncDelegateRequestHandler; -import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.junit.Assert; import org.junit.Test; @@ -82,14 +80,15 @@ public class TestLocalTaskSchedulerService { * Normal flow of TaskAttempt */ @Test(timeout = 5000) - public void testDeallocationBeforeAllocation() { - AppContext appContext = mock(AppContext.class); + public void testDeallocationBeforeAllocation() throws InterruptedException { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1); - doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); - MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce - (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext); - taskSchedulerService.init(new Configuration()); + + TaskSchedulerContext mockContext = TestTaskSchedulerHelpers + .setupMockTaskSchedulerContext("", 0, "", false, appAttemptId, 10000l, null, new Configuration()); + + MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce(mockContext); + taskSchedulerService.initialize(); taskSchedulerService.start(); Task task = mock(Task.class); @@ -103,21 +102,24 @@ public class TestLocalTaskSchedulerService { assertEquals(1, requestHandler.deallocateCount); // The corresponding AllocateTaskRequest will be removed, so won't been processed. assertEquals(0, requestHandler.allocateCount); - taskSchedulerService.stop(); + taskSchedulerService.shutdown(); } /** * TaskAttempt Killed from START_WAIT */ @Test(timeout = 5000) - public void testDeallocationAfterAllocation() { - AppContext appContext = mock(AppContext.class); + public void testDeallocationAfterAllocation() throws InterruptedException { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1); - doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); - MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce - (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext); - taskSchedulerService.init(new Configuration()); + + TaskSchedulerContext mockContext = TestTaskSchedulerHelpers + .setupMockTaskSchedulerContext("", 0, "", false, appAttemptId, 10000l, null, new Configuration()); + + MockLocalTaskSchedulerSerivce taskSchedulerService = + new MockLocalTaskSchedulerSerivce(mockContext); + + taskSchedulerService.initialize(); taskSchedulerService.start(); Task task = mock(Task.class); @@ -130,33 +132,29 @@ public class TestLocalTaskSchedulerService { requestHandler.drainRequest(2); assertEquals(1, requestHandler.deallocateCount); assertEquals(1, requestHandler.allocateCount); - taskSchedulerService.stop(); + taskSchedulerService.shutdown(); } static class MockLocalTaskSchedulerSerivce extends LocalTaskSchedulerService { private MockAsyncDelegateRequestHandler requestHandler; - public MockLocalTaskSchedulerSerivce(TaskSchedulerAppCallback appClient, - ContainerSignatureMatcher containerSignatureMatcher, - String appHostName, int appHostPort, String appTrackingUrl, - AppContext appContext) { - super(appClient, containerSignatureMatcher, appHostName, appHostPort, - appTrackingUrl, 10000l, appContext); + public MockLocalTaskSchedulerSerivce(TaskSchedulerContext appClient) { + super(appClient); } @Override public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) { requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue, - new LocalContainerFactory(appContext, customContainerAppId), + new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId), taskAllocations, - appClientDelegate, + getContext(), conf); return requestHandler; } @Override - public void serviceStart() { + public void start() { // don't start RequestHandler thread, control it in unit test } @@ -178,7 +176,7 @@ public class TestLocalTaskSchedulerService { BlockingQueue<TaskRequest> taskRequestQueue, LocalContainerFactory localContainerFactory, HashMap<Object, Container> taskAllocations, - TaskSchedulerAppCallback appClientDelegate, Configuration conf) { + TaskSchedulerContext appClientDelegate, Configuration conf) { super(taskRequestQueue, localContainerFactory, taskAllocations, appClientDelegate, conf); } http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java index 807e772..123a4d7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java @@ -18,6 +18,8 @@ package org.apache.tez.dag.app.rm; +import static org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.createCountingExecutingService; +import static org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.setupMockTaskSchedulerContext; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -42,8 +44,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -59,23 +64,21 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.common.MockDNSToSwitchMapping; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.DAGAppMasterState; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus; -import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable; -import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable; +import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher; import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher; -import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -88,23 +91,39 @@ import com.google.common.collect.Sets; public class TestTaskScheduler { - RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); - static ContainerSignatureMatcher containerSignatureMatcher = new AlwaysMatchesContainerMatcher(); + private ExecutorService contextCallbackExecutor; @BeforeClass public static void beforeClass() { MockDNSToSwitchMapping.initializeMockRackResolver(); } + @Before + public void preTest() { + contextCallbackExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d") + .setDaemon(true) + .build()); + } + + @After + public void postTest() { + contextCallbackExecutor.shutdownNow(); + } + + private TaskSchedulerContextDrainable createDrainableContext( + TaskSchedulerContext taskSchedulerContext) { + TaskSchedulerContextImplWrapper wrapper = + new TaskSchedulerContextImplWrapper(taskSchedulerContext, + createCountingExecutingService(contextCallbackExecutor)); + return new TaskSchedulerContextDrainable(wrapper); + } + @SuppressWarnings({ "unchecked" }) @Test(timeout=10000) public void testTaskSchedulerNoReuse() throws Exception { RackResolver.init(new YarnConfiguration()); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - AppContext mockAppContext = mock(AppContext.class); - when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING); TezAMRMClientAsync<CookieContainerRequest> mockRMClient = mock(TezAMRMClientAsync.class); @@ -112,18 +131,19 @@ public class TestTaskScheduler { String appHost = "host"; int appPort = 0; String appUrl = "url"; - TaskSchedulerWithDrainableAppCallback scheduler = - new TaskSchedulerWithDrainableAppCallback( - mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort, - appUrl, mockRMClient, mockAppContext); - TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler - .getDrainableAppCallback(); Configuration conf = new Configuration(); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); int interval = 100; conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, interval); - scheduler.init(conf); + + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); + TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + + TaskSchedulerWithDrainableContext scheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); + + scheduler.initialize(); drainableAppCallback.drain(); verify(mockRMClient).init(conf); verify(mockRMClient).setHeartbeatInterval(interval); @@ -495,22 +515,18 @@ public class TestTaskScheduler { AppFinalStatus finalStatus = new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); - scheduler.stop(); + scheduler.shutdown(); drainableAppCallback.drain(); verify(mockRMClient). unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - appMsg, appUrl); + appMsg, appUrl); verify(mockRMClient).stop(); - scheduler.close(); } @SuppressWarnings({ "unchecked" }) @Test(timeout=10000) public void testTaskSchedulerWithReuse() throws Exception { RackResolver.init(new YarnConfiguration()); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - AppContext mockAppContext = mock(AppContext.class); - when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING); TezAMRMClientAsync<CookieContainerRequest> mockRMClient = mock(TezAMRMClientAsync.class); @@ -518,12 +534,6 @@ public class TestTaskScheduler { String appHost = "host"; int appPort = 0; String appUrl = "url"; - TaskSchedulerWithDrainableAppCallback scheduler = - new TaskSchedulerWithDrainableAppCallback( - mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort, - appUrl, mockRMClient, mockAppContext); - final TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler - .getDrainableAppCallback(); Configuration conf = new Configuration(); // to match all in the same pass @@ -531,7 +541,15 @@ public class TestTaskScheduler { // to release immediately after deallocate conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); - scheduler.init(conf); + + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf); + final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + + TaskSchedulerWithDrainableContext scheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); + + + scheduler.initialize(); drainableAppCallback.drain(); RegisterApplicationMasterResponse mockRegResponse = @@ -992,23 +1010,18 @@ public class TestTaskScheduler { AppFinalStatus finalStatus = new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); - scheduler.stop(); + scheduler.shutdown(); drainableAppCallback.drain(); verify(mockRMClient). unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - appMsg, appUrl); + appMsg, appUrl); verify(mockRMClient).stop(); - scheduler.close(); } @SuppressWarnings("unchecked") @Test (timeout=5000) public void testTaskSchedulerDetermineMinHeldContainers() throws Exception { RackResolver.init(new YarnConfiguration()); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - AppContext mockAppContext = mock(AppContext.class); - when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING); - when(mockAppContext.isSession()).thenReturn(true); TezAMRMClientAsync<CookieContainerRequest> mockRMClient = mock(TezAMRMClientAsync.class); @@ -1016,15 +1029,15 @@ public class TestTaskScheduler { String appHost = "host"; int appPort = 0; String appUrl = "url"; - TaskSchedulerWithDrainableAppCallback scheduler = - new TaskSchedulerWithDrainableAppCallback( - mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort, - appUrl, mockRMClient, mockAppContext); - TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler - .getDrainableAppCallback(); - Configuration conf = new Configuration(); - scheduler.init(conf); + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, true, + new Configuration()); + final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + + TaskSchedulerWithDrainableContext scheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); + + scheduler.initialize(); RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class); Resource mockMaxResource = mock(Resource.class); Map<ApplicationAccessType, String> mockAcls = mock(Map.class); @@ -1176,17 +1189,13 @@ public class TestTaskScheduler { AppFinalStatus finalStatus = new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); - scheduler.stop(); - scheduler.close(); + scheduler.shutdown(); } @SuppressWarnings("unchecked") @Test(timeout=5000) public void testTaskSchedulerRandomReuseExpireTime() throws Exception { RackResolver.init(new YarnConfiguration()); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - AppContext mockAppContext = mock(AppContext.class); - when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING); TezAMRMClientAsync<CookieContainerRequest> mockRMClient = mock(TezAMRMClientAsync.class); @@ -1194,25 +1203,31 @@ public class TestTaskScheduler { String appHost = "host"; int appPort = 0; String appUrl = "url"; - TaskSchedulerWithDrainableAppCallback scheduler1 = - new TaskSchedulerWithDrainableAppCallback( - mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort, - appUrl, mockRMClient, mockAppContext); - TaskSchedulerWithDrainableAppCallback scheduler2 = - new TaskSchedulerWithDrainableAppCallback( - mockApp, new AlwaysMatchesContainerMatcher(), appHost, appPort, - appUrl, mockRMClient, mockAppContext); long minTime = 1000l; long maxTime = 100000l; Configuration conf1 = new Configuration(); conf1.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime); conf1.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, minTime); - scheduler1.init(conf1); + Configuration conf2 = new Configuration(); conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime); conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, maxTime); - scheduler2.init(conf2); + + TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf1); + TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf2); + final TaskSchedulerContextDrainable drainableAppCallback1 = createDrainableContext(mockApp1); + final TaskSchedulerContextDrainable drainableAppCallback2 = createDrainableContext(mockApp2); + + + TaskSchedulerWithDrainableContext scheduler1 = + new TaskSchedulerWithDrainableContext(drainableAppCallback1, mockRMClient); + TaskSchedulerWithDrainableContext scheduler2 = + new TaskSchedulerWithDrainableContext(drainableAppCallback2, mockRMClient); + + scheduler1.initialize(); + scheduler2.initialize(); + RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class); @@ -1250,20 +1265,16 @@ public class TestTaskScheduler { String appMsg = "success"; AppFinalStatus finalStatus = new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl); - when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); - scheduler1.stop(); - scheduler1.close(); - scheduler2.stop(); - scheduler2.close(); + when(mockApp1.getFinalAppStatus()).thenReturn(finalStatus); + when(mockApp2.getFinalAppStatus()).thenReturn(finalStatus); + scheduler1.shutdown(); + scheduler2.shutdown(); } @SuppressWarnings({ "unchecked", "rawtypes" }) @Test (timeout=5000) public void testTaskSchedulerPreemption() throws Exception { RackResolver.init(new YarnConfiguration()); - TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class); - AppContext mockAppContext = mock(AppContext.class); - when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING); TezAMRMClientAsync<CookieContainerRequest> mockRMClient = mock(TezAMRMClientAsync.class); @@ -1271,16 +1282,18 @@ public class TestTaskScheduler { String appHost = "host"; int appPort = 0; String appUrl = "url"; - final TaskSchedulerWithDrainableAppCallback scheduler = - new TaskSchedulerWithDrainableAppCallback( - mockApp, new PreemptionMatcher(), appHost, appPort, - appUrl, mockRMClient, mockAppContext); - TaskSchedulerAppCallbackDrainable drainableAppCallback = scheduler - .getDrainableAppCallback(); Configuration conf = new Configuration(); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); - scheduler.init(conf); + + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false, + null, null, new PreemptionMatcher(), conf); + final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + + final TaskSchedulerWithDrainableContext scheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); + + scheduler.initialize(); RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class); @@ -1530,7 +1543,7 @@ public class TestTaskScheduler { scheduler.getProgress(); scheduler.getProgress(); scheduler.getProgress(); - verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any()); + verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId) any()); scheduler.getProgress(); drainableAppCallback.drain(); // Next oldest mockTaskPri3KillA gets preempted to clear 10% of outstanding running preemptable tasks @@ -1540,9 +1553,8 @@ public class TestTaskScheduler { AppFinalStatus finalStatus = new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl); when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); - scheduler.stop(); + scheduler.shutdown(); drainableAppCallback.drain(); - scheduler.close(); } @SuppressWarnings("unchecked") @@ -1550,22 +1562,19 @@ public class TestTaskScheduler { public void testLocalityMatching() throws Exception { RackResolver.init(new Configuration()); - TaskSchedulerAppCallback appClient = mock(TaskSchedulerAppCallback.class); TezAMRMClientAsync<CookieContainerRequest> amrmClient = mock(TezAMRMClientAsync.class); - AppContext mockAppContext = mock(AppContext.class); - when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING); - - TaskSchedulerWithDrainableAppCallback taskScheduler = - new TaskSchedulerWithDrainableAppCallback( - appClient, new AlwaysMatchesContainerMatcher(), "host", 0, "", - amrmClient, mockAppContext); - TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler - .getDrainableAppCallback(); - + Configuration conf = new Configuration(); conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); - taskScheduler.init(conf); + + TaskSchedulerContext appClient = setupMockTaskSchedulerContext("host", 0, "", conf); + final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(appClient); + + TaskSchedulerWithDrainableContext taskScheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, amrmClient); + + taskScheduler.initialize(); RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class); Resource mockMaxResource = mock(Resource.class); @@ -1693,7 +1702,7 @@ public class TestTaskScheduler { AppFinalStatus finalStatus = new AppFinalStatus( FinalApplicationStatus.SUCCEEDED, "", ""); when(appClient.getFinalAppStatus()).thenReturn(finalStatus); - taskScheduler.close(); + taskScheduler.shutdown(); } @Test (timeout=5000) http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java index 005692e..3ea0446 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java @@ -47,11 +47,13 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; +import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl; import org.apache.tez.dag.app.dag.impl.TaskImpl; import org.apache.tez.dag.app.dag.impl.VertexImpl; @@ -61,10 +63,10 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.AMContainerState; -import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; import org.apache.tez.dag.app.web.WebUIService; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.serviceplugins.api.TaskScheduler; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -96,6 +98,7 @@ public class TestTaskSchedulerEventHandler { protected void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { taskSchedulers[0] = mockTaskScheduler; + taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]); } @Override @@ -113,7 +116,7 @@ public class TestTaskSchedulerEventHandler { TestEventHandler mockEventHandler; ContainerSignatureMatcher mockSigMatcher; MockTaskSchedulerEventHandler schedulerHandler; - TaskSchedulerService mockTaskScheduler; + TaskScheduler mockTaskScheduler; AMContainerMap mockAMContainerMap; WebUIService mockWebUIService; @@ -124,7 +127,7 @@ public class TestTaskSchedulerEventHandler { mockClientService = mock(DAGClientServer.class); mockEventHandler = new TestEventHandler(); mockSigMatcher = mock(ContainerSignatureMatcher.class); - mockTaskScheduler = mock(TaskSchedulerService.class); + mockTaskScheduler = mock(TaskScheduler.class); mockAMContainerMap = mock(AMContainerMap.class); mockWebUIService = mock(WebUIService.class); when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap); http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index 04610ab..966c95a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -40,9 +40,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -54,13 +58,12 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.app.AppContext; +import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; -import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; class TestTaskSchedulerHelpers { @@ -134,12 +137,19 @@ class TestTaskSchedulerHelpers { @Override public void instantiateScheduelrs(String host, int port, String trackingUrl, AppContext appContext) { - taskSchedulers[0] = new TaskSchedulerWithDrainableAppCallback(new TaskSchedulerAppCallbackImpl(this, 0), - containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync, - appContext); - } - - public TaskSchedulerService getSpyTaskScheduler() { + TaskSchedulerContext taskSchedulerContext = + new TaskSchedulerContextImpl(this, appContext, 0, trackingUrl, 1000, host, port, + getConfig()); + TaskSchedulerContextImplWrapper wrapper = + new TaskSchedulerContextImplWrapper(taskSchedulerContext, + new CountingExecutorService(appCallbackExecutor)); + TaskSchedulerContextDrainable drainable = new TaskSchedulerContextDrainable(wrapper); + taskSchedulers[0] = + new TaskSchedulerWithDrainableContext(drainable, amrmClientAsync); + taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]); + } + + public TaskScheduler getSpyTaskScheduler() { return taskSchedulers[0]; } @@ -147,8 +157,8 @@ class TestTaskSchedulerHelpers { public void serviceStart() { instantiateScheduelrs("host", 0, "", appContext); // Init the service so that reuse configuration is picked up. - ((AbstractService)taskSchedulers[0]).init(getConfig()); - ((AbstractService)taskSchedulers[0]).start(); + ((AbstractService)taskSchedulerServiceWrappers[0]).init(getConfig()); + ((AbstractService)taskSchedulerServiceWrappers[0]).start(); taskSchedulers[0] = spy(taskSchedulers[0]); } @@ -188,61 +198,31 @@ class TestTaskSchedulerHelpers { } } - static class TaskSchedulerWithDrainableAppCallback extends YarnTaskSchedulerService { + static class TaskSchedulerWithDrainableContext extends YarnTaskSchedulerService { - private TaskSchedulerAppCallbackDrainable drainableAppCallback; - public TaskSchedulerWithDrainableAppCallback( - TaskSchedulerAppCallback appClient, - ContainerSignatureMatcher containerSignatureMatcher, - String appHostName, int appHostPort, String appTrackingUrl, - AppContext appContext) { - super(appClient, containerSignatureMatcher, appHostName, appHostPort, - appTrackingUrl, appContext); + public TaskSchedulerWithDrainableContext( + TaskSchedulerContextDrainable appClient, + TezAMRMClientAsync<CookieContainerRequest> client) { + super(appClient, client); shouldUnregister.set(true); } - public TaskSchedulerWithDrainableAppCallback( - TaskSchedulerAppCallback appClient, - ContainerSignatureMatcher containerSignatureMatcher, - String appHostName, int appHostPort, String appTrackingUrl, - TezAMRMClientAsync<CookieContainerRequest> client, - AppContext appContext) { - super(appClient, containerSignatureMatcher, appHostName, appHostPort, - appTrackingUrl, client, appContext); - shouldUnregister.set(true); - } - - @Override - TaskSchedulerAppCallback createAppCallbackDelegate( - TaskSchedulerAppCallback realAppClient) { - drainableAppCallback = new TaskSchedulerAppCallbackDrainable( - new TaskSchedulerAppCallbackWrapper(realAppClient, - appCallbackExecutor)); - return drainableAppCallback; - } - - @Override - ExecutorService createAppCallbackExecutorService() { - ExecutorService real = super.createAppCallbackExecutorService(); - return new CountingExecutorService(real); - } - - public TaskSchedulerAppCallbackDrainable getDrainableAppCallback() { - return drainableAppCallback; + public TaskSchedulerContextDrainable getDrainableAppCallback() { + return (TaskSchedulerContextDrainable)getContext(); } } @SuppressWarnings("rawtypes") - static class TaskSchedulerAppCallbackDrainable implements TaskSchedulerAppCallback { + static class TaskSchedulerContextDrainable implements TaskSchedulerContext { int completedEvents; int invocations; - private TaskSchedulerAppCallback real; + private TaskSchedulerContext real; private CountingExecutorService countingExecutorService; final AtomicInteger count = new AtomicInteger(0); - public TaskSchedulerAppCallbackDrainable(TaskSchedulerAppCallbackWrapper real) { - countingExecutorService = (CountingExecutorService) real.executorService; + public TaskSchedulerContextDrainable(TaskSchedulerContextImplWrapper real) { + countingExecutorService = (CountingExecutorService) real.getExecutorService(); this.real = real; } @@ -303,6 +283,53 @@ class TestTaskSchedulerHelpers { return real.getFinalAppStatus(); } + // Not incrementing invocations for methods which to not obtain locks, + // and do not go via the executor service. + @Override + public Configuration getInitialConfiguration() { + return real.getInitialConfiguration(); + } + + @Override + public String getAppTrackingUrl() { + return real.getAppTrackingUrl(); + } + + @Override + public long getCustomClusterIdentifier() { + return real.getCustomClusterIdentifier(); + } + + @Override + public ContainerSignatureMatcher getContainerSignatureMatcher() { + return real.getContainerSignatureMatcher(); + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return real.getApplicationAttemptId(); + } + + @Override + public String getAppHostName() { + return real.getAppHostName(); + } + + @Override + public int getAppClientPort() { + return real.getAppClientPort(); + } + + @Override + public boolean isSession() { + return real.isSession(); + } + + @Override + public AMState getAMState() { + return real.getAMState(); + } + @Override public void preemptContainer(ContainerId cId) { invocations++; @@ -384,7 +411,11 @@ class TestTaskSchedulerHelpers { } } } - + + static CountingExecutorService createCountingExecutingService(ExecutorService rawExecutor) { + return new CountingExecutorService(rawExecutor); + } + @SuppressWarnings({"rawtypes", "unchecked"}) private static class CountingExecutorService implements ExecutorService { @@ -464,7 +495,50 @@ class TestTaskSchedulerHelpers { throws InterruptedException, ExecutionException, TimeoutException { throw new UnsupportedOperationException("Not expected to be used"); } - + } + + static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort, + String appUrl, Configuration conf) { + return setupMockTaskSchedulerContext(appHost, appPort, appUrl, false, conf); + } + + static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort, + String appUrl, boolean isSession, + Configuration conf) { + return setupMockTaskSchedulerContext(appHost, appPort, appUrl, isSession, null, null, null, + conf); + } + + static TaskSchedulerContext setupMockTaskSchedulerContext(String appHost, int appPort, + String appUrl, boolean isSession, + ApplicationAttemptId appAttemptId, + Long customAppIdentifier, + ContainerSignatureMatcher containerSignatureMatcher, + Configuration conf) { + + TaskSchedulerContext mockContext = mock(TaskSchedulerContext.class); + when(mockContext.getAppHostName()).thenReturn(appHost); + when(mockContext.getAppClientPort()).thenReturn(appPort); + when(mockContext.getAppTrackingUrl()).thenReturn(appUrl); + + when(mockContext.getAMState()).thenReturn(TaskSchedulerContext.AMState.RUNNING_APP); + when(mockContext.getInitialConfiguration()).thenReturn(conf); + when(mockContext.isSession()).thenReturn(isSession); + if (containerSignatureMatcher != null) { + when(mockContext.getContainerSignatureMatcher()) + .thenReturn(containerSignatureMatcher); + } else { + when(mockContext.getContainerSignatureMatcher()) + .thenReturn(new AlwaysMatchesContainerMatcher()); + } + if (appAttemptId != null) { + when(mockContext.getApplicationAttemptId()).thenReturn(appAttemptId); + } + if (customAppIdentifier != null) { + when(mockContext.getCustomClusterIdentifier()).thenReturn(customAppIdentifier); + } + + return mockContext; } }
