http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java new file mode 100644 index 0000000..01b89ae --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractScheduler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.drill.yarn.appMaster;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.drill.yarn.core.ContainerRequestSpec;
import org.apache.hadoop.yarn.api.records.Resource;

/**
 * Base class for schedulers. Holds the scheduler's name, type, task
 * specification, priority and task manager, and supplies the parts of the
 * {@link Scheduler} contract that do not depend on the scheduling policy.
 * Subclasses provide the policy itself (target size, resize, adjust, etc.).
 */
public abstract class AbstractScheduler implements Scheduler {
  private static final Log LOG = LogFactory.getLog(AbstractScheduler.class);

  private final String name;
  private final String type;
  protected TaskSpec taskSpec;
  protected int priority;
  protected int failCount;
  protected TaskManager taskManager;
  protected SchedulerState state;
  protected boolean isTracked;

  /**
   * Creates a scheduler with a do-nothing {@link AbstractTaskManager}.
   *
   * @param type the scheduler type label returned by {@link #getType()}
   * @param name the scheduler name returned by {@link #getName()}
   */
  public AbstractScheduler(String type, String name) {
    this.type = type;
    this.name = name;
    this.taskManager = new AbstractTaskManager();
  }

  /**
   * Replaces the task manager installed by the constructor.
   *
   * @param taskManager the task manager to use from now on
   */
  public void setTaskManager(TaskManager taskManager) {
    this.taskManager = taskManager;
  }

  @Override
  public void registerState(SchedulerState state) {
    this.state = state;
  }

  /**
   * Records the priority on this scheduler and copies it into the container
   * request spec of the task spec.
   */
  @Override
  public void setPriority(int priority) {
    this.priority = priority;
    this.taskSpec.containerSpec.priority = priority;
  }

  @Override
  public String getName() {
    return this.name;
  }

  @Override
  public String getType() {
    return this.type;
  }

  @Override
  public TaskManager getTaskManager() {
    return this.taskManager;
  }

  /**
   * Resizes relative to the current target.
   *
   * @param delta the amount to add to the current target (may be negative)
   */
  @Override
  public void change(int delta) {
    final int newTarget = getTarget() + delta;
    resize(newTarget);
  }

  /**
   * Starts {@code n} new tasks, each built from this scheduler's task spec.
   *
   * @param n the number of tasks to start; nothing is started when
   *          {@code n} is zero or negative
   */
  protected void addTasks(int n) {
    LOG.info( "[" + getName( ) + "] - Adding " + n + " tasks" );
    for (int remaining = n; remaining > 0; remaining--) {
      state.start(new Task(this, taskSpec));
    }
  }

  @Override
  public boolean isTracked() {
    return this.isTracked;
  }

  @Override
  public ContainerRequestSpec getResource() {
    return this.taskSpec.containerSpec;
  }

  /**
   * Clamps the requested container memory and vcores to the maximum that
   * YARN reports, logging a warning for each value that had to be reduced.
   *
   * @param maxResource the largest container YARN will grant
   */
  @Override
  public void limitContainerSize(Resource maxResource) throws AMException {
    final ContainerRequestSpec spec = taskSpec.containerSpec;

    final int memoryLimitMb = maxResource.getMemory();
    if (spec.memoryMb > memoryLimitMb) {
      LOG.warn(taskSpec.name + " requires " + spec.memoryMb
          + " MB but the maximum YARN container size is "
          + memoryLimitMb + " MB");
      spec.memoryMb = memoryLimitMb;
    }

    final int vcoreLimit = maxResource.getVirtualCores();
    if (spec.vCores > vcoreLimit) {
      LOG.warn(taskSpec.name + " requires " + spec.vCores
          + " vcores but the maximum YARN container size is "
          + vcoreLimit + " vcores");
      spec.vCores = vcoreLimit;
    }
  }

