http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java new file mode 100644 index 0000000..73a045f --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/PersistentTaskScheduler.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Abstract base class for schedulers that work with persistent + * (long-running) tasks. Such tasks are intended to run until + * explicitly shut down (unlike batch tasks that run until + * some expected completion.) + * <p> + * Provides a target quantity of tasks + * (see {@link #getTarget()}, along with operations to increase, + * decrease or set the target number. 
+ * <p> + * The scheduler acts as a controller: starting new tasks as needed to + * match the desired target, or stopping tasks as needed when the + * target level is reduced. + */ + +public abstract class PersistentTaskScheduler extends AbstractScheduler { + private static final Log LOG = LogFactory.getLog(PersistentTaskScheduler.class); + protected int quantity; + + public PersistentTaskScheduler(String type, String name, int quantity) { + super(type, name); + this.quantity = quantity; + } + + /** + * Set the number of running tasks to the quantity given. + * + * @param level + * the target number of tasks + */ + + @Override + public int resize(int level) { + quantity = level; + if (quantity < 0) { + quantity = 0; + } + return quantity; + } + + @Override + public int getTarget() { return quantity; } + + /** + * Indicate that a task is completed. Normally occurs only + * when shutting down excess tasks. + * + * @param task + */ + + + @Override + public void completed(Task task) { } + + /** + * Progress for persistent tasks defaults to the ratio of + * running tasks to target level. Thus, a persistent cluster + * will normally report 100% progress. + * + * @return + */ + + @Override + public int[] getProgress() { + int activeCount = state.getTaskCount(); + return new int[] { Math.min(activeCount, quantity), quantity }; + } + + /** + * Adjust the number of running tasks to better match the target + * by starting or stopping tasks as needed. + */ + + @Override + public void adjust() { + int activeCount = state.getTaskCount(); + int delta = quantity - activeCount; + if (delta > 0) { + addTasks(delta); + } else if (delta < 0) { + cancelTasks(activeCount); + } + } + + /** + * Cancel the requested number of tasks. We exclude any tasks that are already + * in the process of being cancelled. Because we ignore those tasks, it might + * be that we want to reduce the task count, but there is nothing left to cancel. 
+ * + * @param cancelCount + */ + + private void cancelTasks(int cancelCount) { + int cancelled = state.getCancelledTaskCount(); + int cancellable = cancelCount - cancelled; + int n = cancellable - quantity; + LOG.info("[" + getName( ) + "] - Cancelling " + cancelCount + + " tasks. " + cancelled + " are already cancelled, " + + cancellable + " more will be cancelled."); + if (n <= 0) { + return; + } + for (Task task : state.getStartingTasks()) { + state.cancel(task); + if (--n == 0) { + return; + } + } + for (Task task : state.getActiveTasks()) { + state.cancel(task); + if (--n == 0) { + return; + } + } + + // If we get here it means something has gotten out of whack. + + LOG.error("Tried to cancel " + cancellable + " tasks, but " + n + " could not be cancelled."); + assert false; + } + + /** + * The persistent scheduler has no fixed sequence of tasks to run, it launches + * a set and is never "done". For purposes of completion tracking claim we + * have no further tasks. + * + * @return false + */ + + @Override + public boolean hasMoreTasks() { return false; } + + @Override + public void requestTimedOut() { + + // We requested a node a while back, requested a container from YARN, + // but waited too long to receive it. Most likely cause is that we + // want a container on a node that either does not exist, or is too + // heavily loaded. (That is, we have a 3-node cluster and are requesting + // a 4th node. Or, we have 2 nodes but node 3 has insufficient resources.) + // In either case, we're not likely to ever get the container, so just + // reduce the target size to what we an get. + + assert quantity > 0; + if (quantity == 0) { + LOG.error("Container timed out, but target quantity is already 0!"); + } else { + quantity--; + LOG.info("Container request timed out. Reducing target container count by 1 to " + quantity); + } + } +}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java new file mode 100644 index 0000000..7e1c9a3 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Pollable.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +/** + * Interface for objects that are polled on each + * controller clock tick in order to perform + * time-based tasks. 
/**
 * Contract for objects that are polled on each controller clock tick
 * so that they can carry out time-based work.
 */

public interface Pollable {

  /**
   * Invoked once per clock tick.
   *
   * @param curTime
   *          the time supplied by the caller for this tick
   *          (NOTE(review): presumably milliseconds since the epoch, as
   *          produced by {@code System.currentTimeMillis()} - confirm
   *          against the caller)
   */
  void tick(long curTime);
}
+ */ + +public class PulseRunnable implements Runnable { + private static final Log LOG = LogFactory.getLog(PulseRunnable.class); + + /** + * Interface implemented to receive calls on each clock "tick." + */ + + public interface PulseCallback { + void onTick(long curTime); + } + + private final int pulsePeriod; + private final PulseRunnable.PulseCallback callback; + public AtomicBoolean isLive = new AtomicBoolean(true); + + public PulseRunnable(int pulsePeriodMS, + PulseRunnable.PulseCallback callback) { + pulsePeriod = pulsePeriodMS; + this.callback = callback; + } + + @Override + public void run() { + while (isLive.get()) { + try { + Thread.sleep(pulsePeriod); + } catch (InterruptedException e) { + break; + } + try { + callback.onTick(System.currentTimeMillis()); + } catch (Exception e) { + + // Ignore exceptions. Seems strange, but is required to allow + // graceful shutdown of the AM when errors occur. For example, we + // start tasks on tick events. If those tasks fail, the timer + // goes down. But, the timer is also needed to time out failed + // requests in order to bring down the AM. So, just log the error + // and soldier on. + + LOG.error("Timer thread caught, ignored an exception", e); + } + } + } + + public void stop() { isLive.set(false); } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java new file mode 100644 index 0000000..ff29bdf --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/RegistryHandler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +/** + * Callback from the ZooKeeper registry to announce events + * related to Drillbit registration. + */ + +public interface RegistryHandler { + void reserveHost(String hostName); + + void releaseHost(String hostName); + + void startAck(Task task, String propertyKey, Object value); + + void completionAck(Task task, String endpointProperty); + + void registryDown(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java new file mode 100644 index 0000000..7f8be0c --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Scheduler.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import org.apache.drill.yarn.core.ContainerRequestSpec; +import org.apache.drill.yarn.core.LaunchSpec; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * The scheduler describes the set of tasks to run. It provides the details + * required to launch each task and optionally a specification of the containers + * required to run the task. + * <p> + * Schedulers can manage batch task (which do their job and complete), or + * persistent tasks (which run until terminated.) + * <p> + * The scheduler tracks task completion (for batch tasks) and task levels (for + * persistent tasks.) + */ + +public interface Scheduler { + public interface TaskManager { + int maxConcurrentAllocs(); + + LaunchSpec getLaunchSpec(Task task); + + void allocated(EventContext context); + + boolean stop(Task task); + + void completed(EventContext context); + + boolean isLive(EventContext context); + } + + /** + * Controller-assigned priority for this scheduler. Used to differentiate + * container requests by scheduler. + * + * @param priority + */ + + void setPriority(int priority); + + /** + * Register the state object that tracks tasks launched by this scheduler. 
+ * + * @param state + */ + + void registerState(SchedulerState state); + + String getName(); + + String getType(); + + /** + * Whether tasks from this scheduler should incorporate app startup/shutdown + * acknowledgements (acks) into the task lifecycle. + * + * @return + */ + + boolean isTracked(); + + TaskManager getTaskManager(); + + /** + * Get the desired number of running tasks. + * + * @return + */ + int getTarget(); + + /** + * Increase (positive) or decrease (negative) the number of desired tasks by + * the given amount. + * + * @param delta + */ + void change(int delta); + + /** + * Set the number of desired tasks to the given level. + * + * @param level + * @return the actual resize level, which may be lower than the requested + * level if the system cannot provide the requested level + */ + + int resize(int level); + + void completed(Task task); + + /** + * Adjust the number of running tasks to better track the desired number. + * Starts or stops tasks using the {@link SchedulerState} registered with + * {@link #registerState(SchedulerState)}. + */ + + void adjust(); + + /** + * Return an estimate of progress given as a ratio of (work completed, total + * work). + * + * @return + */ + int[] getProgress(); + + /** + * If this is a batch scheduler, whether all tasks for the batch have + * completed. If this is a persistent task scheduler, always returns false. + * + * @return true if the scheduler has more tasks to run, false if the + * scheduler has no more tasks or manages a set of long-running tasks + */ + boolean hasMoreTasks(); + + /** + * For reporting, get the YARN resources requested by processes in + * this pool. + * @return + */ + + ContainerRequestSpec getResource( ); + + void limitContainerSize(Resource maxResource) throws AMException; + + /** + * Maximum amount of time to wait when cancelling a job in the REQUESTING + * state. YARN will happily wait forever for a resource, this setting + * forcibly cancels the request at timeout. 
+ * + * @return the number of seconds to wait for timeout. 0 means no timeout + */ + + int getRequestTimeoutSec(); + + /** + * Informs the scheduler that a YARN resource request timed out. The scheduler + * can either retry or (more productively) assume that the requested node is + * not available and adjust its target size downward. + */ + + void requestTimedOut(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java new file mode 100644 index 0000000..7a1f8bd --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerState.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import java.util.List; + +/** + * The cluster state for tasks managed by a scheduler. Abstracts away the + * details of managing tasks, allowing the scheduler to work only with overall + * number of tasks. 
+ */ + +public interface SchedulerState { + /** + * The number of tasks in any active (non-ended) lifecycle state. + * + * @return + */ + + int getTaskCount(); + + /** + * The number of active tasks that have been cancelled, but have not yet + * ended. + * + * @return + */ + + int getCancelledTaskCount(); + + /** + * Returns the list of tasks awaiting a container request to be sent to YARN + * or for which a container request has been sent to YARN, but no container + * allocation has yet been received. Such tasks are simple to cancel. The list + * does not contain any tasks in this state which have previously been + * cancelled. + * + * @return + */ + + List<Task> getStartingTasks(); + + /** + * Returns the list of active tasks that have not yet been cancelled. Active + * tasks are any task for which a container has been assigned, but has not yet + * received a RM container completion event. + * + * @return + */ + + List<Task> getActiveTasks(); + + /** + * Start the given task. + * + * @param task + */ + + void start(Task task); + + void cancel(Task task); + + ClusterController getController(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java new file mode 100644 index 0000000..65e8f2a --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateActions.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import org.apache.hadoop.yarn.api.records.Container; + +/** + * Represents the set of commands called by the cluster controller to manage the + * state of tasks within a task group. Each task group is managed by a + * scheduler. + */ + +public interface SchedulerStateActions { + /** + * Returns the name of the scheduler associated with this task action group. + * + * @return + */ + + String getName(); + + /** + * Returns the scheduler associated with this task group. + * + * @return + */ + + Scheduler getScheduler(); + + /** + * Adjust the number of running tasks as needed to balance the number of + * running tasks with the desired number. May result in no change it the + * cluster is already in balance (or is in the process of achieving balance.) + */ + + void adjustTasks(); + + /** + * Request a container the first task that we wish to start. + */ + + boolean requestContainers(EventContext context, int maxRequests); + + /** + * A container request has been granted. Match the container up with the first + * task waiting for a container and launch the task. + * + * @param context + * @param container + */ + + void containerAllocated(EventContext context, Container container); + + /** + * Shut down this task group by canceling all tasks not already cancelled. 
+ * + * @param context + */ + + void shutDown(EventContext context); + + /** + * Determine if this task group is done. It is done when there are no active + * tasks and the controller itself is shutting down. This latter check + * differentiates the start state (when no tasks are active) from the end + * state. The AM will not shut down until all task groups are done. + * + * @return + */ + + boolean isDone(); + + int getTaskCount( ); + + int getLiveCount(); + + int getRequestCount( ); + + void visitTaskModels( TaskVisitor visitor ); + + void checkTasks(EventContext context, long curTime); + + void cancel(Task task); + + Task getTask(int id); +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java new file mode 100644 index 0000000..4c85cf3 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/SchedulerStateImpl.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.drill.yarn.core.DoYUtil; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; + +/** + * Manages a the set of tasks associated with a scheduler. The scheduler decides + * which tasks to run or stop; the task group manages the life-cycle of the + * tasks for the given scheduler. + * <p> + * Schedulers, and hence their groups, define a priority. When starting, higher + * priority (lower priority value) groups run before lower priority groups. + * Similarly, when shrinking the cluster, lower priority groups shrink before + * higher priority groups. + */ + +public final class SchedulerStateImpl + implements SchedulerState, SchedulerStateActions { + static final Log LOG = LogFactory.getLog(SchedulerStateImpl.class); + + private final Scheduler scheduler; + + private final ClusterControllerImpl controller; + + /** + * Tracks the tasks to be started, but for which no work has yet been done. + * (State == PENDING). + */ + + protected List<Task> pendingTasks = new LinkedList<>(); + + /** + * Tracks the tasks for which containers have been requested. (State == + * REQUESTED). + */ + + protected List<Task> allocatingTasks = new LinkedList<>(); + + /** + * Tracks running tasks: those that have been allocated containers and are + * starting, running, failed or ended. We use a map for this because, during + * these states, the task is identified by its container. (State == LAUNCHING, + * RUNNING or ENDING). 
+ */ + + protected Map<ContainerId, Task> activeContainers = new HashMap<>(); + + public SchedulerStateImpl(ClusterControllerImpl controller, + Scheduler scheduler) { + this.controller = controller; + this.scheduler = scheduler; + scheduler.registerState(this); + } + + @Override + public String getName() { + return scheduler.getName(); + } + + public int getMaxRetries() { + return controller.getMaxRetries(); + } + + public int getStopTimeoutMs() { + return controller.getStopTimeoutMs(); + } + + @Override + public Scheduler getScheduler() { return scheduler; } + + /** + * Define a new task in this group. Adds it to the pending queue so that a + * container will be requested. + * + * @param task + */ + + @Override + public void start(Task task) { + assert task.getGroup() == null; + task.setGroup(this); + enqueuePendingRequest(task); + } + + /** + * Put a task into the queue waiting to send a container request to YARN. + * + * @param task + */ + + public void enqueuePendingRequest(Task task) { + assert !activeContainers.containsValue(task); + assert !allocatingTasks.contains(task); + assert !pendingTasks.contains(task); + pendingTasks.add(task); + + // Special initial-state notification + + EventContext context = new EventContext(controller, task); + controller.fireLifecycleChange(TaskLifecycleListener.Event.CREATED, + context); + } + + public int maxCurrentRequests() { + return this.scheduler.getTaskManager().maxConcurrentAllocs(); + } + + @Override + public boolean requestContainers(EventContext context, int maxRequests) { + if (pendingTasks.isEmpty()) { + return false; + } + + // Limit the maximum number of requests to the limit set by + // the scheduler. + + maxRequests = Math.min(maxRequests, maxCurrentRequests()); + + // Further limit requests to account for in-flight requests. + + maxRequests -= allocatingTasks.size( ); + + // Request containers as long as there are pending tasks remaining. 
+ + for (int i = 0; i < maxRequests && !pendingTasks.isEmpty(); i++) { + context.setTask(pendingTasks.get(0)); + context.getState().requestContainer(context); + } + return true; + } + + /** + * Remove a task from the queue of tasks waiting to send a container request. + * The caller must put the task into the proper next state: the allocating + * queue or the completed task list. + * + * @param task + */ + + public void dequeuePendingRequest(Task task) { + assert !activeContainers.containsValue(task); + assert !allocatingTasks.contains(task); + assert pendingTasks.contains(task); + pendingTasks.remove(task); + } + + /** + * Put a task onto the queue awaiting an allocation response from YARN. + * + * @param task + */ + + public void enqueueAllocatingTask(Task task) { + assert !activeContainers.containsValue(task); + assert !allocatingTasks.contains(task); + assert !pendingTasks.contains(task); + allocatingTasks.add(task); + } + + @Override + public void containerAllocated(EventContext context, Container container) { + if (activeContainers.containsKey(container.getId())) { + LOG.error("Container allocated again: " + DoYUtil.labelContainer(container)); + return; + } + if (allocatingTasks.isEmpty()) { + + // Not sure why this happens. Maybe only in debug mode + // due stopping execution one thread while the RM + // heartbeat keeps sending our request over & over? + // One known case: the user requests a container. While YARN is + // considering the request, the user cancels the task. + + LOG.warn("Releasing unwanted container: " + DoYUtil.labelContainer(container) ); + context.yarn.releaseContainer(container); + return; + } + context.setTask(allocatingTasks.get(0)); + context.getState().containerAllocated(context, container); + } + + @Override + public void checkTasks(EventContext context, long curTime) { + + // Iterate over tasks using a temporary list. The tick event may cause a timeout + // that turns around and modifies these lists. 
+ + List<Task> temp = new ArrayList<>( ); + temp.addAll( allocatingTasks ); + for (Task task : temp) { + context.setTask(task); + context.getState().tick(context, curTime); + } + temp.clear(); + temp.addAll( pendingTasks ); + for (Task task : temp) { + context.setTask(task); + context.getState().tick(context, curTime); + } + temp.clear(); + temp.addAll( activeContainers.values( ) ); + for (Task task : temp) { + context.setTask(task); + context.getState().tick(context, curTime); + } + } + + /** + * Remove a task from the list of those waiting for a container allocation. + * The allocation may be done, or cancelled. The caller is responsible for + * moving the task to the next collection. + * + * @param task + */ + + public void dequeueAllocatingTask(Task task) { + assert allocatingTasks.contains(task); + allocatingTasks.remove(task); + } + + /** + * Mark that a task has become active and should be tracked by its container + * ID. Prior to this, the task is not associated with a container. + * + * @param task + */ + + public void containerAllocated(Task task) { + assert !activeContainers.containsValue(task); + assert !allocatingTasks.contains(task); + assert !pendingTasks.contains(task); + activeContainers.put(task.getContainerId(), task); + controller.containerAllocated(task); + } + + /** + * Mark that a task has completed: its container has expired or been revoked + * or the task has completed: successfully or a failure, as given by the + * task's disposition. The task can no longer be tracked by its container ID. + * If this is the last active task for this group, mark the group itself as + * completed. + * + * @param task + */ + + public void containerReleased(Task task) { + assert activeContainers.containsKey(task.getContainerId()); + activeContainers.remove(task.getContainerId()); + controller.containerReleased(task); + } + + /** + * Mark that a task has completed successfully or a failure, as given by the + * task's disposition. 
If this is the last active task for this group, mark + * the group itself as completed. + * + * @param task + */ + + public void taskEnded(Task task) { + scheduler.completed(task); + controller.taskEnded(task); + if (isDone()) { + controller.taskGroupCompleted(this); + } + LOG.info(task.toString() + " - Task completed" ); + } + + /** + * Mark that a task is about to be retried. Task still retains its state from + * the current try. + * + * @param task + */ + + public void taskRetried(Task task) { + controller.taskRetried(task); + } + + @Override + public void shutDown(EventContext context) { + for (Task task : getStartingTasks()) { + context.setTask(task); + context.getState().cancel(context); + } + for (Task task : getActiveTasks()) { + context.setTask(task); + context.getState().cancel(context); + } + } + + /** + * Report if this task group has any tasks in the active part of their + * life-cycle: pending, allocating or active. + * + * @return + */ + + public boolean hasTasks() { + return getTaskCount() != 0; + } + + @Override + public boolean isDone() { + return !hasTasks() && !scheduler.hasMoreTasks(); + } + + @Override + public void adjustTasks() { + scheduler.adjust(); + } + + /** + * Request a graceful stop of the task. Delegates to the task manager to do + * the actual work. + * + * @return true if the graceful stop request was sent, false if not, or if + * this task type has no graceful stop + */ + + public boolean requestStop(Task task) { + return scheduler.getTaskManager().stop(task); + } + + @Override + public int getTaskCount() { + return pendingTasks.size() + allocatingTasks.size() + + activeContainers.size(); + } + + @Override + public int getCancelledTaskCount() { + + // TODO Crude first cut. This value should be maintained + // as a count. 
+ + int count = 0; + for (Task task : pendingTasks) { + if (task.isCancelled()) { + count++; + } + } + for (Task task : allocatingTasks) { + if (task.isCancelled()) { + count++; + } + } + for (Task task : activeContainers.values()) { + if (task.isCancelled()) { + count++; + } + } + return count; + } + + @Override + public List<Task> getStartingTasks() { + List<Task> tasks = new ArrayList<>(); + for (Task task : pendingTasks) { + if (!task.isCancelled()) { + tasks.add(task); + } + } + for (Task task : allocatingTasks) { + if (!task.isCancelled()) { + tasks.add(task); + } + } + return tasks; + } + + @Override + public List<Task> getActiveTasks() { + List<Task> tasks = new ArrayList<>(); + for (Task task : activeContainers.values()) { + if (!task.isCancelled()) { + tasks.add(task); + } + } + return tasks; + } + + @Override + public void cancel(Task task) { + EventContext context = new EventContext(controller, task); + LOG.info( task.getLabel() + " Task cancelled" ); + context.getState().cancel(context); + } + + @Override + public int getLiveCount() { + int count = 0; + for (Task task : activeContainers.values()) { + if (task.isLive()) { + count++; + } + } + return count; + } + + @Override + public void visitTaskModels(TaskVisitor visitor) { + for (Task task : pendingTasks) { + visitor.visit(task); + } + for (Task task : allocatingTasks) { + visitor.visit(task); + } + for (Task task : activeContainers.values()) { + visitor.visit(task); + } + } + + @Override + public Task getTask(int id) { + for (Task task : pendingTasks) { + if (task.getId() == id) { + return task; + } + } + for (Task task : allocatingTasks) { + if (task.getId() == id) { + return task; + } + } + for (Task task : activeContainers.values()) { + if (task.getId() == id) { + return task; + } + } + return null; + } + + @Override + public int getRequestCount() { + return allocatingTasks.size(); + } + + @Override + public ClusterController getController( ) { return controller; } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java new file mode 100644 index 0000000..147f5f7 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/Task.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.drill.yarn.core.ContainerRequestSpec; +import org.apache.drill.yarn.core.LaunchSpec; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; + +/** + * AM-side state of individual containers. This class is mostly + * a holder of state. Behavior is provided by the + * {@link TaskState} subclasses. + */ + +public class Task { + /** + * Tracking plugin state. 
A task can be untracked, or moves + * through states<br> + * NEW --> START_ACK --> END_ACK + * <p> + * Tracking state is separate from, but integrated with, + * task state. This is because, due to latency, tracking + * events may be slightly out of sync with YARN events. + */ + + public enum TrackingState + { + UNTRACKED( "N/A" ), + NEW( "Waiting" ), + START_ACK( "OK" ), + END_ACK( "Deregistered" ); + + private String displayName; + + private TrackingState( String displayName ) { + this.displayName = displayName; + } + + public String getDisplayName( ) { return displayName; } + } + + public enum Disposition + { + CANCELLED, LAUNCH_FAILED, RUN_FAILED, COMPLETED, TOO_MANY_RETRIES, RETRIED + } + + /** + * Maximum amount of time to wait when canceling a job in the REQUESTING + * state. YARN will happily wait forever for a resource; this setting allows + * the user to request to cancel a task, give YARN a while to respond, then + * forcibly cancel the job at timeout. + */ + + public static final long MAX_CANCELLATION_TIME = 10_000; // ms = 10s + + /** + * Tasks receive a sequential internal task ID. Since all task + * creation is single-threaded, no additional concurrency controls + * are needed to protect this value. + */ + + private static volatile int taskCounter = 0; + + /** + * Internal identifier for the task. + */ + + public final int taskId; + + + public final Scheduler scheduler; + + /** + * Identifies the type of container needed and the details of the task to run. + */ + + public TaskSpec taskSpec; + + /** + * The scheduler group that manages this task. + */ + + public SchedulerStateImpl taskGroup; + + /** + * Tracking state for an additional task tracker (such as using + * ZooKeeper to track Drill-bits.) + */ + + protected TrackingState trackingState; + + /** + * Tracks the container request between request and allocation. We must pass + * the container request back to YARN to remove it once it is allocated. 
+ */ + + public ContainerRequest containerRequest; + + /** + * The YARN container assigned to this task. The container is set only during + * the ALLOCATED, LAUNCHING, RUNNING and ENDING states. + */ + + public Container container; + + /** + * Life-cycle state of this task. + */ + + protected TaskState state; + + /** + * True if the application has requested that the resource request or + * application run be cancelled. Cancelled tasks are not subject to retry. + */ + + protected boolean cancelled; + + /** + * Disposition of a completed task: whether it was cancelled, succeeded or + * failed. + */ + + public Disposition disposition; + + public Throwable error; + + public int tryCount; + + public ContainerStatus completionStatus; + + public long launchTime; + public long stateStartTime; + public long completionTime; + + long cancellationTime; + + public Map<String,Object> properties = new HashMap<>( ); + + public Task(Scheduler scheduler, TaskSpec taskSpec) { + taskId = ++taskCounter; + this.scheduler = scheduler; + this.taskSpec = taskSpec; + state = TaskState.START; + resetTrackingState(); + } + + /** + * Special constructor to create a static copy of the current + * task. The copy is placed in the completed tasks list. + * @param task + */ + + private Task(Task task) { + taskId = task.taskId; + scheduler = task.scheduler; + taskSpec = task.taskSpec; + taskGroup = task.taskGroup; + trackingState = task.trackingState; + containerRequest = task.containerRequest; + container = task.container; + state = task.state; + cancelled = task.cancelled; + disposition = task.disposition; + error = task.error; + tryCount = task.tryCount; + completionStatus = task.completionStatus; + launchTime = task.launchTime; + stateStartTime = task.stateStartTime; + completionTime = task.completionTime; + cancellationTime = task.cancellationTime; + properties.putAll( task.properties ); + } + + public void resetTrackingState( ) { + trackingState = scheduler.isTracked() ? 
TrackingState.NEW : TrackingState.UNTRACKED; + } + + public int getId( ) { return taskId; } + public ContainerRequestSpec getContainerSpec() { return taskSpec.containerSpec; } + + public LaunchSpec getLaunchSpec() { return taskSpec.launchSpec; } + + public TaskState getState() { return state; } + + public ContainerId getContainerId() { + assert container != null; + return container.getId(); + } + + public Container getContainer() { + assert container != null; + return container; + } + + public int getTryCount() { return tryCount; } + + public boolean isFailed() { + return disposition != null && disposition != Disposition.COMPLETED; + } + + public Disposition getDisposition() { return disposition; } + + public SchedulerStateImpl getGroup() { return taskGroup; } + + public void setGroup(SchedulerStateImpl taskGroup) { this.taskGroup = taskGroup; } + + public boolean retryable() { + return !cancelled && disposition != Disposition.COMPLETED; + } + + public boolean isCancelled() { return cancelled; } + + /** + * Reset the task state in preparation for a retry. + * Note: state reset is done by the state class. + */ + + public void reset() { + assert !cancelled; + error = null; + disposition = null; + completionStatus = null; + launchTime = 0; + completionTime = 0; + cancellationTime = 0; + container = null; + resetTrackingState(); + } + + public long uptime() { + long endTime = completionTime; + if (endTime == 0) { + endTime = System.currentTimeMillis(); + } + return endTime - launchTime; + } + + public String getHostName() { + if (container == null) { + return null; + } + return container.getNodeId().getHost(); + } + + public TrackingState getTrackingState() { + return trackingState; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("[id=") + .append(taskId) + .append(", type="); + // Scheduler is unset in some unit tests. 
+ if (scheduler !=null ) { + buf.append(scheduler.getName()); + } + buf.append(", name=") + .append(getName()); + if (container != null) { + buf.append(", host=") + .append(getHostName()); + } + buf.append(", state=") + .append(state.toString()) + .append("]"); + return buf.toString(); + } + + public boolean isLive() { + return state == TaskState.RUNNING && !cancelled; + } + + public void cancel() { + cancelled = true; + cancellationTime = System.currentTimeMillis(); + } + + public Task copy() { + return new Task(this); + } + + public String getName() { + return taskSpec == null ? null : taskSpec.name; + } + + /** + * Label for this task displayed in log messages. + * + * @return + */ + + public String getLabel() { + return toString( ); + } + + public void setTrackingState(TrackingState tState) { + trackingState = tState; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java new file mode 100644 index 0000000..218cd9b --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskLifecycleListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +public interface TaskLifecycleListener { + public enum Event { + CREATED, ALLOCATED, RUNNING, ENDED + } + + void stateChange(Event event, EventContext context); +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java new file mode 100644 index 0000000..4399a86 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskSpec.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.appMaster; + +import org.apache.drill.yarn.core.ContainerRequestSpec; +import org.apache.drill.yarn.core.LaunchSpec; + +public class TaskSpec { + /** + * Number of YARN vcores (virtual cores) and amount of memory (in MB) needed + * by this task. + */ + + public ContainerRequestSpec containerSpec; + + /** + * Description of the task process, environment and so on. + */ + + public LaunchSpec launchSpec; + + public int maxRetries; + + public String name; +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java new file mode 100644 index 0000000..3d52105 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskState.java @@ -0,0 +1,895 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.appMaster; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.drill.yarn.appMaster.Task.Disposition; +import org.apache.drill.yarn.core.DoYUtil; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; + +/** + * Represents the behaviors associated with each state in the lifecycle + * of a task. + * <p> + * Startup process: + * <dl> + * <dt>START --> REQUESTING</dt> + * <dd>New task sends a container request to YARN.</dd> + * <dt>REQUESTING --> LAUNCHING</dt> + * <dd>Container received from YARN, launching the task's process.</dd> + * <dt>LAUNCHING --> RUNNING</dt> + * <dd>Task launched and needs no start Ack.</dd> + * <dt>LAUNCHING --> WAIT_START_ACK</dt> + * <dd>Task launched and needs a start Ack.</dd> + * <dt>WAIT_START_ACK --> RUNNING</dt> + * <dd>Start Ack received.</dd> + * </dl> + * <p> + * Shutdown process: + * <dl> + * <dt>RUNNING --> WAIT_END_ACK | END</dt> + * <dd>The resource manager reported task completion.</dd> + * <dt>RUNNING --> ENDING</dt> + * <dd>Request sent to the task for a graceful shutdown.</dd> + * <dt>RUNNING --> KILLING</dt> + * <dd>Request sent to the node manager to forcibly kill the task.</dd> + * <dt>ENDING --> WAIT_END_ACK | END</dt> + * <dd>The task gracefully exited as reported by the resource manager.</dd> + * <dt>ENDING --> KILLING</dt> + * <dd>The wait for graceful exit timed out; a forced kill message was + * sent to the node manager.</dd> + * <dt>KILLING --> WAIT_END_ACK | END</dt> + * <dd>The task exited as reported by the resource manager.</dd> + * <dt>WAIT_END_ACK --> END</dt> + * <dd>The end-ack is received or the wait timed out.</dd> + * </dl> + * <p> + * This is a do-it-yourself enum. Java enum values are instances of a single + * class. In this version, each enum value is the sole instance of a separate + * class, allowing each state to have its own behavior. 
+ */ + +public abstract class TaskState { + /** + * Task that is newly created and needs a container allocated. No messages + * have yet been sent to YARN for the task. + */ + + private static class StartState extends TaskState { + protected StartState() { super(false, TaskLifecycleListener.Event.CREATED, true); } + + @Override + public void requestContainer(EventContext context) { + Task task = context.task; + task.tryCount++; + context.group.dequeuePendingRequest(task); + if (task.cancelled) { + taskStartFailed(context, Disposition.CANCELLED); + } else { + transition(context, REQUESTING); + context.group.enqueueAllocatingTask(task); + task.containerRequest = context.yarn + .requestContainer(task.getContainerSpec()); + } + } + + /** + * Cancellation is trivial: just drop the task; no need to coordinate + * with YARN. + */ + + @Override + public void cancel(EventContext context) { + Task task = context.task; + assert !task.cancelled; + context.group.dequeuePendingRequest(task); + task.cancel(); + taskStartFailed(context, Disposition.CANCELLED); + } + } + + /** + * Task for which a container request has been sent but not yet received. + */ + + private static class RequestingState extends TaskState { + protected RequestingState() { + super(false, TaskLifecycleListener.Event.CREATED, true); + } + + /** + * Handle REQUESING --> LAUNCHING. Indicates that we've asked YARN to start + * the task on the allocated container. + */ + + @Override + public void containerAllocated(EventContext context, Container container) { + Task task = context.task; + LOG.info(task.getLabel() + " - Received container: " + + DoYUtil.describeContainer(container)); + context.group.dequeueAllocatingTask(task); + + // No matter what happens below, we don't want to ask for this + // container again. The RM async API is a bit bizarre in this + // regard: it will keep asking for container over and over until + // we tell it to stop. 
+ + context.yarn.removeContainerRequest(task.containerRequest); + + // The container is need both in the normal and in the cancellation + // path, so set it here. + + task.container = container; + if (task.cancelled) { + context.yarn.releaseContainer(container); + taskStartFailed(context, Disposition.CANCELLED); + return; + } + task.error = null; + task.completionStatus = null; + transition(context, LAUNCHING); + + // The pool that manages this task wants to know that we have + // a container. The task manager may want to do some task- + // specific setup. + + context.group.containerAllocated(context.task); + context.getTaskManager().allocated(context); + + // Go ahead and launch a task in the container using the launch + // specification provided by the task group (pool). + + try { + context.yarn.launchContainer(container, task.getLaunchSpec()); + task.launchTime = System.currentTimeMillis(); + } catch (YarnFacadeException e) { + LOG.error("Container launch failed: " + task.getContainerId(), e); + + // This may not be the right response. RM may still think + // we have the container if the above is a local failure. + + task.error = e; + context.group.containerReleased(task); + task.container = null; + taskStartFailed(context, Disposition.LAUNCH_FAILED); + } + } + + /** + * Cancel the container request. We must wait for the response from YARN to + * do the actual cancellation. For now, just mark the task as cancelled. + */ + + @Override + public void cancel(EventContext context) { + Task task = context.task; + context.task.cancel(); + LOG.info(task.getLabel() + " - Cancelled at user request"); + context.yarn.removeContainerRequest(task.containerRequest); + context.group.dequeueAllocatingTask(task); + task.disposition = Task.Disposition.CANCELLED; + task.completionTime = System.currentTimeMillis(); + transition(context, END); + context.group.taskEnded(context.task); + } + + /** + * The task is requesting a container. 
If the request takes too long, + * cancel the request and shrink the target task count. This event + * generally indicates that the user wants to run more tasks than + * the cluster has capacity. + */ + + @Override + public void tick(EventContext context, long curTime) { + Task task = context.task; + int timeoutSec = task.scheduler.getRequestTimeoutSec( ); + if (timeoutSec == 0) { + return; + } + if (task.stateStartTime + timeoutSec * 1000 > curTime) { + return; + } + LOG.info(task.getLabel() + " - Request timed out after + " + + timeoutSec + " secs."); + context.yarn.removeContainerRequest(task.containerRequest); + context.group.dequeueAllocatingTask(task); + task.disposition = Task.Disposition.LAUNCH_FAILED; + task.completionTime = System.currentTimeMillis(); + transition(context, END); + context.group.taskEnded(context.task); + task.scheduler.requestTimedOut(); + } + } + + /** + * Task for which a container has been allocated and the task launch request + * sent. Awaiting confirmation that the task is running. + */ + + private static class LaunchingState extends TaskState { + protected LaunchingState() { + super(true, TaskLifecycleListener.Event.ALLOCATED, true); + } + + /** + * Handle launch failure. Results in a LAUNCHING --> END transition or + * restart. + * <p> + * This situation can occur, when debugging, if a timeout occurs after the + * allocation message, such as when, sitting in the debugger on the + * allocation event. + */ + + @Override + public void launchFailed(EventContext context, Throwable t) { + Task task = context.task; + LOG.info(task.getLabel() + " - Container start failed"); + context.task.error = t; + launchFailed(context); + } + + /** + * Handle LAUNCHING --> RUNNING/START_ACK. Indicates that YARN has confirmed + * that the task is, indeed, running. 
+ */ + + @Override + public void containerStarted(EventContext context) { + Task task = context.task; + + // If this task is tracked (that is, it is a Drillbit which + // we monitor using ZK) then we have to decide if we've + // seen the task in the tracker yet. If we have, then the + // task is fully running. If we haven't, then we need to + // wait for the start acknowledgement. + + if (task.trackingState == Task.TrackingState.NEW) { + transition(context, WAIT_START_ACK); + } else { + transition(context, RUNNING); + } + task.error = null; + + // If someone came along and marked the task as cancelled, + // we are now done waiting for YARN so we can immediately + // turn around and kill the task. (Can't kill the task, + // however, until YARN starts it, hence the need to wait + // for YARN to start the task before killing it.) + + if (task.cancelled) { + transition(context, KILLING); + context.yarn.killContainer(task.getContainer()); + } + } + + /** + * Out-of-order start ACK, perhaps due to network latency. Handle by staying + * in this state, but later jump directly<br> + * LAUNCHING --> RUNNING + */ + + @Override + public void startAck(EventContext context) { + context.task.trackingState = Task.TrackingState.START_ACK; + } + + @Override + public void containerCompleted(EventContext context, + ContainerStatus status) { + // Seen on Mac when putting machine to sleep. + // Handle by failing & retrying. + completed(context, status); + endOrAck(context); + } + + @Override + public void cancel(EventContext context) { + context.task.cancel(); + context.yarn.killContainer(context.task.getContainer()); + } + + @Override + public void tick(EventContext context, long curTime) { + + // If we are canceling the task, and YARN has not reported container + // completion after some amount of time, just force failure. 
+ + Task task = context.task; + if (task.isCancelled() + && task.cancellationTime + Task.MAX_CANCELLATION_TIME < curTime) { + LOG.error(task.getLabel() + " - Launch timed out after " + + Task.MAX_CANCELLATION_TIME / 1000 + " secs."); + launchFailed(context); + } + } + + private void launchFailed(EventContext context) { + Task task = context.task; + task.completionTime = System.currentTimeMillis(); + + // Not sure if releasing the container is needed... + + context.yarn.releaseContainer(task.container); + context.group.containerReleased(task); + task.container = null; + taskStartFailed(context, Disposition.LAUNCH_FAILED); + } + } + + /** + * Task has been launched, is tracked, but we've not yet received a start ack. + */ + + private static class WaitStartAckState extends TaskState { + protected WaitStartAckState() { + super(true, TaskLifecycleListener.Event.RUNNING, true); + } + + @Override + public void startAck(EventContext context) { + context.task.trackingState = Task.TrackingState.START_ACK; + transition(context, RUNNING); + } + + @Override + public void cancel(EventContext context) { + RUNNING.cancel(context); + } + + // @Override + // public void containerStopped(EventContext context) { + // transition(context, WAIT_COMPLETE ); + // } + + @Override + public void containerCompleted(EventContext context, + ContainerStatus status) { + completed(context, status); + taskTerminated(context); + } + + // TODO: Timeout in this state. + } + + /** + * Task in the normal running state. + */ + + private static class RunningState extends TaskState { + protected RunningState() { + super(true, TaskLifecycleListener.Event.RUNNING, true); + } + + /** + * Normal task completion. Implements the RUNNING --> END transition. 
+ * + * @param status + */ + + @Override + public void containerCompleted(EventContext context, + ContainerStatus status) { + completed(context, status); + endOrAck(context); + } + + @Override + public void cancel(EventContext context) { + Task task = context.task; + task.cancel(); + if (context.group.requestStop(task)) { + transition(context, ENDING); + } else { + context.yarn.killContainer(task.container); + transition(context, KILLING); + } + } + + /** + * The task claims that it is complete, but we think it is running. Assume + * that the task has started its own graceful shutdown (or the + * equivalent).<br> + * RUNNING --> ENDING + */ + + @Override + public void completionAck(EventContext context) { + context.task.trackingState = Task.TrackingState.END_ACK; + transition(context, ENDING); + } + } + + /** + * Task for which a termination request has been sent to the Drill-bit, but + * confirmation has not yet been received from the Node Manager. (Not yet + * supported in the Drill-bit. + */ + + public static class EndingState extends TaskState { + protected EndingState() { super(true, TaskLifecycleListener.Event.RUNNING, false); } + + /* + * Normal ENDING --> WAIT_COMPLETE transition, awaiting Resource Manager + * confirmation. + */ + +// @Override +// public void containerStopped(EventContext context) { +// transition(context, WAIT_COMPLETE); +// } + + /** + * Normal ENDING --> WAIT_END_ACK | END transition. + * + * @param status + */ + + @Override + public void containerCompleted(EventContext context, + ContainerStatus status) { + completed(context, status); + endOrAck(context); + } + + @Override + public void cancel(EventContext context) { + context.task.cancel(); + } + + /** + * If the graceful stop process exceeds the maximum timeout, go ahead and + * forcibly kill the process. 
+ */ + + @Override + public void tick(EventContext context, long curTime) { + Task task = context.task; + if (curTime - task.stateStartTime > task.taskGroup.getStopTimeoutMs()) { + context.yarn.killContainer(task.container); + transition(context, KILLING); + } + } + + @Override + public void completionAck(EventContext context) { + context.task.trackingState = Task.TrackingState.END_ACK; + } + } + + /** + * Task for which a forced termination request has been sent to the Node + * Manager, but a stop message has not yet been received. + */ + + public static class KillingState extends TaskState { + protected KillingState() { super(true, TaskLifecycleListener.Event.RUNNING, false); } + + /* + * Normal KILLING --> WAIT_COMPLETE transition, awaiting Resource Manager + * confirmation. + */ + +// @Override +// public void containerStopped(EventContext context) { +// transition(context, WAIT_COMPLETE); +// } + + /** + * Normal KILLING --> WAIT_END_ACK | END transition. + * + * @param status + */ + + @Override + public void containerCompleted(EventContext context, + ContainerStatus status) { + completed(context, status); + endOrAck(context); + } + + @Override + public void cancel(EventContext context) { + context.task.cancel(); + } + + @Override + public void startAck(EventContext context) { + // Better late than never... Happens during debugging sessions + // when order of messages is scrambled. + + context.task.trackingState = Task.TrackingState.START_ACK; + } + + @Override + public void completionAck(EventContext context) { + context.task.trackingState = Task.TrackingState.END_ACK; + } + + @Override + public void stopTaskFailed(EventContext context, Throwable t) { + assert false; + // What to do? + } + } + + /** + * Task exited, but we are waiting for confirmation from Zookeeper that + * the Drillbit registration has been removed. Required to associate + * ZK registrations with Drillbits. 
Ensures that we don't try to + * start a new Drillbit on a node until the previous Drillbit + * completely shut down, including dropping out of ZK. + */ + + private static class WaitEndAckState extends TaskState { + protected WaitEndAckState() { + super(false, TaskLifecycleListener.Event.RUNNING, false); + } + + @Override + public void cancel(EventContext context) { + context.task.cancel(); + } + + @Override + public void completionAck(EventContext context) { + context.task.trackingState = Task.TrackingState.END_ACK; + taskTerminated(context); + } + + /** + * Periodically check if the process is still live. We are supposed to + * receive events when the task becomes deregistered. But, we've seen + * cases where the task hangs in this state forever. Try to resolve + * the issue by polling periodically. + */ + + @Override + public void tick(EventContext context, long curTime) { + if(! context.getTaskManager().isLive(context)){ + taskTerminated(context); + } + } + } + + /** + * Task is completed or failed. The disposition field gives the details of the + * completion type. The task is not active on YARN, but could be retried. + */ + + private static class EndState extends TaskState { + protected EndState() { + super(false, TaskLifecycleListener.Event.ENDED, false); + } + + /* + * Ignore out-of-order Node Manager completion notices. 
+ */ + + // @Override + // public void containerStopped(EventContext context) { + // } + + @Override + public void cancel(EventContext context) { + } + } + + private static final Log LOG = LogFactory.getLog(TaskState.class); + + public static final TaskState START = new StartState(); + public static final TaskState REQUESTING = new RequestingState(); + public static final TaskState LAUNCHING = new LaunchingState(); + public static final TaskState WAIT_START_ACK = new WaitStartAckState(); + public static final TaskState RUNNING = new RunningState(); + public static final TaskState ENDING = new EndingState(); + public static final TaskState KILLING = new KillingState(); + public static final TaskState WAIT_END_ACK = new WaitEndAckState(); + public static final TaskState END = new EndState(); + + protected final boolean hasContainer; + protected final TaskLifecycleListener.Event lifeCycleEvent; + protected final String label; + protected final boolean cancellable; + + public TaskState(boolean hasContainer, TaskLifecycleListener.Event lcEvent, + boolean cancellable) { + this.hasContainer = hasContainer; + lifeCycleEvent = lcEvent; + this.cancellable = cancellable; + String name = toString(); + name = name.replace("State", ""); + name = name.replaceAll("([a-z]+)([A-Z])", "$1_$2"); + label = name.toUpperCase(); + } + + protected void endOrAck(EventContext context) { + if (context.task.trackingState == Task.TrackingState.START_ACK) { + transition(context, WAIT_END_ACK); + } else { + taskTerminated(context); + } + } + + public void requestContainer(EventContext context) { + illegalState(context, "requestContainer"); + } + + /** + * Resource Manager reports that the task has been allocated a container. + * + * @param context + * @param container + */ + + public void containerAllocated(EventContext context, Container container) { + illegalState(context, "containerAllocated"); + } + + /** + * The launch of the container failed. 
+ * + * @param context + * @param t + */ + + public void launchFailed(EventContext context, Throwable t) { + illegalState(context, "launchFailed"); + } + + /** + * Node Manager reports that the task has started execution. + * + * @param context + */ + + public void containerStarted(EventContext context) { + illegalState(context, "containerStarted"); + } + + /** + * The monitoring plugin has detected that the task has confirmed that it is + * fully started. + */ + + public void startAck(EventContext context) { + illegalState(context, "startAck"); + } + + /** + * The node manager request to stop a task failed. + * + * @param context + * @param t + */ + + public void stopTaskFailed(EventContext context, Throwable t) { + illegalState(context, "stopTaskFailed"); + } + + /** + * The monitoring plugin has detected that the task has confirmed that it has + * started shutdown. + */ + + public void completionAck(EventContext context) { + illegalState(context, "completionAck"); + } + + /** + * Node Manager reports that the task has stopped execution. We don't yet know + * if this was a success or failure. + * + * @param context + */ + + public void containerStopped(EventContext context) { + illegalState(context, "containerStopped"); + } + + /** + * Resource Manager reports that the task has completed execution and provided + * the completion status. + * + * @param context + * @param status + */ + + public void containerCompleted(EventContext context, ContainerStatus status) { + completed(context, status); + illegalState(context, "containerCompleted"); + } + + /** + * Cluster manager wishes to cancel this task. + * + * @param context + */ + + public void cancel(EventContext context) { + illegalState(context, "cancel"); + } + + public void tick(EventContext context, long curTime) { + // Ignore by default + } + + /** + * Implement a state transition, alerting any life cycle listeners and + * updating the log file. 
Marks the start time of the new state in support of + * states that implement a timeout. + * + * @param context + * @param newState + */ + + protected void transition(EventContext context, TaskState newState) { + TaskState oldState = context.task.state; + LOG.info(context.task.getLabel() + " " + oldState.toString() + " --> " + + newState.toString()); + context.task.state = newState; + if (newState.lifeCycleEvent != oldState.lifeCycleEvent) { + context.controller.fireLifecycleChange(newState.lifeCycleEvent, context); + } + context.task.stateStartTime = System.currentTimeMillis(); + } + + /** + * Task failed when starting. No container has been allocated. The task + * will go from:<br> + * * --> END + * <p> + * If the run failed, and the task can be retried, it may + * then move from<br> + * END --> STARTING + * @param context + * @param disposition + */ + + protected void taskStartFailed(EventContext context, + Disposition disposition) { + + // No container, so don't alert the task manager. + + assert context.task.container == null; + + context.getTaskManager().completed(context); + taskEnded(context, disposition); + retryTask(context); + } + + /** + * A running task terminated. It may have succeeded or failed, + * this method will determine which. + * <p> + * Every task goes from:<br> + * * --> END + * <p> + * If the run failed, and the task can be retried, it may + * then move from<br> + * END --> STARTING + * + * @param context + */ + + protected void taskTerminated(EventContext context) { + Task task = context.task; + + // Give the task manager a peek at the completed task. + // The task manager can override retry behavior. To + // cancel a task that would otherwise be retried, call + // cancel( ) on the task. 
+ + context.getTaskManager().completed(context); + context.group.containerReleased(task); + assert task.completionStatus != null; + if (task.completionStatus.getExitStatus() == 0) { + taskEnded(context, Disposition.COMPLETED); + context.group.taskEnded(context.task); + } else { + taskEnded(context, Disposition.RUN_FAILED); + retryTask(context); + } + } + + /** + * Implements the details of marking a task as ended. Note, this method + * does not deregister the task with the scheduler state, we keep it + * registered in case we decide to retry. + * + * @param context + * @param disposition + */ + + private void taskEnded(EventContext context, Disposition disposition) { + Task task = context.task; + if (disposition == null) { + assert task.disposition != null; + } else { + task.disposition = disposition; + } + task.completionTime = System.currentTimeMillis(); + transition(context, END); + } + + /** + * Retry a task. Requires that the task currently be in the END state to provide + * clean state transitions. Will deregister the task if it cannot be retried + * because the cluster is ending or the task has failed too many times. + * Otherwise, starts the whole life cycle over again. + * + * @param context + */ + + private void retryTask(EventContext context) { + Task task = context.task; + assert task.state == END; + if (!context.controller.isLive() || !task.retryable()) { + context.group.taskEnded(task); + return; + } + if (task.tryCount > task.taskGroup.getMaxRetries()) { + LOG.error(task.getLabel() + " - Too many retries: " + task.tryCount); + task.disposition = Disposition.TOO_MANY_RETRIES; + context.group.taskEnded(task); + return; + } + LOG.info(task.getLabel() + " - Retrying task, try " + task.tryCount); + context.group.taskRetried(task); + task.reset(); + transition(context, START); + context.group.enqueuePendingRequest(task); + } + + /** + * An event is called in a state where it is not expected. Log it, ignore it + * and hope it goes away. 
+ * + * @param action + */ + + private void illegalState(EventContext context, String action) { + // Intentionally assert: fails during debugging, soldiers on in production. + + assert false; + LOG.error(context.task.getLabel() + " - Action " + action + + " in wrong state: " + toString(), + new IllegalStateException("Action in wrong state")); + } + + protected void completed(EventContext context, ContainerStatus status) { + Task task = context.task; + String diag = status.getDiagnostics(); + LOG.trace( + task.getLabel() + " Completed, exit status: " + status.getExitStatus() + + (DoYUtil.isBlank(diag) ? "" : ": " + status.getDiagnostics())); + task.completionStatus = status; + } + + @Override + public String toString() { return getClass().getSimpleName(); } + + public boolean hasContainer() { return hasContainer; } + + public String getLabel() { return label; } + + public boolean isCancellable() { + return cancellable; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java new file mode 100644 index 0000000..c90d4f8 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/TaskVisitor.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.appMaster; + +public interface TaskVisitor { + void visit(Task task); +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java new file mode 100644 index 0000000..8ac0a5d --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/appMaster/YarnFacadeException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Exceptions thrown from the YARN facade: the wrapper around the YARN AM
 * interfaces.
 */

public class YarnFacadeException extends Exception {

  // Explicit serial version instead of suppressing the "serial" warning
  // for the whole class.
  private static final long serialVersionUID = 1L;

  /**
   * Wraps a lower-level failure with a facade-level message.
   *
   * @param msg description of the operation that failed
   * @param e the underlying cause; any {@link Throwable} is accepted so
   * that errors as well as exceptions can be wrapped
   */

  public YarnFacadeException(String msg, Throwable e) {
    super(msg, e);
  }
}
/**
 * Security manager for the Application Master. Allows a variety
 * of security systems, including Drill's user authentication
 * and DoY's static user/password, or an open AM web UI.
 * <p>
 * Extends {@link AutoCloseable} so an instance can be managed with
 * try-with-resources; {@link #close()} declares no checked exception,
 * so existing implementations and callers are unaffected.
 */

public interface AMSecurityManager extends AutoCloseable {

  /**
   * Prepare the security manager for use.
   */

  void init();

  /**
   * @return true if users must log in before using the AM web UI
   */

  boolean requiresLogin();

  /**
   * Check a set of credentials.
   *
   * @param user the user name presented
   * @param password the password presented
   * @return true if the credentials are accepted
   */

  boolean login(String user, String password);

  /**
   * Release whatever the security manager holds.
   */

  @Override
  void close();
}
