TEZ-2005. Define basic interface for pluggable TaskScheduler. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/cebbb019 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/cebbb019 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/cebbb019 Branch: refs/heads/master Commit: cebbb0192602baca4da2352b82f18317595d9247 Parents: eec648d Author: Siddharth Seth <[email protected]> Authored: Wed Jul 22 22:25:01 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 18:14:40 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../tez/common/ContainerSignatureMatcher.java | 64 ++++ .../tez/common/ServicePluginLifecycle.java | 39 ++ .../tez/serviceplugins/api/TaskScheduler.java | 87 +++++ .../api/TaskSchedulerContext.java | 114 ++++++ .../org/apache/tez/common/TezUtilsInternal.java | 1 + .../tez/dag/api/TaskCommunicatorInterface.java | 18 - .../org/apache/tez/dag/app/DAGAppMaster.java | 3 +- .../ServicePluginLifecycleAbstractService.java | 52 +++ .../dag/app/rm/LocalTaskSchedulerService.java | 77 ++-- .../app/rm/TaskSchedulerAppCallbackImpl.java | 89 ----- .../app/rm/TaskSchedulerAppCallbackWrapper.java | 307 ---------------- .../dag/app/rm/TaskSchedulerContextImpl.java | 174 +++++++++ .../app/rm/TaskSchedulerContextImplWrapper.java | 368 +++++++++++++++++++ .../dag/app/rm/TaskSchedulerEventHandler.java | 81 ++-- .../tez/dag/app/rm/TaskSchedulerService.java | 113 ------ .../dag/app/rm/YarnTaskSchedulerService.java | 121 +++--- .../dag/app/rm/container/AMContainerImpl.java | 1 + .../dag/app/rm/container/AMContainerMap.java | 1 + .../rm/container/ContainerContextMatcher.java | 1 + .../rm/container/ContainerSignatureMatcher.java | 60 --- .../tez/dag/app/rm/TestContainerReuse.java | 148 ++------ .../tez/dag/app/rm/TestLocalTaskScheduler.java | 29 +- .../app/rm/TestLocalTaskSchedulerService.java | 52 ++- .../tez/dag/app/rm/TestTaskScheduler.java | 199 +++++----- 
.../app/rm/TestTaskSchedulerEventHandler.java | 9 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 186 +++++++--- .../rm/TezTestServiceTaskSchedulerService.java | 66 +--- 28 files changed, 1358 insertions(+), 1103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 88dd0c7..a51669d 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -35,5 +35,6 @@ ALL CHANGES: TEZ-2621. rebase 07/14 TEZ-2124. Change Node tracking to work per external container source. TEZ-2004. Define basic interface for pluggable ContainerLaunchers. + TEZ-2005. Define basic interface for pluggable TaskScheduler. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java b/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java new file mode 100644 index 0000000..c0a1245 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java @@ -0,0 +1,64 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.LocalResource; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ContainerSignatureMatcher { + /** + * Checks the compatibility between the specified container signatures. + * + * @return true if the first signature is a super set of the second + * signature. + */ + public boolean isSuperSet(Object cs1, Object cs2); + + /** + * Checks if the container signatures match exactly + * @return true if exact match + */ + public boolean isExactMatch(Object cs1, Object cs2); + + /** + * Gets additional resources specified in lr2, which are not present for lr1 + * + * @param lr1 + * @param lr2 + * @return additional resources specified in lr2, which are not present for lr1 + */ + public Map<String, LocalResource> getAdditionalResources(Map<String, LocalResource> lr1, + Map<String, LocalResource> lr2); + + + /** + * Do a union of 2 signatures + * Pre-condition. This function should only be invoked iff cs1 is compatible with cs2. + * i.e. isSuperSet should not return false. 
+ * @param cs1 Signature 1 Original signature + * @param cs2 Signature 2 New signature + * @return Union of 2 signatures + */ + public Object union(Object cs1, Object cs2); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java new file mode 100644 index 0000000..2eaa7be --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface ServicePluginLifecycle { + + /** + * Perform any additional initialization which may be required beyond the constructor. + */ + void initialize() throws Exception; + + /** + * Start the service. This will be invoked after initialization. + */ + void start() throws Exception; + + /** + * Shutdown the service. This will be invoked when the service is shutting down. 
+ */ + void shutdown() throws Exception; + +} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java new file mode 100644 index 0000000..9594612 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java @@ -0,0 +1,87 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.serviceplugins.api; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.ServicePluginLifecycle; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class TaskScheduler implements ServicePluginLifecycle { + + private final TaskSchedulerContext taskSchedulerContext; + + public TaskScheduler(TaskSchedulerContext taskSchedulerContext) { + this.taskSchedulerContext = taskSchedulerContext; + } + + @Override + public void initialize() throws Exception { + } + + @Override + public void start() throws Exception { + } + + @Override + public void shutdown() throws Exception { + } + + public void initiateStop() { + } + + public abstract Resource getAvailableResources(); + + public abstract int getClusterNodeCount(); + + public abstract void dagComplete(); + + public abstract Resource getTotalResources(); + + public abstract void blacklistNode(NodeId nodeId); + + public abstract void unblacklistNode(NodeId nodeId); + + public abstract void allocateTask(Object task, Resource capability, + String[] hosts, String[] racks, Priority priority, + Object containerSignature, Object clientCookie); + + /** + * Allocate affinitized to a specific container + */ + public abstract void allocateTask(Object task, Resource capability, + ContainerId containerId, Priority priority, Object containerSignature, + Object clientCookie); + + /** Plugin writers must ensure to de-allocate a container once it's done, so that it can be collected. 
*/ + public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason); + + public abstract Object deallocateContainer(ContainerId containerId); + + public abstract void setShouldUnregister(); + + public abstract boolean hasUnregistered(); + + + public final TaskSchedulerContext getContext() { + return taskSchedulerContext; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java new file mode 100644 index 0000000..b2c8799 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.serviceplugins.api; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.ContainerSignatureMatcher; + +@InterfaceAudience.Public +@InterfaceStability.Unstable + +public interface TaskSchedulerContext { + + public class AppFinalStatus { + public final FinalApplicationStatus exitStatus; + public final String exitMessage; + public final String postCompletionTrackingUrl; + public AppFinalStatus(FinalApplicationStatus exitStatus, + String exitMessage, + String posCompletionTrackingUrl) { + this.exitStatus = exitStatus; + this.exitMessage = exitMessage; + this.postCompletionTrackingUrl = posCompletionTrackingUrl; + } + } + + enum AMState { + IDLE, RUNNING_APP, COMPLETED + } + + // TODO Post TEZ-2003. Remove references to YARN constructs like Container, ContainerStatus, NodeReport + // upcall to app must be outside locks + public void taskAllocated(Object task, + Object appCookie, + Container container); + // this may end up being called for a task+container pair that the app + // has not heard about. 
this can happen because of a race between + // taskAllocated() upcall and deallocateTask() downcall + public void containerCompleted(Object taskLastAllocated, + ContainerStatus containerStatus); + public void containerBeingReleased(ContainerId containerId); + public void nodesUpdated(List<NodeReport> updatedNodes); + public void appShutdownRequested(); + + // TODO Post TEZ-2003, this method specifically needs some cleaning up. + // ClientAMSecretKey is only relevant when running under YARN. As are ApplicationACLs. + public void setApplicationRegistrationData( + Resource maxContainerCapability, + Map<ApplicationAccessType, String> appAcls, + ByteBuffer clientAMSecretKey + ); + public void onError(Throwable t); + public float getProgress(); + public void preemptContainer(ContainerId containerId); + + // TODO Post TEZ-2003. Another method which is primarily relevant to YARN clusters for unregistration. + public AppFinalStatus getFinalAppStatus(); + + + // Getters + + // TODO TEZ-2003. To be replaced by getInitialPayload + public Configuration getInitialConfiguration(); + + public String getAppTrackingUrl(); + + /** + * A custom cluster identifier allocated to schedulers to generate an AppId, if not making + * use of YARN + * @return + */ + public long getCustomClusterIdentifier(); + + public ContainerSignatureMatcher getContainerSignatureMatcher(); + + /** + * Get the application attempt id for the running application. 
Relevant when running under YARN + * @return + */ + public ApplicationAttemptId getApplicationAttemptId(); + + public String getAppHostName(); + + public int getAppClientPort(); + + public boolean isSession(); + + public AMState getAMState(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 4c8c227..532e83c 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -45,6 +45,7 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java deleted file mode 100644 index 022cd7b..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.api; - -public interface TaskCommunicatorInterface { -} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 04e2578..70c0bc6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -98,7 +98,6 @@ import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.AsyncDispatcher; import org.apache.tez.common.AsyncDispatcherConcurrent; import org.apache.tez.common.GcTimeUpdater; -import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezConverterUtils; import org.apache.tez.common.TezUtilsInternal; @@ -149,7 +148,7 @@ import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; -import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.app.rm.node.AMNodeEventType; import org.apache.tez.dag.app.rm.node.AMNodeTracker; import org.apache.tez.dag.app.web.WebUIService; 
http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java new file mode 100644 index 0000000..dac1b82 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.tez.common.ServicePluginLifecycle; + +/** + * Provides service lifecycle management over ServicePlugins using {@link AbstractService} + * @param <T> + */ +public class ServicePluginLifecycleAbstractService<T extends ServicePluginLifecycle> extends AbstractService { + + private final T service; + + public ServicePluginLifecycleAbstractService(T service) { + super(service.getClass().getName()); + this.service = service; + } + + @Override + public void serviceInit(Configuration unused) throws Exception { + service.initialize(); + } + + @Override + public void serviceStart() throws Exception { + service.start(); + } + + @Override + public void serviceStop() throws Exception { + service.shutdown(); + } + + public T getService() { + return service; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java index 365517e..1d889ae 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java @@ -20,16 +20,15 @@ package org.apache.tez.dag.app.rm; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import com.google.common.primitives.Ints; +import 
org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -43,56 +42,30 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.common.ContainerSignatureMatcher; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -public class LocalTaskSchedulerService extends TaskSchedulerService { +public class LocalTaskSchedulerService extends TaskScheduler { private static final Logger LOG = LoggerFactory.getLogger(LocalTaskSchedulerService.class); - final TaskSchedulerAppCallback realAppClient; - final TaskSchedulerAppCallback appClientDelegate; final ContainerSignatureMatcher containerSignatureMatcher; final PriorityBlockingQueue<TaskRequest> taskRequestQueue; + final Configuration conf; AsyncDelegateRequestHandler taskRequestHandler; Thread asyncDelegateRequestThread; - final ExecutorService appCallbackExecutor; final HashMap<Object, Container> taskAllocations; - final String appHostName; - final int appHostPort; final String appTrackingUrl; - final AppContext appContext; final long customContainerAppId; - public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient, - ContainerSignatureMatcher containerSignatureMatcher, String appHostName, - int appHostPort, String appTrackingUrl, long customContainerAppId, AppContext appContext) { - super(LocalTaskSchedulerService.class.getName()); - this.realAppClient = appClient; - this.appCallbackExecutor = createAppCallbackExecutorService(); - this.containerSignatureMatcher = containerSignatureMatcher; - this.appClientDelegate = 
createAppCallbackDelegate(appClient); - this.appHostName = appHostName; - this.appHostPort = appHostPort; - this.appTrackingUrl = appTrackingUrl; - this.appContext = appContext; + public LocalTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { + super(taskSchedulerContext); taskRequestQueue = new PriorityBlockingQueue<TaskRequest>(); taskAllocations = new LinkedHashMap<Object, Container>(); - this.customContainerAppId = customContainerAppId; - } - - private ExecutorService createAppCallbackExecutorService() { - return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build()); - } - - private TaskSchedulerAppCallback createAppCallbackDelegate( - TaskSchedulerAppCallback realAppClient) { - return new TaskSchedulerAppCallbackWrapper(realAppClient, - appCallbackExecutor); + this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl(); + this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher(); + this.customContainerAppId = taskSchedulerContext.getCustomClusterIdentifier(); + this.conf = taskSchedulerContext.getInitialConfiguration(); } @Override @@ -160,7 +133,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { } @Override - public void serviceInit(Configuration conf) { + public void initialize() { taskRequestHandler = createRequestHandler(conf); asyncDelegateRequestThread = new Thread(taskRequestHandler); asyncDelegateRequestThread.setDaemon(true); @@ -168,24 +141,22 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) { return new AsyncDelegateRequestHandler(taskRequestQueue, - new LocalContainerFactory(appContext, customContainerAppId), + new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId), taskAllocations, - appClientDelegate, + getContext(), conf); } @Override - public void serviceStart() { + 
public void start() { asyncDelegateRequestThread.start(); } @Override - public void serviceStop() throws InterruptedException { + public void shutdown() throws InterruptedException { if (asyncDelegateRequestThread != null) { asyncDelegateRequestThread.interrupt(); } - appCallbackExecutor.shutdownNow(); - appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS); } @Override @@ -207,12 +178,12 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { AtomicInteger nextId; final ApplicationAttemptId customAppAttemptId; - public LocalContainerFactory(AppContext appContext, long appIdLong) { + public LocalContainerFactory(ApplicationAttemptId appAttemptId, long customAppId) { this.nextId = new AtomicInteger(1); ApplicationId appId = ApplicationId - .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId()); + .newInstance(customAppId, appAttemptId.getApplicationId().getId()); this.customAppAttemptId = ApplicationAttemptId - .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId()); + .newInstance(appId, appAttemptId.getAttemptId()); } public Container createContainer(Resource capability, Priority priority) { @@ -335,18 +306,18 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { final BlockingQueue<TaskRequest> taskRequestQueue; final LocalContainerFactory localContainerFactory; final HashMap<Object, Container> taskAllocations; - final TaskSchedulerAppCallback appClientDelegate; + final TaskSchedulerContext taskSchedulerContext; final int MAX_TASKS; AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue, LocalContainerFactory localContainerFactory, HashMap<Object, Container> taskAllocations, - TaskSchedulerAppCallback appClientDelegate, + TaskSchedulerContext taskSchedulerContext, Configuration conf) { this.taskRequestQueue = taskRequestQueue; this.localContainerFactory = localContainerFactory; this.taskAllocations = taskAllocations; - this.appClientDelegate = 
appClientDelegate; + this.taskSchedulerContext = taskSchedulerContext; this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT); } @@ -412,13 +383,13 @@ public class LocalTaskSchedulerService extends TaskSchedulerService { Container container = localContainerFactory.createContainer(request.capability, request.priority); taskAllocations.put(request.task, container); - appClientDelegate.taskAllocated(request.task, request.clientCookie, container); + taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container); } void deallocateTask(DeallocateTaskRequest request) { Container container = taskAllocations.remove(request.task); if (container != null) { - appClientDelegate.containerBeingReleased(container.getId()); + taskSchedulerContext.containerBeingReleased(container.getId()); } else { boolean deallocationBeforeAllocation = false; http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java deleted file mode 100644 index ea37e94..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.app.rm; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.Resource; - -public class TaskSchedulerAppCallbackImpl implements TaskSchedulerService.TaskSchedulerAppCallback{ - - private final TaskSchedulerEventHandler tseh; - private final int schedulerId; - - public TaskSchedulerAppCallbackImpl(TaskSchedulerEventHandler tseh, int schedulerId) { - this.tseh = tseh; - this.schedulerId = schedulerId; - } - - @Override - public void taskAllocated(Object task, Object appCookie, Container container) { - tseh.taskAllocated(schedulerId, task, appCookie, container); - } - - @Override - public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { - tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus); - } - - @Override - public void containerBeingReleased(ContainerId containerId) { - tseh.containerBeingReleased(schedulerId, containerId); - } - - @Override - public void nodesUpdated(List<NodeReport> updatedNodes) { - tseh.nodesUpdated(schedulerId, updatedNodes); - } - - @Override - public void appShutdownRequested() { - tseh.appShutdownRequested(schedulerId); - } - - @Override - public void setApplicationRegistrationData(Resource maxContainerCapability, - Map<ApplicationAccessType, String> appAcls, - ByteBuffer clientAMSecretKey) { - tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey); - } - - @Override - public void onError(Throwable t) { - tseh.onError(schedulerId, t); - } - - 
@Override - public float getProgress() { - return tseh.getProgress(schedulerId); - } - - @Override - public void preemptContainer(ContainerId containerId) { - tseh.preemptContainer(schedulerId, containerId); - } - - @Override - public AppFinalStatus getFinalAppStatus() { - return tseh.getFinalAppStatus(); - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java deleted file mode 100644 index 5de8032..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java +++ /dev/null @@ -1,307 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tez.dag.app.rm; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; - -/** - * Makes use of an ExecutionService to invoke application callbacks. Methods - * which return values wait for execution to complete - effectively waiting for - * all previous events in the queue to complete. - */ -class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback { - - private TaskSchedulerAppCallback real; - - ExecutorService executorService; - - /** - * @param real the actual TaskSchedulerAppCallback - * @param executorService the ExecutorService to be used to send these events. 
- */ - public TaskSchedulerAppCallbackWrapper(TaskSchedulerAppCallback real, - ExecutorService executorService) { - this.real = real; - this.executorService = executorService; - } - - @Override - public void taskAllocated(Object task, Object appCookie, Container container) { - executorService.submit(new TaskAllocatedCallable(real, task, appCookie, - container)); - } - - @Override - public void containerCompleted(Object taskLastAllocated, - ContainerStatus containerStatus) { - executorService.submit(new ContainerCompletedCallable(real, - taskLastAllocated, containerStatus)); - } - - @Override - public void containerBeingReleased(ContainerId containerId) { - executorService - .submit(new ContainerBeingReleasedCallable(real, containerId)); - } - - @Override - public void nodesUpdated(List<NodeReport> updatedNodes) { - executorService.submit(new NodesUpdatedCallable(real, updatedNodes)); - } - - @Override - public void appShutdownRequested() { - executorService.submit(new AppShudownRequestedCallable(real)); - } - - @Override - public void setApplicationRegistrationData(Resource maxContainerCapability, - Map<ApplicationAccessType, String> appAcls, ByteBuffer key) { - executorService.submit(new SetApplicationRegistrationDataCallable(real, - maxContainerCapability, appAcls, key)); - } - - @Override - public void onError(Throwable t) { - executorService.submit(new OnErrorCallable(real, t)); - } - - @Override - public float getProgress() { - Future<Float> progressFuture = executorService - .submit(new GetProgressCallable(real)); - try { - return progressFuture.get(); - } catch (Exception e) { - throw new TezUncheckedException(e); - } - } - - @Override - public void preemptContainer(ContainerId containerId) { - executorService.submit(new PreemptContainerCallable(real, containerId)); - } - - @Override - public AppFinalStatus getFinalAppStatus() { - Future<AppFinalStatus> appFinalStatusFuture = executorService - .submit(new GetFinalAppStatusCallable(real)); - try { - return 
appFinalStatusFuture.get(); - } catch (Exception e) { - throw new TezUncheckedException(e); - } - } - - - static abstract class TaskSchedulerAppCallbackBase { - - protected TaskSchedulerAppCallback app; - - public TaskSchedulerAppCallbackBase(TaskSchedulerAppCallback app) { - this.app = app; - } - } - - static class TaskAllocatedCallable extends TaskSchedulerAppCallbackBase - implements Callable<Void> { - private final Object task; - private final Object appCookie; - private final Container container; - - public TaskAllocatedCallable(TaskSchedulerAppCallback app, Object task, - Object appCookie, Container container) { - super(app); - this.task = task; - this.appCookie = appCookie; - this.container = container; - } - - @Override - public Void call() throws Exception { - app.taskAllocated(task, appCookie, container); - return null; - } - } - - static class ContainerCompletedCallable extends TaskSchedulerAppCallbackBase - implements Callable<Void> { - - private final Object taskLastAllocated; - private final ContainerStatus containerStatus; - - public ContainerCompletedCallable(TaskSchedulerAppCallback app, - Object taskLastAllocated, ContainerStatus containerStatus) { - super(app); - this.taskLastAllocated = taskLastAllocated; - this.containerStatus = containerStatus; - } - - @Override - public Void call() throws Exception { - app.containerCompleted(taskLastAllocated, containerStatus); - return null; - } - } - - static class ContainerBeingReleasedCallable extends - TaskSchedulerAppCallbackBase implements Callable<Void> { - private final ContainerId containerId; - - public ContainerBeingReleasedCallable(TaskSchedulerAppCallback app, - ContainerId containerId) { - super(app); - this.containerId = containerId; - } - - @Override - public Void call() throws Exception { - app.containerBeingReleased(containerId); - return null; - } - } - - static class NodesUpdatedCallable extends TaskSchedulerAppCallbackBase - implements Callable<Void> { - private final List<NodeReport> 
updatedNodes; - - public NodesUpdatedCallable(TaskSchedulerAppCallback app, - List<NodeReport> updatedNodes) { - super(app); - this.updatedNodes = updatedNodes; - } - - @Override - public Void call() throws Exception { - app.nodesUpdated(updatedNodes); - return null; - } - } - - static class AppShudownRequestedCallable extends TaskSchedulerAppCallbackBase - implements Callable<Void> { - - public AppShudownRequestedCallable(TaskSchedulerAppCallback app) { - super(app); - } - - @Override - public Void call() throws Exception { - app.appShutdownRequested(); - return null; - } - } - - static class SetApplicationRegistrationDataCallable extends - TaskSchedulerAppCallbackBase implements Callable<Void> { - - private final Resource maxContainerCapability; - private final Map<ApplicationAccessType, String> appAcls; - private final ByteBuffer key; - - public SetApplicationRegistrationDataCallable(TaskSchedulerAppCallback app, - Resource maxContainerCapability, - Map<ApplicationAccessType, String> appAcls, - ByteBuffer key) { - super(app); - this.maxContainerCapability = maxContainerCapability; - this.appAcls = appAcls; - this.key = key; - } - - @Override - public Void call() throws Exception { - app.setApplicationRegistrationData(maxContainerCapability, appAcls, key); - return null; - } - } - - static class OnErrorCallable extends TaskSchedulerAppCallbackBase implements - Callable<Void> { - - private final Throwable throwable; - - public OnErrorCallable(TaskSchedulerAppCallback app, Throwable throwable) { - super(app); - this.throwable = throwable; - } - - @Override - public Void call() throws Exception { - app.onError(throwable); - return null; - } - } - - static class PreemptContainerCallable extends TaskSchedulerAppCallbackBase - implements Callable<Void> { - private final ContainerId containerId; - - public PreemptContainerCallable(TaskSchedulerAppCallback app, ContainerId id) { - super(app); - this.containerId = id; - } - - @Override - public Void call() throws 
Exception { - app.preemptContainer(containerId); - return null; - } - } - - static class GetProgressCallable extends TaskSchedulerAppCallbackBase - implements Callable<Float> { - - public GetProgressCallable(TaskSchedulerAppCallback app) { - super(app); - } - - @Override - public Float call() throws Exception { - return app.getProgress(); - } - } - - static class GetFinalAppStatusCallable extends TaskSchedulerAppCallbackBase - implements Callable<AppFinalStatus> { - - public GetFinalAppStatusCallable(TaskSchedulerAppCallback app) { - super(app); - } - - @Override - public AppFinalStatus call() throws Exception { - return app.getFinalAppStatus(); - } - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java new file mode 100644 index 0000000..890870e --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java @@ -0,0 +1,174 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.rm; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.app.AppContext; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; + +public class TaskSchedulerContextImpl implements TaskSchedulerContext { + + private final TaskSchedulerEventHandler tseh; + private final AppContext appContext; + private final int schedulerId; + private final String trackingUrl; + private final long customClusterIdentifier; + private final String appHostName; + private final int clientPort; + private final Configuration conf; + + public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext, + int schedulerId, String trackingUrl, long customClusterIdentifier, + String appHostname, int clientPort, + Configuration conf) { + this.tseh = tseh; + this.appContext = appContext; + this.schedulerId = schedulerId; + this.trackingUrl = trackingUrl; + this.customClusterIdentifier = customClusterIdentifier; + this.appHostName = appHostname; + this.clientPort = clientPort; + this.conf = conf; + + } + + @Override + public void taskAllocated(Object task, Object appCookie, Container container) { + tseh.taskAllocated(schedulerId, task, appCookie, container); + } + + @Override + public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) { + tseh.containerCompleted(schedulerId, taskLastAllocated, 
containerStatus); + } + + @Override + public void containerBeingReleased(ContainerId containerId) { + tseh.containerBeingReleased(schedulerId, containerId); + } + + @Override + public void nodesUpdated(List<NodeReport> updatedNodes) { + tseh.nodesUpdated(schedulerId, updatedNodes); + } + + @Override + public void appShutdownRequested() { + tseh.appShutdownRequested(schedulerId); + } + + @Override + public void setApplicationRegistrationData(Resource maxContainerCapability, + Map<ApplicationAccessType, String> appAcls, + ByteBuffer clientAMSecretKey) { + tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey); + } + + @Override + public void onError(Throwable t) { + tseh.onError(schedulerId, t); + } + + @Override + public float getProgress() { + return tseh.getProgress(schedulerId); + } + + @Override + public void preemptContainer(ContainerId containerId) { + tseh.preemptContainer(schedulerId, containerId); + } + + @Override + public AppFinalStatus getFinalAppStatus() { + return tseh.getFinalAppStatus(); + } + + @Override + public Configuration getInitialConfiguration() { + return conf; + } + + + @Override + public String getAppTrackingUrl() { + return trackingUrl; + } + + @Override + public long getCustomClusterIdentifier() { + return customClusterIdentifier; + } + + @Override + public ContainerSignatureMatcher getContainerSignatureMatcher() { + return tseh.getContainerSignatureMatcher(); + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return appContext.getApplicationAttemptId(); + } + + @Override + public String getAppHostName() { + return appHostName; + } + + @Override + public int getAppClientPort() { + return clientPort; + } + + @Override + public boolean isSession() { + return appContext.isSession(); + } + + @Override + public AMState getAMState() { + switch (appContext.getAMState()) { + + case NEW: + case INITED: + case IDLE: + return AMState.IDLE; + case RECOVERING: + // TODO Is this 
correct for recovery ? + case RUNNING: + return AMState.RUNNING_APP; + case SUCCEEDED: + case FAILED: + case KILLED: + case ERROR: + return AMState.COMPLETED; + default: + throw new TezUncheckedException("Unexpected state " + appContext.getAMState()); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java new file mode 100644 index 0000000..e64ef43 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.app.rm; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.common.ContainerSignatureMatcher; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; + +/** + * Makes use of an ExecutorService to invoke application callbacks. Methods + * which return values wait for execution to complete - effectively waiting for + * all previous events in the queue to complete. + */ +class TaskSchedulerContextImplWrapper implements TaskSchedulerContext { + + private TaskSchedulerContext real; + + private ExecutorService executorService; + + /** + * @param real the actual TaskSchedulerContext + * @param executorService the ExecutorService to be used to send these events. 
+ */ + public TaskSchedulerContextImplWrapper(TaskSchedulerContext real, + ExecutorService executorService) { + this.real = real; + this.executorService = executorService; + } + + @Override + public void taskAllocated(Object task, Object appCookie, Container container) { + executorService.submit(new TaskAllocatedCallable(real, task, appCookie, + container)); + } + + @Override + public void containerCompleted(Object taskLastAllocated, + ContainerStatus containerStatus) { + executorService.submit(new ContainerCompletedCallable(real, + taskLastAllocated, containerStatus)); + } + + @Override + public void containerBeingReleased(ContainerId containerId) { + executorService + .submit(new ContainerBeingReleasedCallable(real, containerId)); + } + + @Override + public void nodesUpdated(List<NodeReport> updatedNodes) { + executorService.submit(new NodesUpdatedCallable(real, updatedNodes)); + } + + @Override + public void appShutdownRequested() { + executorService.submit(new AppShudownRequestedCallable(real)); + } + + @Override + public void setApplicationRegistrationData(Resource maxContainerCapability, + Map<ApplicationAccessType, String> appAcls, ByteBuffer key) { + executorService.submit(new SetApplicationRegistrationDataCallable(real, + maxContainerCapability, appAcls, key)); + } + + @Override + public void onError(Throwable t) { + executorService.submit(new OnErrorCallable(real, t)); + } + + @Override + public float getProgress() { + Future<Float> progressFuture = executorService + .submit(new GetProgressCallable(real)); + try { + return progressFuture.get(); + } catch (Exception e) { + throw new TezUncheckedException(e); + } + } + + @Override + public void preemptContainer(ContainerId containerId) { + executorService.submit(new PreemptContainerCallable(real, containerId)); + } + + @Override + public AppFinalStatus getFinalAppStatus() { + Future<AppFinalStatus> appFinalStatusFuture = executorService + .submit(new GetFinalAppStatusCallable(real)); + try { + return 
appFinalStatusFuture.get(); + } catch (Exception e) { + throw new TezUncheckedException(e); + } + } + + // Getters which do not need to go through a thread. Underlying implementation + // does not use locks. + + @Override + public Configuration getInitialConfiguration() { + return real.getInitialConfiguration(); + } + + @Override + public String getAppTrackingUrl() { + return real.getAppTrackingUrl(); + } + + @Override + public long getCustomClusterIdentifier() { + return real.getCustomClusterIdentifier(); + } + + @Override + public ContainerSignatureMatcher getContainerSignatureMatcher() { + return real.getContainerSignatureMatcher(); + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return real.getApplicationAttemptId(); + } + + @Override + public String getAppHostName() { + return real.getAppHostName(); + } + + @Override + public int getAppClientPort() { + return real.getAppClientPort(); + } + + @Override + public boolean isSession() { + return real.isSession(); + } + + @Override + public AMState getAMState() { + return real.getAMState(); + } + // End of getters which do not need to go through a thread. Underlying implementation + // does not use locks. 
+ + + static abstract class TaskSchedulerContextCallbackBase { + + protected TaskSchedulerContext app; + + public TaskSchedulerContextCallbackBase(TaskSchedulerContext app) { + this.app = app; + } + } + + static class TaskAllocatedCallable extends TaskSchedulerContextCallbackBase + implements Callable<Void> { + private final Object task; + private final Object appCookie; + private final Container container; + + public TaskAllocatedCallable(TaskSchedulerContext app, Object task, + Object appCookie, Container container) { + super(app); + this.task = task; + this.appCookie = appCookie; + this.container = container; + } + + @Override + public Void call() throws Exception { + app.taskAllocated(task, appCookie, container); + return null; + } + } + + static class ContainerCompletedCallable extends TaskSchedulerContextCallbackBase + implements Callable<Void> { + + private final Object taskLastAllocated; + private final ContainerStatus containerStatus; + + public ContainerCompletedCallable(TaskSchedulerContext app, + Object taskLastAllocated, ContainerStatus containerStatus) { + super(app); + this.taskLastAllocated = taskLastAllocated; + this.containerStatus = containerStatus; + } + + @Override + public Void call() throws Exception { + app.containerCompleted(taskLastAllocated, containerStatus); + return null; + } + } + + static class ContainerBeingReleasedCallable extends + TaskSchedulerContextCallbackBase implements Callable<Void> { + private final ContainerId containerId; + + public ContainerBeingReleasedCallable(TaskSchedulerContext app, + ContainerId containerId) { + super(app); + this.containerId = containerId; + } + + @Override + public Void call() throws Exception { + app.containerBeingReleased(containerId); + return null; + } + } + + static class NodesUpdatedCallable extends TaskSchedulerContextCallbackBase + implements Callable<Void> { + private final List<NodeReport> updatedNodes; + + public NodesUpdatedCallable(TaskSchedulerContext app, + List<NodeReport> 
updatedNodes) { + super(app); + this.updatedNodes = updatedNodes; + } + + @Override + public Void call() throws Exception { + app.nodesUpdated(updatedNodes); + return null; + } + } + + static class AppShudownRequestedCallable extends TaskSchedulerContextCallbackBase + implements Callable<Void> { + + public AppShudownRequestedCallable(TaskSchedulerContext app) { + super(app); + } + + @Override + public Void call() throws Exception { + app.appShutdownRequested(); + return null; + } + } + + static class SetApplicationRegistrationDataCallable extends + TaskSchedulerContextCallbackBase implements Callable<Void> { + + private final Resource maxContainerCapability; + private final Map<ApplicationAccessType, String> appAcls; + private final ByteBuffer key; + + public SetApplicationRegistrationDataCallable(TaskSchedulerContext app, + Resource maxContainerCapability, + Map<ApplicationAccessType, String> appAcls, + ByteBuffer key) { + super(app); + this.maxContainerCapability = maxContainerCapability; + this.appAcls = appAcls; + this.key = key; + } + + @Override + public Void call() throws Exception { + app.setApplicationRegistrationData(maxContainerCapability, appAcls, key); + return null; + } + } + + static class OnErrorCallable extends TaskSchedulerContextCallbackBase implements + Callable<Void> { + + private final Throwable throwable; + + public OnErrorCallable(TaskSchedulerContext app, Throwable throwable) { + super(app); + this.throwable = throwable; + } + + @Override + public Void call() throws Exception { + app.onError(throwable); + return null; + } + } + + static class PreemptContainerCallable extends TaskSchedulerContextCallbackBase + implements Callable<Void> { + private final ContainerId containerId; + + public PreemptContainerCallable(TaskSchedulerContext app, ContainerId id) { + super(app); + this.containerId = id; + } + + @Override + public Void call() throws Exception { + app.preemptContainer(containerId); + return null; + } + } + + static class 
GetProgressCallable extends TaskSchedulerContextCallbackBase + implements Callable<Float> { + + public GetProgressCallable(TaskSchedulerContext app) { + super(app); + } + + @Override + public Float call() throws Exception { + return app.getProgress(); + } + } + + static class GetFinalAppStatusCallable extends TaskSchedulerContextCallbackBase + implements Callable<AppFinalStatus> { + + public GetFinalAppStatusCallable(TaskSchedulerContext app) { + super(app); + } + + @Override + public AppFinalStatus call() throws Exception { + return app.getFinalAppStatus(); + } + } + + @VisibleForTesting + @InterfaceAudience.Private + ExecutorService getExecutorService() { + return executorService; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 549db14..b66e5fa 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -25,11 +25,19 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; +import 
org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -62,7 +70,6 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA; import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; @@ -70,7 +77,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest; import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest; import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded; import org.apache.tez.dag.app.rm.container.AMContainerState; -import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher; +import org.apache.tez.common.ContainerSignatureMatcher; import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated; import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated; import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged; @@ -106,7 +113,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements new AtomicBoolean(false); private final WebUIService webUI; private final String[] taskSchedulerClasses; - protected final TaskSchedulerService []taskSchedulers; + protected final TaskScheduler[]taskSchedulers; + protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers; + + // Single executor service shared by all Schedulers for context callbacks + @VisibleForTesting + final 
ExecutorService appCallbackExecutor; private final boolean isPureLocalMode; // If running in non local-only mode, the YARN task scheduler will always run to take care of @@ -147,6 +159,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements this.webUI = webUI; this.historyUrl = getHistoryUrl(); this.isPureLocalMode = isPureLocalMode; + this.appCallbackExecutor = createAppCallbackExecutorService(); if (this.webUI != null) { this.webUI.setHistoryUrl(this.historyUrl); } @@ -181,7 +194,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex; } } - taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length]; + taskSchedulers = new TaskScheduler[this.taskSchedulerClasses.length]; + taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerClasses.length]; } public Map<ApplicationAccessType, String> getApplicationAcls() { @@ -205,6 +219,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements return taskSchedulers[schedulerId].getTotalResources(); } + private ExecutorService createAppCallbackExecutorService() { + return Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d").setDaemon(true) + .build()); + } + public synchronized void handleEvent(AMSchedulerEvent sEvent) { LOG.info("Processing the event " + sEvent.toString()); switch (sEvent.getType()) { @@ -315,7 +335,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements // stopped. // AMNodeImpl blacklisting logic does not account for KILLED attempts. sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers(). 
- get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(), attemptContainerId, + get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(), + attemptContainerId, attempt.getID(), event.getState() == TaskAttemptState.FAILED)); } } @@ -389,32 +410,30 @@ public class TaskSchedulerEventHandler extends AbstractService implements event); } - private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl, + private TaskScheduler createTaskScheduler(String host, int port, String trackingUrl, AppContext appContext, String schedulerClassName, long customAppIdIdentifier, int schedulerId) { - TaskSchedulerAppCallback appCallback = new TaskSchedulerAppCallbackImpl(this, schedulerId); + TaskSchedulerContext rawContext = + new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl, + customAppIdIdentifier, host, port, getConfig()); + TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor); if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) { LOG.info("Creating TaskScheduler: YarnTaskSchedulerService"); - return new YarnTaskSchedulerService(appCallback, this.containerSignatureMatcher, - host, port, trackingUrl, appContext); + return new YarnTaskSchedulerService(wrappedContext); } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) { LOG.info("Creating TaskScheduler: Local TaskScheduler"); - return new LocalTaskSchedulerService(appCallback, this.containerSignatureMatcher, - host, port, trackingUrl, customAppIdIdentifier, appContext); + return new LocalTaskSchedulerService(wrappedContext); } else { LOG.info("Creating custom TaskScheduler: " + schedulerClassName); - // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface. - Class<? extends TaskSchedulerService> taskSchedulerClazz = - (Class<? 
extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName); + Class<? extends TaskScheduler> taskSchedulerClazz = + (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(schedulerClassName); try { - Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz - .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class, - int.class, String.class, long.class, Configuration.class); + Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz + .getConstructor(TaskSchedulerContext.class); ctor.setAccessible(true); - return ctor.newInstance(appCallback, appContext, host, port, trackingUrl, customAppIdIdentifier, - getConfig()); + return ctor.newInstance(wrappedContext); } catch (NoSuchMethodException e) { throw new TezUncheckedException(e); } catch (InvocationTargetException e) { @@ -444,6 +463,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements customAppIdIdentifier); taskSchedulers[i] = createTaskScheduler(host, port, trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i); + taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskSchedulers[i]); } } @@ -460,8 +480,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext); for (int i = 0 ; i < taskSchedulers.length ; i++) { - taskSchedulers[i].init(getConfig()); - taskSchedulers[i].start(); + taskSchedulerServiceWrappers[i].init(getConfig()); + taskSchedulerServiceWrappers[i].start(); if (shouldUnregisterFlag.get()) { // Flag may have been set earlier when task scheduler was not initialized // TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ? 
@@ -514,7 +534,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements } @Override - public void serviceStop() { + public void serviceStop() throws InterruptedException { synchronized(this) { this.stopEventHandling = true; if (eventHandlingThread != null) @@ -522,9 +542,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements } for (int i = 0 ; i < taskSchedulers.length ; i++) { if (taskSchedulers[i] != null) { - taskSchedulers[i].stop(); + taskSchedulerServiceWrappers[i].stop(); } } + LOG.info("Shutting down AppCallbackExecutor"); + appCallbackExecutor.shutdownNow(); + appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS); } // TODO TEZ-2003 Consolidate TaskSchedulerAppCallback methods once these methods are moved into context @@ -720,6 +743,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements } } + public ContainerSignatureMatcher getContainerSignatureMatcher() { + return containerSignatureMatcher; + } + public boolean hasUnregistered() { boolean result = true; for (int i = 0 ; i < taskSchedulers.length ; i++) { @@ -761,4 +788,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements return historyUrl; } + + @VisibleForTesting + @InterfaceAudience.Private + ExecutorService getContextExecutorService() { + return appCallbackExecutor; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/cebbb019/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java deleted file mode 100644 index 2871691..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java +++ /dev/null @@ -1,113 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license 
agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.dag.app.rm; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; - -public abstract class TaskSchedulerService extends AbstractService{ - - public TaskSchedulerService(String name) { - super(name); - } - - public abstract Resource getAvailableResources(); - - public abstract int getClusterNodeCount(); - - public abstract void dagComplete(); - - public abstract Resource getTotalResources(); - - public abstract void blacklistNode(NodeId nodeId); - - public abstract void unblacklistNode(NodeId nodeId); - - public abstract void allocateTask(Object task, Resource capability, - String[] hosts, String[] racks, 
Priority priority, - Object containerSignature, Object clientCookie); - - /** - * Allocate affinitized to a specific container - */ - public abstract void allocateTask(Object task, Resource capability, - ContainerId containerId, Priority priority, Object containerSignature, - Object clientCookie); - - /** Plugin writers must ensure to de-allocate a container once it's done, so that it can be collected. */ - public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason); - - public abstract Object deallocateContainer(ContainerId containerId); - - public abstract void setShouldUnregister(); - - public abstract boolean hasUnregistered(); - - public abstract void initiateStop(); - - public interface TaskSchedulerAppCallback { - public class AppFinalStatus { - public final FinalApplicationStatus exitStatus; - public final String exitMessage; - public final String postCompletionTrackingUrl; - public AppFinalStatus(FinalApplicationStatus exitStatus, - String exitMessage, - String posCompletionTrackingUrl) { - this.exitStatus = exitStatus; - this.exitMessage = exitMessage; - this.postCompletionTrackingUrl = posCompletionTrackingUrl; - } - } - // upcall to app must be outside locks - public void taskAllocated(Object task, - Object appCookie, - Container container); - // this may end up being called for a task+container pair that the app - // has not heard about. 
this can happen because of a race between - // taskAllocated() upcall and deallocateTask() downcall - public void containerCompleted(Object taskLastAllocated, - ContainerStatus containerStatus); - public void containerBeingReleased(ContainerId containerId); - public void nodesUpdated(List<NodeReport> updatedNodes); - public void appShutdownRequested(); - public void setApplicationRegistrationData( - Resource maxContainerCapability, - Map<ApplicationAccessType, String> appAcls, - ByteBuffer clientAMSecretKey - ); - public void onError(Throwable t); - public float getProgress(); - public void preemptContainer(ContainerId containerId); - public AppFinalStatus getFinalAppStatus(); - - } -}