  /** No request timeout by default. */
  @Override
  public int getRequestTimeoutSec() {
    return 0;
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java new file mode 100644 index 0000000..7acd402 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/AbstractTaskManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import org.apache.drill.yarn.appMaster.Scheduler.TaskManager; +import org.apache.drill.yarn.core.LaunchSpec; + +/** + * Task manager that does nothing. 
*/

public class AbstractTaskManager implements TaskManager {
  /** Places no limit on concurrent allocations: returns {@code Integer.MAX_VALUE}. */
  @Override
  public int maxConcurrentAllocs() {
    return Integer.MAX_VALUE;
  }

  /** Allocation notification; intentionally a no-op. */
  @Override
  public void allocated(EventContext context) {
  }

  /** Returns the launch spec already held by the task, unchanged. */
  @Override
  public LaunchSpec getLaunchSpec(Task task) {
    return task.getLaunchSpec();
  }

  /**
   * Always returns {@code false}.
   * NOTE(review): presumably this tells the caller that the task manager did
   * not handle the stop itself -- confirm against the TaskManager contract.
   */
  @Override
  public boolean stop(Task task) { return false; }

  /** Completion notification; intentionally a no-op. */
  @Override
  public void completed(EventContext context) { }

  /** Always reports the task as live. */
  @Override
  public boolean isLive(EventContext context) { return true; }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
new file mode 100644
index 0000000..8f3aaab
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/BatchScheduler.java
@@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package org.apache.drill.yarn.appMaster; + +public class BatchScheduler extends AbstractScheduler { + private int quantity; + private int completedCount; + + public BatchScheduler(String name, int quantity) { + super("batch", name); + this.quantity = quantity; + } + + @Override + public void completed(Task task) { + completedCount++; + if (task.getDisposition() != Task.Disposition.COMPLETED) { + failCount++; + } + } + + @Override + public int resize(int level) { quantity = level; return quantity; } + + @Override + public int getTarget() { return quantity; } + + @Override + public int[] getProgress() { + return new int[] { Math.min(completedCount, quantity), quantity }; + } + + @Override + public void adjust() { + int activeCount = state.getTaskCount(); + int delta = quantity - activeCount - completedCount; + if (delta < 0) { + addTasks(-delta); + } + if (delta > 0) { + cancelTasks(delta); + } + } + + /** + * Cancel any starting tasks. We don't cancel launched, in-flight tasks + * because there is no way to tell YARN to cancel tasks that are in the + * process of being launched: we have to wait for them to start + * before canceling. + * + * @param n + */ + + private void cancelTasks(int n) { + for (Task task : state.getStartingTasks()) { + state.cancel(task); + if (--n == 0) { + break; + } + } + } + + @Override + public boolean hasMoreTasks() { + return completedCount < quantity; + } + + @Override + public void requestTimedOut() { + // Not clear what to do here. Since this case is used only for testing, + // deal with this case later. 
+ } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java new file mode 100644 index 0000000..6aaa18b --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterController.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; + +/** + * Interface which identifies the cluster controller methods that are save to + * call from the {@link Dispatcher}. Methods here are either designed to be + * called before the event threads start or after they complete. The remainder + * synchronized to coordinate between event threads. 
*/

public interface ClusterController extends RegistryHandler {
  /**
   * Turns the controller's automatic failure check on or off.
   *
   * @param flag true to enable the check, false to disable it
   */
  void enableFailureCheck(boolean flag);

  /**
   * Registers a listener to be told about task lifecycle events.
   *
   * @param listener the listener to add
   */
  void registerLifecycleListener(TaskLifecycleListener listener);

  /**
   * Registers a scheduler (task pool) with the controller.
   *
   * @param resourceGroup the scheduler to register
   */
  void registerScheduler(Scheduler resourceGroup);

  /**
   * Stores an arbitrary named property on the controller.
   *
   * @param key the property name
   * @param value the property value
   */
  void setProperty(String key, Object value);

  /**
   * Retrieves a property stored with {@link #setProperty(String, Object)}.
   *
   * @param key the property name
   * @return the stored value
   */
  Object getProperty(String key);

  /**
   * Called after the dispatcher has started YARN and other server
   * components. The controller can now begin to spin up tasks.
   */

  void started( ) throws YarnFacadeException, AMException;

  /**
   * Called by the timer ("pulse") thread to trigger time-based events.
   *
   * @param curTime
   *          the current time; NOTE(review): presumably milliseconds as
   *          returned by System.currentTimeMillis() -- confirm with caller
   */

  void tick(long curTime);

  /**
   * The RM has allocated one or more containers in response to container
   * requests submitted to the RM.
   *
   * @param containers
   *          the set of containers provided by YARN
   */

  void containersAllocated(List<Container> containers);

  /**
   * The NM reports that a container has successfully started.
   *
   * @param containerId
   *          the container which started
   */

  void containerStarted(ContainerId containerId);

  /**
   * The RM API reports that an attempt to start a container has failed locally.
   *
   * @param containerId
   *          the container that failed to launch
   * @param t
   *          the error that occurred
   */

  void taskStartFailed(ContainerId containerId, Throwable t);

  /**
   * The Node Manager reports that a container has stopped.
   *
   * @param containerId
   *          the container that stopped
   */
  void containerStopped(ContainerId containerId);

  /**
   * The Resource Manager reports that containers have completed with the given
   * statuses. Find the task for each container and mark them as completed.
   *
   * @param statuses
   *          the completion status of each finished container
   */

  void containersCompleted(List<ContainerStatus> statuses);

  /**
   * Reports overall progress of the controller's tasks.
   *
   * @return the progress value; NOTE(review): presumably a fraction between
   *         0 and 1 as YARN expects -- confirm against the implementation
   */
  float getProgress();

  /**
   * The Node Manager API reports that a request sent to the NM to stop a task
   * has failed.
   *
   * @param containerId
   *          the container that failed to stop
   * @param t
   *          the reason that the stop request failed
   */

  void stopTaskFailed(ContainerId containerId, Throwable t);

  /**
   * Request to resize the Drill cluster by a relative amount.
   *
   * @param delta
   *          the amount of change. Can be positive (to grow) or negative (to
   *          shrink the cluster)
   */

  void resizeDelta(int delta);

  /**
   * Request to resize the Drill cluster to the given size.
   *
   * @param n
   *          the desired cluster size
   * @return NOTE(review): presumably the cluster size actually adopted --
   *         confirm against the scheduler's resize contract
   */

  int resizeTo(int n);

  /**
   * Indicates a request to gracefully shut down the cluster.
   */

  void shutDown();

  /**
   * Called by the main thread to wait for the normal shutdown of the
   * controller. Such shutdown occurs when the admin sends a shutdown
   * command from the UI or REST API.
   *
   * @return NOTE(review): presumably true when the controller ended
   *         normally and false when it failed -- confirm against the
   *         implementation
   */

  boolean waitForCompletion();

  /**
   * Asks the controller to refresh its view of Resource Manager status.
   */
  void updateRMStatus();

  /**
   * Sets the retry limit used by the controller.
   *
   * @param value the maximum number of retries
   */
  void setMaxRetries(int value);

  /**
   * Allow an observer to see a consistent view of the controller's
   * state by performing the visit in a synchronized block.
   *
   * @param visitor
   *          the observer to call back
   */

  void visit( ControllerVisitor visitor );

  /**
   * Allow an observer to see a consistent view of the controller's
   * task state by performing the visit in a synchronized block.
   *
   * @param visitor
   *          the observer to call back
   */

  void visitTasks( TaskVisitor visitor );

  /**
   * Return the target number of tasks that the controller seeks to maintain.
   * This is the sum across all pools.
   *
   * @return the total target task count
   */

  int getTargetCount();

  /**
   * Reports whether the task with the given id is live.
   *
   * @param id the task id
   * @return true if the task is live
   */
  boolean isTaskLive(int id);

  /**
   * Cancels the given task, reducing the target task count. Called
   * from the UI to allow the user to select the specific task to end
   * when reducing cluster size.
   *
   * @param id
   *          the id of the task to cancel
   * @return NOTE(review): presumably true if the task was found and
   *         cancelled -- confirm against the implementation
   */

  boolean cancelTask(int id);

  /**
   * Whether this distribution of YARN supports disk resources.
   *
   * @return true if disk resources are supported
   */

  boolean supportsDiskResource();

  /**
   * Reports how many nodes are free to accept a task.
   *
   * @return the number of free nodes
   */
  int getFreeNodeCount();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
new file mode 100644
index 0000000..3c011ec
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ClusterControllerImpl.java
@@ -0,0 +1,785 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package org.apache.drill.yarn.appMaster; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.drill.yarn.appMaster.TaskLifecycleListener.Event; +import org.apache.drill.yarn.core.DoYUtil; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; + +/** + * Controls the Drill cluster by representing the current cluster state with a + * desired state, taking corrective action to keep the cluster in the desired + * state. The cluster as a whole has a state, as do each task (node) within the + * cluster. + * <p> + * This class is designed to allow unit tests. In general, testing the + * controller on a live cluster is tedious. This class encapsulates the + * controller algorithm so it can be driven by a simulated cluster. + * <p> + * This object is shared between threads, thus synchronized. + */ + +public class ClusterControllerImpl implements ClusterController { + /** + * Controller lifecycle state. + */ + + public enum State { + /** + * Cluster is starting. Things are in a partially-built state. No tasks are + * started until the cluster moves to LIVE. + */ + + START, + + /** + * Normal operating state: the controller seeks to maintain the desired + * number of tasks. + */ + + LIVE, + + /** + * Controller is shutting down. Tasks are gracefully (where possible) ended; + * no new tasks are started. 
(That is, when we detect the exit of a task, + * the controller no longer immediately tries to start a replacement. + */ + + ENDING, + + /** + * The controller has shut down. All tasks and threads are stopped. The + * controller allows the main thread (which has been patiently waiting) to + * continue, allowing the AM itself to shut down. Thus, this is a very + * short-lived state. + */ + + ENDED, + + /** + * Something bad happened on start-up; the AM can't start and must shut + * down. + */ + + FAILED + } + + private final static int PRIORITY_OFFSET = 1; + + private static final Log LOG = LogFactory.getLog(ClusterControllerImpl.class); + + /** + * Signals the completion of the cluster run. The main program waits on this + * mutex until all tasks complete (batch) or the cluster is explicitly shut + * down (persistent tasks.) + */ + + private Object completionMutex = new Object(); + + /** + * Maximum number of retries for each task launch. + */ + + protected int maxRetries = 3; + + /** + * Controller state. + * + * @see {@link State} + */ + + State state = State.START; + + /** + * Definition of the task types that can be run by this controller, along with + * the target task levels for each. + */ + + private Map<String, SchedulerStateActions> taskPools = new HashMap<>(); + + /** + * List of task pools prioritized in the order in which tasks should start. + * DoY supports only one task pool at present. The idea is to, later, support + * multiple pools that represent, say, pool 1 as the minimum number of + * Drillbits to run at all times, with pool 2 as extra Drillbits to start up + * during peak demand. + * <p> + * The priority also gives rise to YARN request priorities which are the only + * tool the AM has to associate container grants with the requests to which + * they correspond. + */ + + private List<SchedulerStateActions> prioritizedGroups = new ArrayList<>(); + + /** + * Cluster-wide association of YARN container IDs to tasks. 
+ */ + + private Set<ContainerId> allocatedContainers = new HashSet<>(); + + /** + * Cluster-wide list of active tasks. Allows lookup from container ID to task + * (and then from task to task type.) + */ + + private Map<ContainerId, Task> activeContainers = new HashMap<>(); + + /** + * Tracks the tasks that have completed: either successfully (state == ENDED) + * or failed (state == FAILED). Eventually store this information elsewhere to + * avoid cluttering memory with historical data. Entries here are static + * copies, preserving the state at the time that the task completed. + */ + + private List<Task> completedTasks = new LinkedList<>(); + + /** + * Wrapper around the YARN API. Abstracts the details of YARN operations. + */ + + private final AMYarnFacade yarn; + + /** + * Maximum number of new tasks to start on each "pulse" tick. + */ + + private int maxRequestsPerTick = 2; + + private int stopTimoutMs = 10_000; + + /** + * Time (in ms) between request to YARN to get an updated list of the node + * "inventory". + */ + + private int configPollPeriod = 60_000; + private long nextResourcePollTime; + + /** + * List of nodes available in the cluster. Necessary as part of the process of + * ensuring that we run one Drillbit per node. (The YARN blacklist only half + * works for this purpose.) + */ + + private NodeInventory nodeInventory; + + private long lastFailureCheckTime; + + private int failureCheckPeriodMs = 60_000; + + private int taskCheckPeriodMs = 10_000; + private long lastTaskCheckTime; + + /** + * To increase code modularity, add-ons (such as the ZK monitor) register as + * lifecycle listeners that are alerted to "interesting" lifecycle events. + */ + + private List<TaskLifecycleListener> lifecycleListeners = new ArrayList<>(); + + /** + * Handy mechanism for setting properties on this controller that are + * available to plugins and UI without cluttering this class with member + * variables. 
+ */ + + private Map<String, Object> properties = new HashMap<>(); + + /** + * When enabled, allows the controller to check for failures that result in no + * drillbits running. The controller will then automatically exit as no useful + * work can be done. Disable this to make debugging easier on a single-node + * cluster (lets you, say, start a "stray" drill bit and see what happens + * without the AM exiting.) + */ + + private boolean enableFailureCheck = true; + + public ClusterControllerImpl(AMYarnFacade yarn) { + this.yarn = yarn; + } + + @Override + public void enableFailureCheck(boolean flag) { + this.enableFailureCheck = flag; + } + + /** + * Define a task type. Registration order is important: the controller starts + * task in the order that they are registered. Must happen before the YARN + * callbacks start. + * + * @param scheduler + */ + + @Override + public void registerScheduler(Scheduler scheduler) { + assert !taskPools.containsKey(scheduler.getName()); + scheduler.setPriority(taskPools.size() + PRIORITY_OFFSET); + SchedulerStateActions taskGroup = new SchedulerStateImpl(this, scheduler); + taskPools.put(taskGroup.getName(), taskGroup); + prioritizedGroups.add(taskGroup); + } + + /** + * Called when the caller has completed start-up and the controller should + * become live. + */ + + @Override + public synchronized void started() throws YarnFacadeException, AMException { + nodeInventory = new NodeInventory(yarn); + + // Verify that no resource seeks a container larger than + // what YARN can provide. Ensures a graceful exit in this + // case. 
+ + Resource maxResource = yarn.getRegistrationResponse() + .getMaximumResourceCapability(); + for (SchedulerStateActions group : prioritizedGroups) { + group.getScheduler().limitContainerSize(maxResource); + } + state = State.LIVE; + } + + @Override + public synchronized void tick(long curTime) { + if (state == State.LIVE) { + adjustTasks(curTime); + requestContainers(); + } + if (state == State.LIVE || state == State.ENDING) { + checkTasks(curTime); + } + } + + /** + * Adjust the number of running tasks to match the desired level. + * + * @param curTime + */ + + private void adjustTasks(long curTime) { + if (enableFailureCheck && getFreeNodeCount() == 0) { + checkForFailure(curTime); + } + if (state != State.LIVE) { + return; + } + for (SchedulerStateActions group : prioritizedGroups) { + group.adjustTasks(); + } + } + + /** + * Get the approximate number of free YARN nodes (those that can + * accept a task request.) Starts with the number of nodes from + * the node inventory, then subtracts any in-flight requests (which + * do not, by definition, have node allocated.) + * <p> + * This approximation <b>does not</b> consider whether the node + * has sufficient resources to run a task; only whether the node + * itself exists. + * @return + */ + + @Override + public int getFreeNodeCount( ) { + int count = nodeInventory.getFreeNodeCount(); + for (SchedulerStateActions group : prioritizedGroups) { + count -= group.getRequestCount( ); + } + return Math.max( 0, count ); + } + + /** + * Check if the controller is unable to run any tasks. If so, and the option + * is enabled, then automatically exit since no useful work can be done. 
+ * + * @param curTime + */ + + private void checkForFailure(long curTime) { + if (lastFailureCheckTime + failureCheckPeriodMs > curTime) { + return; + } + lastFailureCheckTime = curTime; + for (SchedulerStateActions group : prioritizedGroups) { + if (group.getTaskCount() > 0) { + return; + } + } + LOG.error( + "Application failure: no tasks are running and no nodes are available -- exiting."); + terminate(State.FAILED); + } + + /** + * Periodically check tasks, handling any timeout issues. + * + * @param curTime + */ + + private void checkTasks(long curTime) { + + // Check periodically, not on every tick. + + if (lastTaskCheckTime + taskCheckPeriodMs > curTime) { + return; + } + lastTaskCheckTime = curTime; + + // Check for task timeouts in states that have a timeout. + + EventContext context = new EventContext(this); + for (SchedulerStateActions group : prioritizedGroups) { + context.setGroup(group); + group.checkTasks(context, curTime); + } + } + + /** + * Get an update from YARN on available resources. + */ + + @Override + public void updateRMStatus() { + long curTime = System.currentTimeMillis(); + if (nextResourcePollTime > curTime) { + return; + } + + // yarnNodeCount = yarn.getNodeCount(); + // LOG.info("YARN reports " + yarnNodeCount + " nodes."); + + // Resource yarnResources = yarn.getResources(); + // if (yarnResources != null) { + // LOG.info("YARN reports " + yarnResources.getMemory() + " MB, " + + // yarnResources.getVirtualCores() + // + " vcores available."); + // } + nextResourcePollTime = curTime + configPollPeriod; + } + + /** + * Request any containers that have accumulated. 
+ */ + + private void requestContainers() { + EventContext context = new EventContext(this); + for (SchedulerStateActions group : prioritizedGroups) { + context.setGroup(group); + if (group.requestContainers(context, maxRequestsPerTick)) { + break; + } + } + } + + @Override + public synchronized void containersAllocated(List<Container> containers) { + EventContext context = new EventContext(this); + for (Container container : containers) { + if (allocatedContainers.contains(container.getId())) { + continue; + } + + // We should never get a container on a node in the blacklist we + // sent to YARN. If we do, something is wrong. Log the error and + // reject the container. Else, bad things happen further along as + // the tracking mechanisms assume one task per node. + + String host = container.getNodeId().getHost(); + if (nodeInventory.isInUse(host)) { + LOG.error( "Host is in use, but YARN allocated a container: " + + DoYUtil.labelContainer(container) + " - container rejected." ); + yarn.releaseContainer(container); + continue; + } + + // The container is fine. 
+ + allocatedContainers.add(container.getId()); + int priority = container.getPriority().getPriority(); + int offset = priority - PRIORITY_OFFSET; + if (offset < 0 || offset > prioritizedGroups.size()) { + LOG.error("Container allocated with unknown priority " + DoYUtil.labelContainer(container)); + continue; + } + context.setGroup(prioritizedGroups.get(offset)); + context.group.containerAllocated(context, container); + } + } + + @Override + public synchronized void containerStarted(ContainerId containerId) { + Task task = getTask(containerId); + if (task == null) { + return; + } + EventContext context = new EventContext(this, task); + context.getState().containerStarted(context); + LOG.trace("Container started: " + containerId); + } + + @Override + public synchronized void taskStartFailed(ContainerId containerId, + Throwable t) { + Task task = getTask(containerId); + if (task == null) { + return; + } + EventContext context = new EventContext(this, task); + context.getState().launchFailed(context, t); + } + + private Task getTask(ContainerId containerId) { + return activeContainers.get(containerId); + } + + @Override + public synchronized void containerStopped(ContainerId containerId) { + // Ignored because the node manager notification is very + // unreliable. Better to rely on the Resource Manager + // completion request. + // Task task = getTask(containerId); + // if (task == null) { + // return; } + // EventContext context = new EventContext(this, task); + // context.getState().containerStopped(context); + } + + @Override + public synchronized void containersCompleted(List<ContainerStatus> statuses) { + EventContext context = new EventContext(this); + for (ContainerStatus status : statuses) { + Task task = getTask(status.getContainerId()); + if (task == null) { + if (task == null) { + // Will occur if a container was allocated but rejected. + // Any other occurrence is unexpected and an error. 
+ + LOG.warn("Container completed but no associated task state: " + status.getContainerId() ); + } + continue; + } + context.setTask(task); + context.getState().containerCompleted(context, status); + } + checkStatus(); + } + + @Override + public synchronized float getProgress() { + int numerator = 0; + int denominator = 0; + for (SchedulerStateActions group : taskPools.values()) { + Scheduler sched = group.getScheduler(); + int[] progress = sched.getProgress(); + numerator += progress[0]; + denominator += progress[1]; + } + if (numerator == 0) { + return 1; + } + return (float) denominator / (float) numerator; + } + + @Override + public synchronized void stopTaskFailed(ContainerId containerId, + Throwable t) { + Task task = getTask(containerId); + if (task == null) { + return; + } + EventContext context = new EventContext(this, task); + context.getState().stopTaskFailed(context, t); + } + + @Override + public synchronized void resizeDelta(int delta) { + // TODO: offer the delta to each scheduler in turn. + // For now, we support only one scheduler. + + prioritizedGroups.get(0).getScheduler().change(delta); + } + + @Override + public synchronized int resizeTo(int n) { + // TODO: offer the delta to each scheduler in turn. + // For now, we support only one scheduler. 
+ + return prioritizedGroups.get(0).getScheduler().resize(n); + } + + @Override + public synchronized void shutDown() { + LOG.info("Shut down request received"); + this.state = State.ENDING; + EventContext context = new EventContext(this); + for (SchedulerStateActions group : prioritizedGroups) { + group.shutDown(context); + } + checkStatus(); + } + + @Override + public boolean waitForCompletion() { + start(); + synchronized (completionMutex) { + try { + completionMutex.wait(); + LOG.info("Controller shut down completed"); + } catch (InterruptedException e) { + // Should not happen + } + } + return succeeded(); + } + + private void start() { + yarnReport(); + } + + private void yarnReport() { + RegisterApplicationMasterResponse response = yarn.getRegistrationResponse(); + LOG.info("YARN queue: " + response.getQueue()); + Resource resource = response.getMaximumResourceCapability(); + LOG.info("YARN max resource: " + resource.getMemory() + " MB, " + + resource.getVirtualCores() + " cores"); + EnumSet<SchedulerResourceTypes> types = response + .getSchedulerResourceTypes(); + StringBuilder buf = new StringBuilder(); + String sep = ""; + for (SchedulerResourceTypes type : types) { + buf.append(sep); + buf.append(type.toString()); + sep = ", "; + } + LOG.info("YARN scheduler resource types: " + buf.toString()); + } + + /** + * Check for overall completion. We are done when either we've successfully + * run all tasks, or we've run some and given up on others. We're done when + * the number of completed or failed tasks reaches our target. 
+ */ + + private void checkStatus() { + if (state != State.ENDING) { + return; + } + for (SchedulerStateActions group : prioritizedGroups) { + if (!group.isDone()) { + return; + } + } + terminate(State.ENDED); + } + + private void terminate(State state) { + this.state = state; + synchronized (completionMutex) { + completionMutex.notify(); + } + } + + public boolean isLive() { + return state == State.LIVE; + } + + public boolean succeeded() { + return state == State.ENDED; + } + + public void containerAllocated(Task task) { + activeContainers.put(task.getContainerId(), task); + } + + public AMYarnFacade getYarn() { + return yarn; + } + + public void containerReleased(Task task) { + activeContainers.remove(task.getContainerId()); + } + + public void taskEnded(Task task) { + completedTasks.add(task); + } + + public void taskRetried(Task task) { + Task copy = task.copy(); + copy.disposition = Task.Disposition.RETRIED; + completedTasks.add(copy); + } + + public void taskGroupCompleted(SchedulerStateActions taskGroup) { + checkStatus(); + } + + public int getMaxRetries() { + return maxRetries; + } + + public int getStopTimeoutMs() { + return stopTimoutMs; + } + + @Override + public synchronized void reserveHost(String hostName) { + nodeInventory.reserve(hostName); + } + + @Override + public synchronized void releaseHost(String hostName) { + nodeInventory.release(hostName); + } + + public NodeInventory getNodeInventory() { + return nodeInventory; + } + + @Override + public void setProperty(String key, Object value) { + properties.put(key, value); + } + + @Override + public Object getProperty(String key) { + return properties.get(key); + } + + @Override + public void registerLifecycleListener(TaskLifecycleListener listener) { + lifecycleListeners.add(listener); + } + + public void fireLifecycleChange(Event event, EventContext context) { + for (TaskLifecycleListener listener : lifecycleListeners) { + listener.stateChange(event, context); + } + } + + @Override + public void 
setMaxRetries(int value) { + maxRetries = value; + } + + @Override + public int getTargetCount() { + int count = 0; + for (SchedulerStateActions group : prioritizedGroups) { + count += group.getScheduler().getTarget(); + } + return count; + } + + public State getState() { + return state; + } + + @Override + public synchronized void visit(ControllerVisitor visitor) { + visitor.visit(this); + } + + public List<SchedulerStateActions> getPools() { + return prioritizedGroups; + } + + @Override + public synchronized void visitTasks(TaskVisitor visitor) { + for (SchedulerStateActions pool : prioritizedGroups) { + pool.visitTaskModels(visitor); + } + } + + public List<Task> getHistory() { + return completedTasks; + } + + @Override + public boolean isTaskLive(int id) { + for (SchedulerStateActions group : prioritizedGroups) { + Task task = group.getTask(id); + if (task != null) { + return task.isLive(); + } + } + return false; + } + + @Override + public synchronized boolean cancelTask(int id) { + for (SchedulerStateActions group : prioritizedGroups) { + Task task = group.getTask(id); + if (task != null) { + group.cancel(task); + group.getScheduler().change(-1); + return true; + } + } + LOG.warn( "Requested to cancel task, but no task found: " + id ); + return false; + } + + @Override + public synchronized void completionAck(Task task, String propertyKey) { + EventContext context = new EventContext(this); + context.setTask(task); + context.getState().completionAck(context); + if (propertyKey != null) { + task.properties.remove(propertyKey); + } + } + + @Override + public synchronized void startAck(Task task, String propertyKey, + Object value) { + if (propertyKey != null && value != null) { + task.properties.put(propertyKey, value); + } + EventContext context = new EventContext(this); + context.setTask(task); + context.getState().startAck(context); + } + + @Override + public boolean supportsDiskResource() { + return getYarn().supportsDiskResource(); + } + + @Override + 
public void registryDown() { shutDown( ); } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java new file mode 100644 index 0000000..b8d6e06 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.appMaster; + +public interface ControllerFactory { + public static class ControllerFactoryException extends Exception { + private static final long serialVersionUID = 1L; + + public ControllerFactoryException(String msg, Exception e) { + super(msg, e); + } + } + + Dispatcher build() throws ControllerFactoryException; +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java new file mode 100644 index 0000000..5774d7d --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/ControllerVisitor.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.appMaster; + +public interface ControllerVisitor { + void visit(ClusterController controller); +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java new file mode 100644 index 0000000..f5257e6 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Dispatcher.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.appMaster; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.drill.yarn.appMaster.AMRegistrar.AMRegistrationException; +import org.apache.drill.yarn.appMaster.AMYarnFacade.YarnAppHostReport; +import org.apache.drill.yarn.core.DrillOnYarnConfig; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; + +/** + * Dispatches YARN, timer and ZooKeeper events to the cluster controller. + * Allows the controller to be independent of the plumbing needed to + * receive events. Divides work among + * various components to separate concerns. Three streams of events + * feed into an app master "strategy". The three streams are + * <ol> + * <li>Resource manager</li> + * <li>Node manager</li> + * <li>Timer</li> + * </ol> + * <p> + * This class is "lightly" multi-threaded: it responds to events + * from the RM, NM and timer threads. Within each of these, events + * are sequential. So, synchronization is needed across the three event + * types, but not within event types. (That is, we won't see two RM events, + * say, occurring at the same time from separate threads.) + */ + +public class Dispatcher +{ + private static final Log LOG = LogFactory.getLog(Dispatcher.class); + + /** + * Handle YARN Resource Manager events. This is a separate class to clarify + * which events are from the Resource Manager. 
+ */ + + private class ResourceCallback implements AMRMClientAsync.CallbackHandler { + @Override + public void onContainersAllocated(List<Container> containers) { + LOG.trace("NM: Containers allocated: " + containers.size()); + controller.containersAllocated(containers); + } + + @Override + public void onContainersCompleted(List<ContainerStatus> statuses) { + LOG.trace("NM: Containers completed: " + statuses.size()); + controller.containersCompleted(statuses); + } + + @Override + public void onShutdownRequest() { + LOG.trace("RM: Shutdown request"); + controller.shutDown(); + } + + @Override + public void onNodesUpdated(List<NodeReport> updatedNodes) { + LOG.trace("RM: Nodes updated, count= " + updatedNodes.size()); + } + + @Override + public float getProgress() { + // getProgress is called on each fetch from the NM response queue. + // This is a good time to update status, even if it looks a bit + // bizarre... + + controller.updateRMStatus(); + return controller.getProgress(); + } + + @Override + public void onError(Throwable e) { + LOG.error("Fatal RM Error: " + e.getMessage()); + LOG.error("AM Shutting down!"); + controller.shutDown(); + } + } + + /** + * Handle YARN Node Manager events. This is a separate class to clarify which + * events are, in fact, from the node manager. 
+ */ + + public class NodeCallback implements NMClientAsync.CallbackHandler { + @Override + public void onStartContainerError(ContainerId containerId, Throwable t) { + LOG.trace("CNM: ontainer start error: " + containerId, t); + controller.taskStartFailed(containerId, t); + } + + @Override + public void onContainerStarted(ContainerId containerId, + Map<String, ByteBuffer> allServiceResponse) { + LOG.trace("NM: Container started: " + containerId); + controller.containerStarted(containerId); + } + + @Override + public void onContainerStatusReceived(ContainerId containerId, + ContainerStatus containerStatus) { + LOG.trace("NM: Container status: " + containerId + " - " + + containerStatus.toString()); + } + + @Override + public void onGetContainerStatusError(ContainerId containerId, + Throwable t) { + LOG.trace("NM: Container error: " + containerId, t); + } + + @Override + public void onStopContainerError(ContainerId containerId, Throwable t) { + LOG.trace("NM: Stop container error: " + containerId, t); + controller.stopTaskFailed(containerId, t); + } + + @Override + public void onContainerStopped(ContainerId containerId) { + LOG.trace("NM: Container stopped: " + containerId); + controller.containerStopped(containerId); + } + } + + /** + * Handle timer events: a constant tick to handle time-based actions such as + * timeouts. + */ + + public class TimerCallback implements PulseRunnable.PulseCallback { + /** + * The lifecycle of each task is driven by RM and NM callbacks. We use the + * timer to start the process. While this is overkill here, in a real app, + * we'd check requested resource levels (which might change) and number of + * tasks (which might change if tasks die), and take corrective action: + * adding or removing tasks. 
+ */ + + @Override + public void onTick(long curTime) { + for (Pollable pollable : pollables) { + pollable.tick(curTime); + } + controller.tick(curTime); + } + } + + private AMYarnFacade yarn; + private ClusterController controller; + + /** + * Add-on tools that are called once on each timer tick. + */ + + private List<Pollable> pollables = new ArrayList<>(); + + /** + * Add-ons for which the dispatcher should managed the start/end lifecycle. + */ + + private List<DispatcherAddOn> addOns = new ArrayList<>(); + private String trackingUrl; + private AMRegistrar amRegistrar; + private int httpPort; + private PulseRunnable timer; + private Thread pulseThread; + private final int timerPeriodMs; + + public Dispatcher(int timerPeriodMs) { + this.timerPeriodMs = timerPeriodMs; + } + + public void setYarn(AMYarnFacade yarn) throws YarnFacadeException { + this.yarn = yarn; + controller = new ClusterControllerImpl(yarn); + } + + public ClusterController getController() { + return controller; + } + + public void registerPollable(Pollable pollable) { + pollables.add(pollable); + } + + public void registerAddOn(DispatcherAddOn addOn) { + addOns.add(addOn); + } + + public void setHttpPort(int port) { + httpPort = port; + } + + public void setTrackingUrl(String trackingUrl) { + this.trackingUrl = trackingUrl; + } + + public String getTrackingUrl() { + return yarn.getTrackingUrl(); + } + + public void setAMRegistrar(AMRegistrar registrar) { + amRegistrar = registrar; + } + + /** + * Start the dispatcher by initializing YARN and registering the AM. + * + * @return true if successful, false if the dispatcher did not start. + */ + + public boolean start() throws YarnFacadeException { + + // Start the connection to YARN to get information about this app, and to + // create a session we can use to report problems. 
+ + try { + setup(); + } catch (AMException e) { + String msg = e.getMessage(); + LOG.error("Fatal error: " + msg); + yarn.finish(false, msg); + return false; + } + + // Ensure that this is the only AM. If not, shut down the AM, + // reporting to YARN that this is a failure and the message explaining + // the conflict. Report this as a SUCCESS run so that YARN does not + // attempt to retry the AM. + + try { + register(); + } catch (AMRegistrationException e) { + LOG.error(e.getMessage(), e); + yarn.finish(true, e.getMessage()); + return false; + } + return true; + } + + public void run() throws YarnFacadeException { + // Only if registration is successful do we start the pulse thread + // which will cause containers to be requested. + + startTimer(); + + // Run until the controller decides to shut down. + + LOG.trace("Running"); + boolean success = controller.waitForCompletion(); + + // Shut down. + + LOG.trace("Finishing"); + finish(success, null); + } + + private void setup() throws YarnFacadeException, AMException { + LOG.trace("Starting YARN agent"); + yarn.start(new ResourceCallback(), new NodeCallback()); + String url = trackingUrl.replace("<port>", Integer.toString(httpPort)); + if (DrillOnYarnConfig.config().getBoolean(DrillOnYarnConfig.HTTP_ENABLE_SSL)) { + url = url.replace("http:", "https:"); + } + LOG.trace("Registering YARN application, URL: " + url); + yarn.register(url); + controller.started(); + + for (DispatcherAddOn addOn : addOns) { + addOn.start(controller); + } + } + + private void register() throws AMRegistrationException { + if (amRegistrar == null) { + LOG.warn( + "No AM Registrar provided: cannot check if this is the only AM for the Drill cluster."); + } else { + YarnAppHostReport rpt = yarn.getAppHostReport(); + amRegistrar.register(rpt.amHost, httpPort, rpt.appId); + } + } + + private void startTimer() { + timer = new PulseRunnable(timerPeriodMs, new TimerCallback()); + + // Start the pulse thread after registering so that we're in + // 
a state where we can interact with the RM. + + pulseThread = new Thread(timer); + pulseThread.setName("Pulse"); + pulseThread.start(); + } + + private void finish(boolean success, String msg) throws YarnFacadeException { + for (DispatcherAddOn addOn : addOns) { + addOn.finish(controller); + } + + LOG.trace("Shutting down YARN agent"); + + // Stop the timer thread first. This ensures that the + // timer events don't try to use the YARN API during + // shutdown. + + stopTimer(); + yarn.finish(success, msg); + } + + private void stopTimer() { + timer.stop(); + try { + pulseThread.join(); + } catch (InterruptedException e) { + // Ignore + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java new file mode 100644 index 0000000..5c7100b --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DispatcherAddOn.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +/** + * Interface for an add-on to the dispatcher that + * should be started at start of the run and ended + * at the end of the run. + */ + +public interface DispatcherAddOn { + void start(ClusterController controller); + + void finish(ClusterController controller); +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java new file mode 100644 index 0000000..c0db9a1 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillApplicationMaster.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.appMaster; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.drill.yarn.appMaster.ControllerFactory.ControllerFactoryException; +import org.apache.drill.yarn.appMaster.http.WebServer; +import org.apache.drill.yarn.core.DoyConfigException; +import org.apache.drill.yarn.core.DrillOnYarnConfig; + +/** + * Application Master for Drill. The name is visible when using the "jps" + * command and is chosen to make sense on a busy YARN node. + * <p> + * To debug this AM use the customized unmanaged AM launcher in this + * jar. (The "stock" YARN version does not give you time to attach + * the debugger.) + * <pre><code> + * TARGET_JAR=/your-git-folder/drill-yarn/target/drill-yarn-1.6-SNAPSHOT.jar + * TARGET_CLASS=org.apache.drill.yarn.appMaster.DrillApplicationMaster + * LAUNCHER_JAR=$TARGET_JAR + * LAUNCHER_CLASS=org.apache.drill.yarn.mock.UnmanagedAMLauncher + * $HH/bin/hadoop jar $LAUNCHER_JAR \ + * $LAUNCHER_CLASS -classpath $TARGET_JAR \ + * -cmd "java -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 \ + * $TARGET_CLASS" + * </code></pre> + */ + +public class DrillApplicationMaster { + private static final Log LOG = LogFactory + .getLog(DrillApplicationMaster.class); + + public static void main(String[] args) { + LOG.trace("Drill Application Master starting."); + + // Load the configuration. Assumes that the user's Drill-on-YARN + // configuration was archived along with the Drill software in + // the $DRILL_HOME/conf directory, and that $DRILL_HOME/conf is + // on the class-path. + + try { + DrillOnYarnConfig.load().setAmDrillHome(); + } catch (DoyConfigException e) { + System.err.println(e.getMessage()); + System.exit(-1); + } + + // Build the dispatcher using the Drillbit factory. Allows inserting + // other factories for testing, or if we need to manage a cluster of + // processes other than Drillbits.
+ + // Dispatcher am = (new SimpleBatchFactory( )).build( ); + // Dispatcher am = (new MockDrillbitFactory( )).build( ); + Dispatcher dispatcher; + try { + dispatcher = (new DrillControllerFactory()).build(); + } catch (ControllerFactoryException e) { + LOG.error("Setup failed, exiting: " + e.getMessage(), e); + System.exit(-1); + return; + } + + // Start the Dispatcher. This will return false if this AM conflicts with + // a running AM. + + try { + if (!dispatcher.start()) { + return; + } + } catch (Throwable e) { + LOG.error("Fatal error, exiting: " + e.getMessage(), e); + System.exit(-1); + } + + // Create and start the web server. Do this after starting the AM + // so that we don't learn about a conflict via the a web server port + // conflict. + + WebServer webServer = new WebServer(dispatcher); + try { + webServer.start(); + } catch (Exception e) { + LOG.error("Web server setup failed, exiting: " + e.getMessage(), e); + System.exit(-1); + } + + // Run the dispatcher until the cluster shuts down. + + try { + dispatcher.run(); + } catch (Throwable e) { + LOG.error("Fatal error, exiting: " + e.getMessage(), e); + System.exit(-1); + } finally { + try { + webServer.close(); + } catch (Exception e) { + // Ignore + } + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java new file mode 100644 index 0000000..013fdba --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillControllerFactory.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.drill.yarn.core.ContainerRequestSpec; +import org.apache.drill.yarn.core.DfsFacade; +import org.apache.drill.yarn.core.DfsFacade.DfsFacadeException; +import org.apache.drill.yarn.core.DoYUtil; +import org.apache.drill.yarn.core.DoyConfigException; +import org.apache.drill.yarn.core.DrillOnYarnConfig; +import org.apache.drill.yarn.core.LaunchSpec; +import org.apache.drill.yarn.appMaster.http.AMSecurityManagerImpl; +import org.apache.drill.yarn.core.ClusterDef; +import org.apache.drill.yarn.zk.ZKClusterCoordinatorDriver; +import org.apache.drill.yarn.zk.ZKRegistry; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.LocalResource; + +import com.typesafe.config.Config; + +/** + * Builds a controller for a cluster of Drillbits. The AM is designed to be + * mostly generic; only this class contains knowledge that the tasks being + * managed are drillbits. This design ensures that we can add other Drill + * components in the future without the need to make major changes to the AM + * logic. 
+ * <p> + * The controller consists of a generic dispatcher and cluster controller, along + * with a Drill-specific scheduler and task launch specification. Drill also + * includes an interface to ZooKeeper to monitor Drillbits. + * <p> + * The AM is launched by YARN. All it knows is what is in its launch environment + * or configuration files. The client must set up all the information that the + * AM needs. Static information appears in configuration files. But, dynamic + * information (or that which is inconvenient to repeat in configuration files) + * must arrive in environment variables. See {@link DrillOnYarnConfig} for more + * information. + */ + +public class DrillControllerFactory implements ControllerFactory { + private static final Log LOG = LogFactory.getLog(DrillControllerFactory.class); + private Config config = DrillOnYarnConfig.config(); + private String drillArchivePath; + private String siteArchivePath; + private boolean localized; + + @Override + public Dispatcher build() throws ControllerFactoryException { + LOG.info( + "Initializing AM for " + config.getString(DrillOnYarnConfig.APP_NAME)); + Dispatcher dispatcher; + try { + Map<String, LocalResource> resources = prepareResources(); + + TaskSpec taskSpec = buildDrillTaskSpec(resources); + + // Prepare dispatcher + + int timerPeriodMs = config.getInt(DrillOnYarnConfig.AM_TICK_PERIOD_MS); + dispatcher = new Dispatcher(timerPeriodMs); + int pollPeriodMs = config.getInt(DrillOnYarnConfig.AM_POLL_PERIOD_MS); + AMYarnFacadeImpl yarn = new AMYarnFacadeImpl(pollPeriodMs); + dispatcher.setYarn(yarn); + dispatcher.getController() + .setMaxRetries(config.getInt(DrillOnYarnConfig.DRILLBIT_MAX_RETRIES)); + + int requestTimeoutSecs = DrillOnYarnConfig.config().getInt( DrillOnYarnConfig.DRILLBIT_REQUEST_TIMEOUT_SEC); + int maxExtraNodes = DrillOnYarnConfig.config().getInt(DrillOnYarnConfig.DRILLBIT_MAX_EXTRA_NODES); + + // Assume basic scheduler for now. 
+ ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, 0); + Scheduler testGroup = new DrillbitScheduler(pool.getName(), taskSpec, + pool.getCount(), requestTimeoutSecs, maxExtraNodes); + dispatcher.getController().registerScheduler(testGroup); + pool.modifyTaskSpec(taskSpec); + + // ZooKeeper setup + + buildZooKeeper(config, dispatcher); + } catch (YarnFacadeException | DoyConfigException e) { + throw new ControllerFactoryException("Drill AM initialization failed", e); + } + + // Tracking Url + // TODO: HTTPS support + + dispatcher.setHttpPort(config.getInt(DrillOnYarnConfig.HTTP_PORT)); + String trackingUrl = null; + if (config.getBoolean(DrillOnYarnConfig.HTTP_ENABLED)) { + trackingUrl = "http://<host>:<port>/redirect"; + dispatcher.setTrackingUrl(trackingUrl); + } + + // Enable/disable check for auto shutdown when no nodes are running. + + dispatcher.getController().enableFailureCheck( + config.getBoolean(DrillOnYarnConfig.AM_ENABLE_AUTO_SHUTDOWN)); + + // Define the security manager + + AMSecurityManagerImpl.setup(); + + return dispatcher; + } + + /** + * Prepare the files ("resources" in YARN terminology) that YARN should + * download ("localize") for the Drillbit. We need both the Drill software and + * the user's site-specific configuration. + * + * @return the resources to localize, keyed by resource name, or null when + * the DFS facade reports that Drill is not localized + * @throws YarnFacadeException if the DFS facade fails while the resources + * are being defined + */ + + private Map<String, LocalResource> prepareResources() + throws YarnFacadeException { + try { + DfsFacade dfs = new DfsFacade(config); + localized = dfs.isLocalized(); + if (!localized) { + return null; + } + dfs.connect(); + Map<String, LocalResource> resources = new HashMap<>(); + DrillOnYarnConfig drillConfig = DrillOnYarnConfig.instance(); + + // Localize the Drill archive.
+ + drillArchivePath = drillConfig.getDrillArchiveDfsPath(); + DfsFacade.Localizer localizer = new DfsFacade.Localizer(dfs, + drillArchivePath); + String key = config.getString(DrillOnYarnConfig.DRILL_ARCHIVE_KEY); + localizer.defineResources(resources, key); + LOG.info("Localizing " + drillArchivePath + " with key \"" + key + "\""); + + // Localize the site archive, if any. + + siteArchivePath = drillConfig.getSiteArchiveDfsPath(); + if (siteArchivePath != null) { + localizer = new DfsFacade.Localizer(dfs, siteArchivePath); + key = config.getString(DrillOnYarnConfig.SITE_ARCHIVE_KEY); + localizer.defineResources(resources, key); + LOG.info("Localizing " + siteArchivePath + " with key \"" + key + "\""); + } + return resources; + } catch (DfsFacadeException e) { + throw new YarnFacadeException( + "Failed to get DFS status for Drill archive", e); + } + } + + /** + * Constructs the Drill launch command. The launch uses the YARN-specific + * yarn-drillbit.sh script, setting up the required input environment + * variables. + * <p> + * This is an exercise in getting many details just right. The code here sets + * the environment variables required by (and documented in) yarn-drillbit.sh. + * The easiest way to understand this code is to insert an "echo" statement in + * drill-bit.sh to echo the launch command there. Then, look in YARN's NM + * private container directory for the launch_container.sh script to see the + * command generated by the following code. Compare the two to validate that + * the code does the right thing. + * <p> + * This class is very Linux-specific. The usual adjustments must be made to + * adapt it to Windows. 
+ * + * @param config + * @return + * @throws DoyConfigException + */ + + private TaskSpec buildDrillTaskSpec(Map<String, LocalResource> resources) + throws DoyConfigException { + DrillOnYarnConfig doyConfig = DrillOnYarnConfig.instance(); + + // Drillbit launch description + + ContainerRequestSpec containerSpec = new ContainerRequestSpec(); + containerSpec.memoryMb = config.getInt(DrillOnYarnConfig.DRILLBIT_MEMORY); + containerSpec.vCores = config.getInt(DrillOnYarnConfig.DRILLBIT_VCORES); + containerSpec.disks = config.getDouble(DrillOnYarnConfig.DRILLBIT_DISKS); + + LaunchSpec drillbitSpec = new LaunchSpec(); + + // The drill home location is either a non-localized location, + // or, more typically, the expanded Drill directory under the + // container's working directory. When the localized directory, + // we rely on the fact that the current working directory is + // set to the container directory, so we just need the name + // of the Drill folder under the cwd. + + String drillHome = doyConfig.getRemoteDrillHome(); + drillbitSpec.env.put("DRILL_HOME", drillHome); + LOG.trace("Drillbit DRILL_HOME: " + drillHome); + + // Heap memory + + addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_HEAP, "DRILL_HEAP"); + + // Direct memory + + addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_DIRECT_MEM, + "DRILL_MAX_DIRECT_MEMORY"); + + // Code cache + + addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_CODE_CACHE, + "DRILLBIT_CODE_CACHE_SIZE"); + + // Any additional VM arguments from the config file. + + addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_VM_ARGS, + "DRILL_JVM_OPTS"); + + // Any user-specified library path + + addIfSet(drillbitSpec, DrillOnYarnConfig.JAVA_LIB_PATH, + DrillOnYarnConfig.DOY_LIBPATH_ENV_VAR); + + // Drill logs. + // Relies on the LOG_DIR_EXPANSION_VAR marker which is replaced by + // the container log directory. 
+ + if (!config.getBoolean(DrillOnYarnConfig.DISABLE_YARN_LOGS)) { + drillbitSpec.env.put("DRILL_YARN_LOG_DIR", + ApplicationConstants.LOG_DIR_EXPANSION_VAR); + } + + // Debug option. + + if (config.getBoolean(DrillOnYarnConfig.DRILLBIT_DEBUG_LAUNCH)) { + drillbitSpec.env.put(DrillOnYarnConfig.DRILL_DEBUG_ENV_VAR, "1"); + } + + // Hadoop home should be set in drill-env.sh since it is needed + // for client launch as well as the AM. + + // addIfSet( drillbitSpec, DrillOnYarnConfig.HADOOP_HOME, "HADOOP_HOME" ); + + // Garbage collection (gc) logging. In drillbit.sh logging can be + // configured to go anywhere. In YARN, all logs go to the YARN log + // directory; the gc log file is always called "gc.log". + + if (config.getBoolean(DrillOnYarnConfig.DRILLBIT_LOG_GC)) { + drillbitSpec.env.put("ENABLE_GC_LOG", "1"); + } + + // Class path additions. + + addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_PREFIX_CLASSPATH, + DrillOnYarnConfig.DRILL_CLASSPATH_PREFIX_ENV_VAR); + addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_CLASSPATH, + DrillOnYarnConfig.DRILL_CLASSPATH_ENV_VAR); + + // Drill-config.sh has specific entries for Hadoop and Hbase. To prevent + // an endless number of such one-off cases, we add a general extension + // class path. But, we retain Hadoop and Hbase for backward compatibility. + + addIfSet(drillbitSpec, DrillOnYarnConfig.DRILLBIT_EXTN_CLASSPATH, + "EXTN_CLASSPATH"); + addIfSet(drillbitSpec, DrillOnYarnConfig.HADOOP_CLASSPATH, + "DRILL_HADOOP_CLASSPATH"); + addIfSet(drillbitSpec, DrillOnYarnConfig.HBASE_CLASSPATH, + "DRILL_HBASE_CLASSPATH"); + + // Note that there is no equivalent of niceness for YARN: YARN controls + // the niceness of its child processes. + + // Drillbit launch script under YARN + // Here we can use DRILL_HOME because all env vars are set before + // issuing this command. + + drillbitSpec.command = "$DRILL_HOME/bin/yarn-drillbit.sh"; + + // Configuration (site directory), if given. 
+ + String siteDirPath = doyConfig.getRemoteSiteDir(); + if (siteDirPath != null) { + drillbitSpec.cmdArgs.add("--site"); + drillbitSpec.cmdArgs.add(siteDirPath); + } + + // Localized resources + + if (resources != null) { + drillbitSpec.resources.putAll(resources); + } + + // Container definition. + + TaskSpec taskSpec = new TaskSpec(); + taskSpec.name = "Drillbit"; + taskSpec.containerSpec = containerSpec; + taskSpec.launchSpec = drillbitSpec; + taskSpec.maxRetries = config.getInt(DrillOnYarnConfig.DRILLBIT_MAX_RETRIES); + return taskSpec; + } + + /** + * Utility method to create an environment variable in the process launch + * specification if a given Drill-on-YARN configuration variable is set, + * copying the config value to the environment variable. + * + * @param spec + * @param configParam + * @param envVar + */ + + public void addIfSet(LaunchSpec spec, String configParam, String envVar) { + String value = config.getString(configParam); + if (!DoYUtil.isBlank(value)) { + spec.env.put(envVar, value); + } + } + + public static class ZKRegistryAddOn implements DispatcherAddOn { + ZKRegistry zkRegistry; + + public ZKRegistryAddOn(ZKRegistry zkRegistry) { + this.zkRegistry = zkRegistry; + } + + @Override + public void start(ClusterController controller) { + zkRegistry.start(controller); + } + + @Override + public void finish(ClusterController controller) { + zkRegistry.finish(controller); + } + } + + /** + * Create the Drill-on-YARN version of the ZooKeeper cluster coordinator. + * Compared to the Drill version, this one takes its parameters via a builder + * pattern in the form of the cluster coordinator driver. 
+ * + * @param config + * @param dispatcher + */ + + private void buildZooKeeper(Config config, Dispatcher dispatcher) { + String zkConnect = config.getString(DrillOnYarnConfig.ZK_CONNECT); + String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT); + String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID); + int failureTimeoutMs = config + .getInt(DrillOnYarnConfig.ZK_FAILURE_TIMEOUT_MS); + int retryCount = config.getInt(DrillOnYarnConfig.ZK_RETRY_COUNT); + int retryDelayMs = config.getInt(DrillOnYarnConfig.ZK_RETRY_DELAY_MS); + int userPort = config.getInt(DrillOnYarnConfig.DRILLBIT_USER_PORT); + int bitPort = config.getInt(DrillOnYarnConfig.DRILLBIT_BIT_PORT); + ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver() + .setConnect(zkConnect, zkRoot, clusterId) + .setFailureTimoutMs(failureTimeoutMs) + .setRetryCount(retryCount) + .setRetryDelayMs(retryDelayMs) + .setPorts(userPort, bitPort, bitPort + 1); + ZKRegistry zkRegistry = new ZKRegistry(driver); + dispatcher.registerAddOn(new ZKRegistryAddOn(zkRegistry)); + + // The ZK driver is started and stopped in conjunction with the + // controller lifecycle. + + dispatcher.getController().registerLifecycleListener(zkRegistry); + + // The ZK driver also handles registering the AM for the cluster. + + dispatcher.setAMRegistrar(driver); + + // The UI needs access to ZK to report unmanaged drillbits. We use + // a property to avoid unnecessary code dependencies. 
+ + dispatcher.getController().setProperty(ZKRegistry.CONTROLLER_PROPERTY, + zkRegistry); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java new file mode 100644 index 0000000..76936b5 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/DrillbitScheduler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +public class DrillbitScheduler extends AbstractDrillbitScheduler { + private int requestTimeoutSecs; + private int maxExtraNodes; + + + public DrillbitScheduler(String name, TaskSpec taskSpec, int quantity, + int requestTimeoutSecs, int maxExtraNodes) { + super("basic", name, quantity); + this.taskSpec = taskSpec; + this.requestTimeoutSecs = requestTimeoutSecs; + this.maxExtraNodes = maxExtraNodes; + } + + /** + * Set the number of running tasks to the quantity given. 
+ * Limits the quantity to only a small margin above the number + * of estimated free YARN nodes. This avoids a common users error + * where someone requests 20 nodes on a 5-node cluster. + */ + + @Override + public int resize(int level) { + int limit = quantity + state.getController().getFreeNodeCount( ) + + maxExtraNodes; + return super.resize( Math.min( limit, level ) ); + } + + @Override + public int getRequestTimeoutSec() { + return requestTimeoutSecs; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java new file mode 100644 index 0000000..bec8cf9 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/EventContext.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.appMaster; + +import org.apache.drill.yarn.appMaster.Scheduler.TaskManager; + +public class EventContext { + public final AMYarnFacade yarn; + public final ClusterControllerImpl controller; + public SchedulerStateImpl group; + public Task task; + + public EventContext(ClusterControllerImpl controller) { + yarn = controller.getYarn(); + this.controller = controller; + } + + public EventContext(ClusterController controller) { + this((ClusterControllerImpl) controller); + } + + public EventContext(ClusterControllerImpl controller, Task task) { + this(controller); + setTask(task); + } + + /** + * For testing only, omits the controller and YARN. + * + * @param task + */ + + public EventContext(Task task) { + controller = null; + yarn = null; + this.task = task; + } + + public void setTask(Task task) { + this.task = task; + group = task.getGroup(); + } + + public TaskState getState() { + return task.state; + } + + public void setGroup(SchedulerStateActions group) { + this.group = (SchedulerStateImpl) group; + } + + public TaskManager getTaskManager() { + return group.getScheduler().getTaskManager(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java new file mode 100644 index 0000000..ec20307 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/NodeInventory.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.yarn.appMaster;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeReport;

/**
 * Creates an AM-side inventory of cluster nodes. Used to track node
 * reservations (container allocations) to prevent requesting multiple
 * containers on the same node. Tracks blacklisted nodes that have failed too
 * often. Since YARN will discard our blacklist if we add too many nodes, tracks
 * when a container is allocated on a blacklisted node and signals that the
 * cluster is in a bad state.
 */

public class NodeInventory {
  private static final Log LOG = LogFactory.getLog(NodeInventory.class);

  /**
   * Indicates the case in which we've failed so many nodes that YARN has
   * cancelled some of our blacklist entries and we've received a container for
   * a blacklisted node. At this point, we should stop adding new tasks else
   * we'll get into a nasty loop.
   */
  private boolean failed;

  /**
   * Maps each node's host name to the HTTP address from its node report.
   */
  private Map<String, String> nodeMap = new HashMap<>();

  /**
   * The set of nodes that YARN reports as available, keyed by host name.
   * Not clear if these are all nodes in the cluster, or just those usable
   * by the current app (when the app is associated to a queue that
   * uses node labels.)
   */

  private Map<String, NodeReport> yarnNodes = new HashMap<>();

  /**
   * The set of nodes in use by Drill. Includes both nodes on which the AM
   * has requested to run Drillbits, and those nodes found to be running
   * "stray" Drillbits started outside of DoY.
   */

  private Set<String> nodesInUse = new HashSet<>();

  /**
   * Nodes that have failed (typically due to mis-configuration) and
   * are to be excluded from future container requests.
   */

  private Set<String> blacklist = new HashSet<>();
  private final AMYarnFacade yarn;

  /**
   * Creates the inventory and immediately loads the node reports from YARN.
   *
   * @param yarn
   *          facade used to fetch node reports and to maintain the YARN-side
   *          blacklist
   * @throws YarnFacadeException
   *           if the node reports cannot be obtained
   */
  public NodeInventory(AMYarnFacade yarn) throws YarnFacadeException {
    this.yarn = yarn;
    buildNodeMap();
  }

  /**
   * Loads the YARN node reports into the host-keyed maps and, when info
   * logging is enabled, logs a one-line summary per node.
   */
  private void buildNodeMap() throws YarnFacadeException {
    List<NodeReport> reports = yarn.getNodeReports();
    for (NodeReport report : reports) {
      String host = report.getNodeId().getHost();
      nodeMap.put(host, report.getHttpAddress());
      yarnNodes.put(host, report);
    }
    if (!LOG.isInfoEnabled()) {
      return;
    }
    LOG.info("YARN Node report");
    for (NodeReport report : reports) {
      LOG.info("Node: " + report.getHttpAddress()
          + ", Rack: " + report.getRackName()
          + " has " + report.getCapability().getMemory() + " MB, "
          + report.getCapability().getVirtualCores()
          + " vcores, labels: " + report.getNodeLabels());
    }
  }

  /**
   * @return true once a reservation has been made on a blacklisted node
   */
  public boolean isFailed() {
    return failed;
  }

  /**
   * Reserves the node that hosts the given container.
   *
   * @param container the allocated container
   */
  public void reserve(Container container) {
    reserve(container.getNodeId().getHost());
  }

  /**
   * Marks a host as in use and adds it to the YARN-side blacklist. Reserving
   * a host that is in the local blacklist sets the failed flag; reserving a
   * host that is already in use is logged and otherwise ignored. A host
   * missing from the YARN node inventory is still reserved, with a warning.
   *
   * @param hostName the host to reserve
   */
  public void reserve(String hostName) {
    boolean blacklisted = blacklist.contains(hostName);
    if (blacklisted) {
      LOG.error( "Node to be reserved is in the blacklist: " + hostName );
      failed = true;
    }
    boolean alreadyReserved = nodesInUse.contains(hostName);
    if (alreadyReserved) {
      LOG.error( "Node to be reserved is already in use: " + hostName );
      return;
    }
    boolean knownToYarn = yarnNodes.containsKey(hostName);
    if (!knownToYarn) {
      LOG.warn( "Node to be reserved was not in YARN node inventory: " + hostName );
    }
    nodesInUse.add(hostName);
    yarn.blacklistNode(hostName);
  }

  /**
   * Releases the node that hosts the given container.
   *
   * @param container the container being given up
   */
  public void release(Container container) {
    release(container.getNodeId().getHost());
  }

  /**
   * Removes a host from the in-use set and from the YARN-side blacklist.
   * Hosts that are not in the YARN node inventory are ignored.
   *
   * @param hostName the host to release
   */
  public void release(String hostName) {
    if (yarnNodes.containsKey(hostName)) {
      nodesInUse.remove(hostName);
      yarn.removeBlacklist(hostName);
    }
  }

  /**
   * Adds a host to the local blacklist and to the YARN-side blacklist.
   * Hosts that are not in the YARN node inventory are ignored. The host is
   * expected not to be in use at this point (checked by assertion).
   *
   * @param hostName the host to blacklist
   */
  public void blacklist(String hostName) {
    if (yarnNodes.containsKey(hostName)) {
      assert !nodesInUse.contains(hostName);
      blacklist.add(hostName);
      yarn.blacklistNode(hostName);
      LOG.info("Node blacklisted: " + hostName);
    }
  }

  /**
   * Determine the number of free nodes in the YARN cluster. The free set is the
   * set of all YARN nodes minus those that are allocated and those that are
   * blacklisted. Note that a node might be both in use and blacklisted if
   * DoY blacklists a node, but then the user starts a "stray" Drillbit on
   * that same node.
   * <p>
   * This number is an approximation: the set of nodes managed by YARN can
   * change any time, and in-flight container requests will consume a node,
   * but since the request is not yet completed, we don't know which node
   * will be assigned, so the node does not yet appear in the in-use list.
   *
   * @return an approximation of the free node count
   */

  public int getFreeNodeCount() {
    Set<String> candidates = new HashSet<>(yarnNodes.keySet());
    candidates.removeAll(nodesInUse);
    candidates.removeAll(blacklist);
    return candidates.size();
  }

  /**
   * Return a copy of the blacklist (list of failed nodes) for use in display
   * to the user or similar purpose.
   *
   * @return a copy of the blacklist.
   */

  public List<String> getBlacklist() {
    return new ArrayList<>(blacklist);
  }

  /**
   * Report if the given host name is in use.
   *
   * @param hostName the host to look up
   * @return true if the host is reserved (in use by a container) or
   *         blacklisted (failed.)
   */

  public boolean isInUse(String hostName) {
    if (blacklist.contains(hostName)) {
      return true;
    }
    return nodesInUse.contains(hostName);
  }
}
