http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java deleted file mode 100644 index 68d2e77..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskReconciler.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicLong; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.AbstractIdleService; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; - -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.mesos.Driver; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.mesos.Protos; - -import static java.util.Objects.requireNonNull; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.twitter.common.quantity.Time.MINUTES; - -/** - * A task reconciler that periodically triggers Mesos (implicit) and Aurora (explicit) task - * reconciliation to synchronize global task states. More on task reconciliation: - * http://mesos.apache.org/documentation/latest/reconciliation. 
- */ -public class TaskReconciler extends AbstractIdleService { - - @VisibleForTesting - static final String EXPLICIT_STAT_NAME = "reconciliation_explicit_runs"; - - @VisibleForTesting - static final String IMPLICIT_STAT_NAME = "reconciliation_implicit_runs"; - - private final TaskReconcilerSettings settings; - private final Storage storage; - private final Driver driver; - private final ScheduledExecutorService executor; - private final AtomicLong explicitRuns; - private final AtomicLong implicitRuns; - - static class TaskReconcilerSettings { - private final Amount<Long, Time> explicitInterval; - private final Amount<Long, Time> implicitInterval; - private final long explicitDelayMinutes; - private final long implicitDelayMinutes; - - @VisibleForTesting - TaskReconcilerSettings( - Amount<Long, Time> initialDelay, - Amount<Long, Time> explicitInterval, - Amount<Long, Time> implicitInterval, - Amount<Long, Time> scheduleSpread) { - - this.explicitInterval = requireNonNull(explicitInterval); - this.implicitInterval = requireNonNull(implicitInterval); - explicitDelayMinutes = requireNonNull(initialDelay).as(MINUTES); - implicitDelayMinutes = initialDelay.as(MINUTES) + scheduleSpread.as(MINUTES); - checkArgument( - explicitDelayMinutes >= 0, - "Invalid explicit reconciliation delay: " + explicitDelayMinutes); - checkArgument( - implicitDelayMinutes >= 0L, - "Invalid implicit reconciliation delay: " + implicitDelayMinutes); - } - } - - @Inject - TaskReconciler( - TaskReconcilerSettings settings, - Storage storage, - Driver driver, - ScheduledExecutorService executor, - StatsProvider stats) { - - this.settings = requireNonNull(settings); - this.storage = requireNonNull(storage); - this.driver = requireNonNull(driver); - this.executor = requireNonNull(executor); - this.explicitRuns = stats.makeCounter(EXPLICIT_STAT_NAME); - this.implicitRuns = stats.makeCounter(IMPLICIT_STAT_NAME); - } - - @Override - protected void startUp() { - // Schedule explicit reconciliation. 
- executor.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - ImmutableSet<Protos.TaskStatus> active = FluentIterable - .from(Storage.Util.fetchTasks( - storage, - Query.unscoped().byStatus(Tasks.SLAVE_ASSIGNED_STATES))) - .transform(TASK_TO_PROTO) - .toSet(); - - driver.reconcileTasks(active); - explicitRuns.incrementAndGet(); - } - }, - settings.explicitDelayMinutes, - settings.explicitInterval.as(MINUTES), - MINUTES.getTimeUnit()); - - // Schedule implicit reconciliation. - executor.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - driver.reconcileTasks(ImmutableSet.of()); - implicitRuns.incrementAndGet(); - } - }, - settings.implicitDelayMinutes, - settings.implicitInterval.as(MINUTES), - MINUTES.getTimeUnit()); - } - - @Override - protected void shutDown() { - // Nothing to do - await VM shutdown. - } - - @VisibleForTesting - static final Function<IScheduledTask, Protos.TaskStatus> TASK_TO_PROTO = - t -> Protos.TaskStatus.newBuilder() - // TODO(maxim): State is required by protobuf but ignored by Mesos for reconciliation - // purposes. This is the artifact of the native API. The new HTTP Mesos API will be - // accepting task IDs instead. AURORA-1326 tracks solution on the scheduler side. - // Setting TASK_RUNNING as a safe dummy value here. - .setState(Protos.TaskState.TASK_RUNNING) - .setTaskId(Protos.TaskID.newBuilder().setValue(t.getAssignedTask().getTaskId()).build()) - .build(); -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java deleted file mode 100644 index a500e55..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskScheduler.java +++ /dev/null @@ -1,247 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.inject.Inject; -import javax.inject.Qualifier; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; -import com.google.common.eventbus.Subscribe; -import com.twitter.common.inject.TimedInterceptor.Timed; -import com.twitter.common.stats.Stats; - -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.preemptor.BiCache; -import org.apache.aurora.scheduler.async.preemptor.Preemptor; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.state.TaskAssigner; -import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import static 
java.util.Objects.requireNonNull;

import static org.apache.aurora.gen.ScheduleStatus.LOST;
import static org.apache.aurora.gen.ScheduleStatus.PENDING;

/**
 * Enables scheduling and preemption of tasks.
 */
public interface TaskScheduler extends EventSubscriber {

  /**
   * Makes one attempt at scheduling a task. The attempt may perform irreversible actions.
   *
   * @param taskId ID of the task to try to schedule.
   * @return {@code true} if the task was scheduled, {@code false} otherwise. On {@code false}
   *     the caller is expected to call this method again.
   */
  boolean schedule(String taskId);

  /**
   * An asynchronous task scheduler. Scheduling of tasks is performed on a delay, where each task
   * backs off after a failed scheduling attempt.
   * <p>
   * Pending tasks are advertised to the scheduler via internal pubsub notifications.
   */
  class TaskSchedulerImpl implements TaskScheduler {
    /**
     * Binding annotation for the time duration of reservations.
     */
    @VisibleForTesting
    @Qualifier
    @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
    public @interface ReservationDuration { }

    private static final Logger LOG = Logger.getLogger(TaskSchedulerImpl.class.getName());

    // Audit message attached to the LOST transition when launching a matched task fails.
    @VisibleForTesting
    static final Optional<String> LAUNCH_FAILED_MSG =
        Optional.of("Unknown exception attempting to schedule task.");

    private final Storage storage;
    private final StateManager stateManager;
    private final TaskAssigner assigner;
    private final OfferManager offerManager;
    private final Preemptor preemptor;
    // Slave ID -> task group the slave is currently reserved for.
    private final BiCache<String, TaskGroupKey> reservations;

    private final AtomicLong attemptsFired = Stats.exportLong("schedule_attempts_fired");
    private final AtomicLong attemptsFailed = Stats.exportLong("schedule_attempts_failed");
    private final AtomicLong attemptsNoMatch = Stats.exportLong("schedule_attempts_no_match");

    @Inject
    TaskSchedulerImpl(
        Storage storage,
        StateManager stateManager,
        TaskAssigner assigner,
        OfferManager offerManager,
        Preemptor preemptor,
        BiCache<String, TaskGroupKey> reservations) {

      this.storage = requireNonNull(storage);
      this.stateManager = requireNonNull(stateManager);
      this.assigner = requireNonNull(assigner);
      this.offerManager = requireNonNull(offerManager);
      this.preemptor = requireNonNull(preemptor);
      this.reservations = requireNonNull(reservations);
    }

    /**
     * Builds the function that tries to assign the task to a single offer. An offer whose slave
     * is reserved for a different task group is rejected; an unreserved slave, or one reserved
     * for this task's own group, is handed to the assigner.
     */
    private Function<HostOffer, Assignment> getAssignerFunction(
        final MutableStoreProvider storeProvider,
        final ResourceRequest resourceRequest,
        final String taskId) {

      // TODO(wfarner): Turn this into Predicate<Offer>, and in the caller, find the first match
      // and perform the assignment at the very end.  This will allow us to use optimistic locking
      // at the top of the stack and avoid holding the write lock for too long.
      return offer -> {
        Optional<TaskGroupKey> reservedFor =
            reservations.get(offer.getOffer().getSlaveId().getValue());

        boolean reservedForAnotherGroup = reservedFor.isPresent()
            && !TaskGroupKey.from(resourceRequest.getTask()).equals(reservedFor.get());

        return reservedForAnotherGroup
            ? Assignment.failure()
            : assigner.maybeAssign(storeProvider, offer, resourceRequest, taskId);
      };
    }

    @Timed("task_schedule_attempt")
    @Override
    public boolean schedule(final String taskId) {
      attemptsFired.incrementAndGet();

      MutateWork.Quiet<Boolean> attempt = new MutateWork.Quiet<Boolean>() {
        @Override
        public Boolean apply(MutableStoreProvider store) {
          return scheduleTask(store, taskId);
        }
      };

      try {
        return storage.write(attempt);
      } catch (RuntimeException e) {
        // We catch the generic unchecked exception here to ensure tasks are not abandoned
        // if there is a transient issue resulting in an unchecked exception.
        LOG.log(Level.WARNING, "Task scheduling unexpectedly failed, will be retried", e);
        attemptsFailed.incrementAndGet();
        return false;
      }
    }

    /**
     * Performs the scheduling attempt with the storage write already in progress.
     *
     * @param store Store provider of the enclosing write operation.
     * @param taskId ID of the task to schedule.
     * @return {@code false} only when no offer matched; {@code true} when the task was launched,
     *     was not found in PENDING state, or the launch failed and the task was moved to LOST.
     */
    @Timed("task_schedule_attempt_locked")
    protected boolean scheduleTask(MutableStoreProvider store, String taskId) {
      LOG.fine("Attempting to schedule task " + taskId);

      Iterable<IAssignedTask> pendingMatches = Iterables.transform(
          store.getTaskStore().fetchTasks(Query.taskScoped(taskId).byStatus(PENDING)),
          Tasks.SCHEDULED_TO_ASSIGNED);
      IAssignedTask assignedTask = Iterables.getOnlyElement(pendingMatches, null);

      if (assignedTask == null) {
        LOG.warning("Failed to look up task " + taskId + ", it may have been deleted.");
        return true;
      }

      ITaskConfig task = assignedTask.getTask();
      AttributeAggregate aggregate = AttributeAggregate.getJobActiveState(store, task.getJob());
      try {
        boolean launched = offerManager.launchFirst(
            getAssignerFunction(store, new ResourceRequest(task, aggregate), taskId),
            TaskGroupKey.from(task));

        if (launched) {
          return true;
        }

        // Task could not be scheduled.
        // TODO(maxim): Now that preemption slots are searched asynchronously, consider
        // retrying a launch attempt within the current scheduling round IFF a reservation is
        // available.
        maybePreemptFor(assignedTask, aggregate, store);
        attemptsNoMatch.incrementAndGet();
        return false;
      } catch (OfferManager.LaunchException e) {
        LOG.log(Level.WARNING, "Failed to launch task.", e);
        attemptsFailed.incrementAndGet();

        // The attempt to schedule the task failed, so we need to backpedal on the
        // assignment.
        // It is in the LOST state and a new task will move to PENDING to replace it.
        // Should the state change fail due to storage issues, that's okay.  The task will
        // time out in the ASSIGNED state and be moved to LOST.
        stateManager.changeState(
            store,
            taskId,
            Optional.of(PENDING),
            LOST,
            LAUNCH_FAILED_MSG);
        return true;
      }
    }

    /**
     * Asks the preemptor for a slave unless the task's group already holds a reservation, and
     * records a reservation for the slave the preemptor returns, if any.
     */
    private void maybePreemptFor(
        IAssignedTask task,
        AttributeAggregate jobState,
        MutableStoreProvider storeProvider) {

      TaskGroupKey groupKey = TaskGroupKey.from(task.getTask());
      if (reservations.getByValue(groupKey).isEmpty()) {
        Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider);
        if (slaveId.isPresent()) {
          reservations.put(slaveId.get(), groupKey);
        }
      }
    }

    /**
     * Drops the reservation matching a task that has just left PENDING with a slave assigned.
     *
     * @param stateChangeEvent The state change notification.
     */
    @Subscribe
    public void taskChanged(final TaskStateChange stateChangeEvent) {
      if (!Optional.of(PENDING).equals(stateChangeEvent.getOldState())) {
        return;
      }

      IAssignedTask assigned = stateChangeEvent.getTask().getAssignedTask();
      String slaveId = assigned.getSlaveId();
      if (slaveId != null) {
        reservations.remove(slaveId, TaskGroupKey.from(assigned.getTask()));
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java deleted file mode 100644 index c8f2005..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/TaskThrottler.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import javax.inject.Inject; - -import com.google.common.base.Optional; -import com.google.common.eventbus.Subscribe; -import com.twitter.common.stats.SlidingStats; -import com.twitter.common.util.Clock; - -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.Storage; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.gen.ScheduleStatus.THROTTLED; - -/** - * A holding area for tasks that have been throttled. Tasks entering the - * {@link org.apache.aurora.gen.ScheduleStatus#THROTTLED} state will be transitioned to - * {@link org.apache.aurora.gen.ScheduleStatus#PENDING} after the penalty period (as dictated by - * {@link RescheduleCalculator}) has expired. 
- */ -class TaskThrottler implements EventSubscriber { - - private final RescheduleCalculator rescheduleCalculator; - private final Clock clock; - private final ScheduledExecutorService executor; - private final Storage storage; - private final StateManager stateManager; - - private final SlidingStats throttleStats = new SlidingStats("task_throttle", "ms"); - - @Inject - TaskThrottler( - RescheduleCalculator rescheduleCalculator, - Clock clock, - ScheduledExecutorService executor, - Storage storage, - StateManager stateManager) { - - this.rescheduleCalculator = requireNonNull(rescheduleCalculator); - this.clock = requireNonNull(clock); - this.executor = requireNonNull(executor); - this.storage = requireNonNull(storage); - this.stateManager = requireNonNull(stateManager); - } - - @Subscribe - public void taskChangedState(final TaskStateChange stateChange) { - if (stateChange.getNewState() == THROTTLED) { - long readyAtMs = Tasks.getLatestEvent(stateChange.getTask()).getTimestamp() - + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask()); - long delayMs = Math.max(0, readyAtMs - clock.nowMillis()); - throttleStats.accumulate(delayMs); - executor.schedule( - new Runnable() { - @Override - public void run() { - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - stateManager.changeState( - storeProvider, - stateChange.getTaskId(), - Optional.of(THROTTLED), - PENDING, - Optional.absent()); - } - }); - } - }, - delayMs, - TimeUnit.MILLISECONDS); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java deleted file mode 100644 index e250f33..0000000 --- 
a/src/main/java/org/apache/aurora/scheduler/async/TaskTimeout.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.EnumSet; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.eventbus.Subscribe; -import com.google.common.util.concurrent.AbstractIdleService; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; - -import org.apache.aurora.gen.ScheduleStatus; -import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; -import org.apache.aurora.scheduler.state.StateChangeResult; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.Storage; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.scheduler.storage.Storage.MutateWork; - -/** - * Observes task transitions and identifies tasks that are 'stuck' in a transient state. Stuck - * tasks will be transitioned to the LOST state. 
- */ -class TaskTimeout extends AbstractIdleService implements EventSubscriber { - private static final Logger LOG = Logger.getLogger(TaskTimeout.class.getName()); - - @VisibleForTesting - static final Amount<Long, Time> NOT_STARTED_RETRY = Amount.of(5L, Time.SECONDS); - - @VisibleForTesting - static final String TIMED_OUT_TASKS_COUNTER = "timed_out_tasks"; - - @VisibleForTesting - static final Optional<String> TIMEOUT_MESSAGE = Optional.of("Task timed out"); - - @VisibleForTesting - static final Set<ScheduleStatus> TRANSIENT_STATES = EnumSet.of( - ScheduleStatus.ASSIGNED, - ScheduleStatus.PREEMPTING, - ScheduleStatus.RESTARTING, - ScheduleStatus.KILLING, - ScheduleStatus.DRAINING); - - private final ScheduledExecutorService executor; - private final Storage storage; - private final StateManager stateManager; - private final Amount<Long, Time> timeout; - private final AtomicLong timedOutTasks; - - @Inject - TaskTimeout( - ScheduledExecutorService executor, - Storage storage, - StateManager stateManager, - Amount<Long, Time> timeout, - StatsProvider statsProvider) { - - this.executor = requireNonNull(executor); - this.storage = requireNonNull(storage); - this.stateManager = requireNonNull(stateManager); - this.timeout = requireNonNull(timeout); - this.timedOutTasks = statsProvider.makeCounter(TIMED_OUT_TASKS_COUNTER); - } - - private static boolean isTransient(ScheduleStatus status) { - return TRANSIENT_STATES.contains(status); - } - - @Override - protected void startUp() { - // No work to do here for startup, however we leverage the state tracking in - // AbstractIdleService. - } - - @Override - protected void shutDown() { - // Nothing to do for shutting down. 
- } - - private class TimedOutTaskHandler implements Runnable { - private final String taskId; - private final ScheduleStatus newState; - - TimedOutTaskHandler(String taskId, ScheduleStatus newState) { - this.taskId = taskId; - this.newState = newState; - } - - @Override - public void run() { - if (isRunning()) { - // This query acts as a CAS by including the state that we expect the task to be in - // if the timeout is still valid. Ideally, the future would have already been - // canceled, but in the event of a state transition race, including transientState - // prevents an unintended task timeout. - // Note: This requires LOST transitions trigger Driver.killTask. - StateChangeResult result = storage.write(new MutateWork.Quiet<StateChangeResult>() { - @Override - public StateChangeResult apply(Storage.MutableStoreProvider storeProvider) { - return stateManager.changeState( - storeProvider, - taskId, - Optional.of(newState), - ScheduleStatus.LOST, - TIMEOUT_MESSAGE); - } - }); - - if (result == StateChangeResult.SUCCESS) { - LOG.info("Timeout reached for task " + taskId + ":" + taskId); - timedOutTasks.incrementAndGet(); - } - } else { - // Our service is not yet started. We don't want to lose track of the task, so - // we will try again later. 
- LOG.fine("Retrying timeout of task " + taskId + " in " + NOT_STARTED_RETRY); - executor.schedule( - this, - NOT_STARTED_RETRY.getValue(), - NOT_STARTED_RETRY.getUnit().getTimeUnit()); - } - } - } - - @Subscribe - public void recordStateChange(TaskStateChange change) { - if (isTransient(change.getNewState())) { - executor.schedule( - new TimedOutTaskHandler(change.getTaskId(), change.getNewState()), - timeout.getValue(), - timeout.getUnit().getTimeUnit()); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java deleted file mode 100644 index 382099f..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/BiCache.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import javax.inject.Inject; - -import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.base.Ticker; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.stats.StatsProvider; -import com.twitter.common.util.Clock; - -import static java.util.Objects.requireNonNull; - -/** - * A bi-directional cache of items. Entries are purged from cache after - * {@link BiCacheSettings#expireAfter}. - * - * @param <K> Key type. - * @param <V> Value type. 
- */ -public class BiCache<K, V> { - - public static class BiCacheSettings { - private final Amount<Long, Time> expireAfter; - private final String cacheSizeStatName; - - public BiCacheSettings(Amount<Long, Time> expireAfter, String cacheSizeStatName) { - this.expireAfter = requireNonNull(expireAfter); - this.cacheSizeStatName = requireNonNull(cacheSizeStatName); - } - } - - private final Cache<K, V> cache; - private final Multimap<V, K> inverse = HashMultimap.create(); - - @Inject - public BiCache( - StatsProvider statsProvider, - BiCacheSettings settings, - final Clock clock) { - - requireNonNull(clock); - this.cache = CacheBuilder.newBuilder() - .expireAfterWrite(settings.expireAfter.as(Time.MINUTES), TimeUnit.MINUTES) - .ticker(new Ticker() { - @Override - public long read() { - return clock.nowNanos(); - } - }) - .removalListener(new RemovalListener<K, V>() { - @Override - public void onRemoval(RemovalNotification<K, V> notification) { - inverse.remove(notification.getValue(), notification.getKey()); - } - }) - .build(); - - statsProvider.makeGauge( - settings.cacheSizeStatName, - new Supplier<Long>() { - @Override - public Long get() { - return cache.size(); - } - }); - } - - /** - * Puts a new key/value pair. - * - * @param key Key to add. - * @param value Value to add. - */ - public synchronized void put(K key, V value) { - requireNonNull(key); - requireNonNull(value); - cache.put(key, value); - inverse.put(value, key); - } - - /** - * Gets a cached value by key. - * - * @param key Key to get value for. - * @return Optional of value. - */ - public synchronized Optional<V> get(K key) { - return Optional.fromNullable(cache.getIfPresent(key)); - } - - /** - * Gets a set of keys for a given value. - * - * @param value Value to get all keys for. - * @return An {@link Iterable} of keys or empty if value does not exist. 
- */ - public synchronized Set<K> getByValue(V value) { - // Cache items are lazily removed by routine maintenance operations during get/write access. - // Forcing cleanup here to ensure proper data integrity. - cache.cleanUp(); - return ImmutableSet.copyOf(inverse.get(value)); - } - - /** - * Removes a key/value pair from cache. - * - * @param key Key to remove. - * @param value Value to remove. - */ - public synchronized void remove(K key, V value) { - inverse.remove(value, key); - cache.invalidate(key); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java deleted file mode 100644 index 38610b2..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterState.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Multimap; - -/** - * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster. 
- */ -@VisibleForTesting -public interface ClusterState { - - /** - * Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are - * assigned to. - * <p> - * TODO(wfarner): Return a more minimal type than IAssignedTask here. - * - * @return Active tasks and their associated slave IDs. - */ - Multimap<String, PreemptionVictim> getSlavesToActiveTasks(); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java deleted file mode 100644 index d7a0c54..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/ClusterStateImpl.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async.preemptor; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.eventbus.Subscribe; - -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.events.PubsubEvent; -import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; - -/** - * A cached view of cluster state, kept up to date by pubsub notifications. - */ -public class ClusterStateImpl implements ClusterState, PubsubEvent.EventSubscriber { - - private final Multimap<String, PreemptionVictim> victims = - Multimaps.synchronizedMultimap(HashMultimap.create()); - - @Override - public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() { - return Multimaps.unmodifiableMultimap(victims); - } - - @Subscribe - public void taskChangedState(TaskStateChange stateChange) { - synchronized (victims) { - String slaveId = stateChange.getTask().getAssignedTask().getSlaveId(); - PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask()); - if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) { - victims.put(slaveId, victim); - } else { - victims.remove(slaveId, victim); - } - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java deleted file mode 100644 index 1f1eb4c..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PendingTaskProcessor.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the 
License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.inject.Inject; -import javax.inject.Qualifier; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multiset; -import com.google.common.collect.Sets; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; -import com.twitter.common.util.Clock; - -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.Tasks; -import 
org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED; - -/** - * Attempts to find preemption slots for all PENDING tasks eligible for preemption. - */ -@VisibleForTesting -public class PendingTaskProcessor implements Runnable { - private final Storage storage; - private final OfferManager offerManager; - private final PreemptionVictimFilter preemptionVictimFilter; - private final PreemptorMetrics metrics; - private final Amount<Long, Time> preemptionCandidacyDelay; - private final BiCache<PreemptionProposal, TaskGroupKey> slotCache; - private final ClusterState clusterState; - private final Clock clock; - - /** - * Binding annotation for the time interval after which a pending task becomes eligible to - * preempt other tasks. To avoid excessive churn, the preemptor requires that a task is PENDING - * for a duration (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible - * to preempt other tasks. 
- */ - @VisibleForTesting - @Qualifier - @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - public @interface PreemptionDelay { } - - @Inject - PendingTaskProcessor( - Storage storage, - OfferManager offerManager, - PreemptionVictimFilter preemptionVictimFilter, - PreemptorMetrics metrics, - @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay, - BiCache<PreemptionProposal, TaskGroupKey> slotCache, - ClusterState clusterState, - Clock clock) { - - this.storage = requireNonNull(storage); - this.offerManager = requireNonNull(offerManager); - this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter); - this.metrics = requireNonNull(metrics); - this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay); - this.slotCache = requireNonNull(slotCache); - this.clusterState = requireNonNull(clusterState); - this.clock = requireNonNull(clock); - } - - @Override - public void run() { - metrics.recordTaskProcessorRun(); - storage.read(new Storage.Work.Quiet<Void>() { - @Override - public Void apply(StoreProvider store) { - Multimap<String, PreemptionVictim> slavesToActiveTasks = - clusterState.getSlavesToActiveTasks(); - - if (slavesToActiveTasks.isEmpty()) { - // No preemption victims to consider. - return null; - } - - // Group the offers by slave id so they can be paired with active tasks from the same slave. - Map<String, HostOffer> slavesToOffers = - Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID); - - Set<String> allSlaves = Sets.newHashSet(Iterables.concat( - slavesToOffers.keySet(), - slavesToActiveTasks.keySet())); - - // The algorithm below attempts to find a reservation for every task group by matching - // it against all available slaves until a preemption slot is found. Groups are evaluated - // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2). - // A slave is removed from further matching once a reservation is made. 
Similarly, all - // identical task group instances are removed from further iteration if none of the - // available slaves could yield a preemption proposal. A consuming iterator is used for - // task groups to ensure iteration order is preserved after a task group is removed. - LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store); - List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store); - Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator()); - while (!pendingGroups.isEmpty()) { - boolean matched = false; - TaskGroupKey group = groups.next(); - ITaskConfig task = group.getTask(); - - metrics.recordPreemptionAttemptFor(task); - Iterator<String> slaveIterator = allSlaves.iterator(); - while (slaveIterator.hasNext()) { - String slaveId = slaveIterator.next(); - Optional<ImmutableSet<PreemptionVictim>> candidates = - preemptionVictimFilter.filterPreemptionVictims( - task, - slavesToActiveTasks.get(slaveId), - jobStates.getUnchecked(task.getJob()), - Optional.fromNullable(slavesToOffers.get(slaveId)), - store); - - metrics.recordSlotSearchResult(candidates, task); - if (candidates.isPresent()) { - // Slot found -> remove slave to avoid multiple task reservations. - slaveIterator.remove(); - slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group); - matched = true; - break; - } - } - if (!matched) { - // No slot found for the group -> remove group and reset group iterator. 
- pendingGroups.removeAll(ImmutableSet.of(group)); - groups = Iterators.consumingIterator(pendingGroups.iterator()); - } - } - return null; - } - }); - } - - private List<TaskGroupKey> fetchIdlePendingGroups(StoreProvider store) { - Multiset<TaskGroupKey> taskGroupCounts = HashMultiset.create( - FluentIterable.from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING))) - .filter(Predicates.and(isIdleTask, Predicates.not(hasCachedSlot))) - .transform(Functions.compose(ASSIGNED_TO_GROUP_KEY, SCHEDULED_TO_ASSIGNED))); - - return getPreemptionSequence(taskGroupCounts); - } - - /** - * Creates execution sequence for pending task groups by interleaving their unique occurrences. - * For example: {G1, G1, G1, G2, G2} will be converted into {G1, G2, G1, G2, G1}. - * - * @param groups Multiset of task groups. - * @return A task group execution sequence. - */ - private static List<TaskGroupKey> getPreemptionSequence(Multiset<TaskGroupKey> groups) { - Multiset<TaskGroupKey> mutableGroups = HashMultiset.create(groups); - List<TaskGroupKey> instructions = Lists.newLinkedList(); - Set<TaskGroupKey> keys = ImmutableSet.copyOf(groups.elementSet()); - while (!mutableGroups.isEmpty()) { - for (TaskGroupKey key : keys) { - if (mutableGroups.contains(key)) { - instructions.add(key); - mutableGroups.remove(key); - } - } - } - - return instructions; - } - - private LoadingCache<IJobKey, AttributeAggregate> attributeCache(final StoreProvider store) { - return CacheBuilder.newBuilder().build(CacheLoader.from( - new Function<IJobKey, AttributeAggregate>() { - @Override - public AttributeAggregate apply(IJobKey job) { - return AttributeAggregate.getJobActiveState(store, job); - } - })); - } - - private static final Function<IAssignedTask, TaskGroupKey> ASSIGNED_TO_GROUP_KEY = - new Function<IAssignedTask, TaskGroupKey>() { - @Override - public TaskGroupKey apply(IAssignedTask task) { - return TaskGroupKey.from(task.getTask()); - } - }; - - private final Predicate<IScheduledTask> 
hasCachedSlot = new Predicate<IScheduledTask>() { - @Override - public boolean apply(IScheduledTask task) { - return !slotCache.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())).isEmpty(); - } - }; - - private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() { - @Override - public boolean apply(IScheduledTask task) { - return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp()) - >= preemptionCandidacyDelay.as(Time.MILLISECONDS); - } - }; - - private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID = - new Function<HostOffer, String>() { - @Override - public String apply(HostOffer offer) { - return offer.getOffer().getSlaveId().getValue(); - } - }; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java deleted file mode 100644 index 7a03168..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionProposal.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Objects; -import java.util.Set; - -import com.google.common.collect.ImmutableSet; - -import static java.util.Objects.requireNonNull; - -/** - * A set of tasks proposed for preemption on a given slave. - */ -class PreemptionProposal { - private final Set<PreemptionVictim> victims; - private final String slaveId; - - PreemptionProposal(ImmutableSet<PreemptionVictim> victims, String slaveId) { - this.victims = requireNonNull(victims); - this.slaveId = requireNonNull(slaveId); - } - - Set<PreemptionVictim> getVictims() { - return victims; - } - - String getSlaveId() { - return slaveId; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof PreemptionProposal)) { - return false; - } - - PreemptionProposal other = (PreemptionProposal) o; - return Objects.equals(getVictims(), other.getVictims()) - && Objects.equals(getSlaveId(), other.getSlaveId()); - } - - @Override - public int hashCode() { - return Objects.hash(victims, slaveId); - } - - @Override - public String toString() { - return com.google.common.base.Objects.toStringHelper(this) - .add("victims", getVictims()) - .add("slaveId", getSlaveId()) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java deleted file mode 100644 index f196b21..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictim.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Objects; - -import org.apache.aurora.scheduler.configuration.Resources; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; - -/** - * A victim to be considered as a candidate for preemption. - */ -public final class PreemptionVictim { - private final String slaveHost; - private final boolean production; - private final String role; - private final int priority; - private final Resources resources; - private final String taskId; - - private PreemptionVictim( - String slaveHost, - boolean production, - String role, - int priority, - Resources resources, - String taskId) { - - this.slaveHost = slaveHost; - this.production = production; - this.role = role; - this.priority = priority; - this.resources = resources; - this.taskId = taskId; - } - - public static PreemptionVictim fromTask(IAssignedTask task) { - ITaskConfig config = task.getTask(); - return new PreemptionVictim( - task.getSlaveHost(), - config.isProduction(), - config.getJob().getRole(), - config.getPriority(), - Resources.from(task.getTask()), - task.getTaskId()); - } - - public String getSlaveHost() { - return slaveHost; - } - - public boolean isProduction() { - return production; - } - - public String getRole() { - return role; - } - - public int getPriority() { - return priority; - } - - public Resources getResources() { - return resources; - } - - public String getTaskId() { - return taskId; - } - - @Override - public boolean 
equals(Object o) { - if (!(o instanceof PreemptionVictim)) { - return false; - } - - PreemptionVictim other = (PreemptionVictim) o; - return Objects.equals(getSlaveHost(), other.getSlaveHost()) - && Objects.equals(isProduction(), other.isProduction()) - && Objects.equals(getRole(), other.getRole()) - && Objects.equals(getPriority(), other.getPriority()) - && Objects.equals(getResources(), other.getResources()) - && Objects.equals(getTaskId(), other.getTaskId()); - } - - @Override - public int hashCode() { - return Objects.hash(slaveHost, production, role, priority, resources, taskId); - } - - @Override - public String toString() { - return com.google.common.base.Objects.toStringHelper(this) - .add("slaveHost", getSlaveHost()) - .add("production", isProduction()) - .add("role", getRole()) - .add("priority", getPriority()) - .add("resources", getResources()) - .add("taskId", getTaskId()) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java deleted file mode 100644 index 75e2370..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptionVictimFilter.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Set; - -import javax.inject.Inject; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Ordering; -import com.google.common.collect.Sets; - -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.ResourceSlot; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; -import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.mesos.ExecutorSettings; -import org.apache.aurora.scheduler.storage.Storage.StoreProvider; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; - -import static java.util.Objects.requireNonNull; - -/** - * Filters active tasks (victims) and available offer (slack) resources that can accommodate a - * given task (candidate), provided victims are preempted. - * <p> - * A task may preempt another task if the following conditions hold true: - * <ol> - * <li>The resources reserved for a victim (or a set of victims) are sufficient to satisfy - * the candidate. - * </li> - * <li>Both candidate and victim are owned by the same user and the - * {@link ITaskConfig#getPriority} of a victim is lower OR a victim is non-production and the - * candidate is production. 
- * </li> - * </ol> - */ -public interface PreemptionVictimFilter { - /** - * Returns a set of {@link PreemptionVictim} that can accommodate a given task if preempted. - * - * @param pendingTask Task to search preemption slot for. - * @param victims Active tasks on a slave. - * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job. - * @param offer A resource offer for a slave. - * @param storeProvider A store provider to access task data. - * @return A set of {@code PreemptionVictim} instances to preempt for a given task. - */ - Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims( - ITaskConfig pendingTask, - Iterable<PreemptionVictim> victims, - AttributeAggregate attributeAggregate, - Optional<HostOffer> offer, - StoreProvider storeProvider); - - class PreemptionVictimFilterImpl implements PreemptionVictimFilter { - private final SchedulingFilter schedulingFilter; - private final ExecutorSettings executorSettings; - private final PreemptorMetrics metrics; - - @Inject - PreemptionVictimFilterImpl( - SchedulingFilter schedulingFilter, - ExecutorSettings executorSettings, - PreemptorMetrics metrics) { - - this.schedulingFilter = requireNonNull(schedulingFilter); - this.executorSettings = requireNonNull(executorSettings); - this.metrics = requireNonNull(metrics); - } - - private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT = - new Function<HostOffer, ResourceSlot>() { - @Override - public ResourceSlot apply(HostOffer offer) { - return ResourceSlot.from(offer.getOffer()); - } - }; - - private static final Function<HostOffer, String> OFFER_TO_HOST = - new Function<HostOffer, String>() { - @Override - public String apply(HostOffer offer) { - return offer.getOffer().getHostname(); - } - }; - - private static final Function<PreemptionVictim, String> VICTIM_TO_HOST = - new Function<PreemptionVictim, String>() { - @Override - public String apply(PreemptionVictim victim) { - return victim.getSlaveHost(); - 
} - }; - - private final Function<PreemptionVictim, ResourceSlot> victimToResources = - new Function<PreemptionVictim, ResourceSlot>() { - @Override - public ResourceSlot apply(PreemptionVictim victim) { - return ResourceSlot.from(victim, executorSettings); - } - }; - - // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector - // ordering - private final Ordering<PreemptionVictim> resourceOrder = - ResourceSlot.ORDER.onResultOf(victimToResources).reverse(); - - @Override - public Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims( - ITaskConfig pendingTask, - Iterable<PreemptionVictim> possibleVictims, - AttributeAggregate jobState, - Optional<HostOffer> offer, - StoreProvider storeProvider) { - - // This enforces the precondition that all of the resources are from the same host. We need to - // get the host for the schedulingFilter. - Set<String> hosts = ImmutableSet.<String>builder() - .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST)) - .addAll(Iterables.transform(offer.asSet(), OFFER_TO_HOST)).build(); - - ResourceSlot slackResources = - ResourceSlot.sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT)); - - FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims) - .filter(preemptionFilter(pendingTask)); - - if (preemptableTasks.isEmpty()) { - return Optional.absent(); - } - - Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet(); - - Iterable<PreemptionVictim> sortedVictims = - resourceOrder.immutableSortedCopy(preemptableTasks); - - Optional<IHostAttributes> attributes = - storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts)); - - if (!attributes.isPresent()) { - metrics.recordMissingAttributes(); - return Optional.absent(); - } - - for (PreemptionVictim victim : sortedVictims) { - toPreemptTasks.add(victim); - - ResourceSlot totalResource = ResourceSlot.sum( - ResourceSlot.sum(Iterables.transform(toPreemptTasks, 
victimToResources)), - slackResources); - - Set<Veto> vetoes = schedulingFilter.filter( - new UnusedResource(totalResource, attributes.get()), - new ResourceRequest(pendingTask, jobState)); - - if (vetoes.isEmpty()) { - return Optional.of(ImmutableSet.copyOf(toPreemptTasks)); - } - } - return Optional.absent(); - } - - /** - * Creates a filter that will find tasks that the provided {@code pendingTask} may preempt. - * - * @param pendingTask A task that is not scheduled to possibly preempt other tasks for. - * @return A filter that will compare the priorities and resources required by other tasks - * with {@code preemptableTask}. - */ - private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) { - return new Predicate<PreemptionVictim>() { - @Override - public boolean apply(PreemptionVictim possibleVictim) { - boolean pendingIsProduction = pendingTask.isProduction(); - boolean victimIsProduction = possibleVictim.isProduction(); - - if (pendingIsProduction && !victimIsProduction) { - return true; - } else if (pendingIsProduction == victimIsProduction) { - // If production flags are equal, preemption is based on priority within the same role. 
- if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) { - return pendingTask.getPriority() > possibleVictim.getPriority(); - } else { - return false; - } - } else { - return false; - } - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java deleted file mode 100644 index a2d5fcf..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/Preemptor.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Set; - -import javax.inject.Inject; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.scheduler.async.OfferManager; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.mesos.Protos.SlaveID; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING; - -/** - * Attempts to preempt active tasks in favor of the provided PENDING task in case a preemption - * slot has been previously found. - */ -public interface Preemptor { - /** - * Preempts victim tasks in case a valid preemption slot exists. - * - * @param task Preempting task. - * @param jobState Current job state aggregate. - * @param storeProvider Store provider to use for task preemption. - * @return ID of the slave where preemption occurred. 
- */ - Optional<String> attemptPreemptionFor( - IAssignedTask task, - AttributeAggregate jobState, - MutableStoreProvider storeProvider); - - class PreemptorImpl implements Preemptor { - private final StateManager stateManager; - private final OfferManager offerManager; - private final PreemptionVictimFilter preemptionVictimFilter; - private final PreemptorMetrics metrics; - private final BiCache<PreemptionProposal, TaskGroupKey> slotCache; - - @Inject - PreemptorImpl( - StateManager stateManager, - OfferManager offerManager, - PreemptionVictimFilter preemptionVictimFilter, - PreemptorMetrics metrics, - BiCache<PreemptionProposal, TaskGroupKey> slotCache) { - - this.stateManager = requireNonNull(stateManager); - this.offerManager = requireNonNull(offerManager); - this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter); - this.metrics = requireNonNull(metrics); - this.slotCache = requireNonNull(slotCache); - } - - @Override - public Optional<String> attemptPreemptionFor( - IAssignedTask pendingTask, - AttributeAggregate jobState, - MutableStoreProvider store) { - - TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask()); - Set<PreemptionProposal> preemptionProposals = slotCache.getByValue(groupKey); - - // A preemption slot is available -> attempt to preempt tasks. - if (!preemptionProposals.isEmpty()) { - // Get the next available preemption slot. - PreemptionProposal slot = preemptionProposals.iterator().next(); - slotCache.remove(slot, groupKey); - - // Validate PreemptionProposal is still valid for the given task. 
- SlaveID slaveId = SlaveID.newBuilder().setValue(slot.getSlaveId()).build(); - Optional<ImmutableSet<PreemptionVictim>> validatedVictims = - preemptionVictimFilter.filterPreemptionVictims( - pendingTask.getTask(), - slot.getVictims(), - jobState, - offerManager.getOffer(slaveId), - store); - - metrics.recordSlotValidationResult(validatedVictims); - if (!validatedVictims.isPresent()) { - // Previously found victims are no longer valid -> let the next run find a new slot. - return Optional.absent(); - } - - for (PreemptionVictim toPreempt : validatedVictims.get()) { - metrics.recordTaskPreemption(toPreempt); - stateManager.changeState( - store, - toPreempt.getTaskId(), - Optional.absent(), - PREEMPTING, - Optional.of("Preempting in favor of " + pendingTask.getTaskId())); - } - return Optional.of(slot.getSlaveId()); - } - - return Optional.absent(); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java deleted file mode 100644 index 22a1533..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorMetrics.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.Set; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.scheduler.stats.CachedCounters; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; - -import static java.util.Objects.requireNonNull; - -/** - * Defines methods to manage preemptor metrics. - */ -@VisibleForTesting -public class PreemptorMetrics { - @VisibleForTesting - static final String MISSING_ATTRIBUTES_NAME = "preemptor_missing_attributes"; - - @VisibleForTesting - static final String TASK_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs"; - - private volatile boolean exported = false; - private final CachedCounters counters; - - @Inject - PreemptorMetrics(CachedCounters counters) { - this.counters = requireNonNull(counters); - assertFullyExported(); - } - - private static String prod(boolean production) { - return production ? "prod" : "non_prod"; - } - - private static String result(boolean success) { - return success ? "successful" : "failed"; - } - - private void assertFullyExported() { - if (exported) { - return; - } - - // Dummy-read all stats to ensure they are exported. 
- Set<String> allStats = ImmutableSet.of( - attemptsStatName(false), - attemptsStatName(true), - successStatName(false), - successStatName(true), - slotSearchStatName(true, false), - slotSearchStatName(false, false), - slotSearchStatName(true, true), - slotSearchStatName(false, true), - slotValidationStatName(true), - slotValidationStatName(false), - MISSING_ATTRIBUTES_NAME, - TASK_PROCESSOR_RUN_NAME); - for (String stat : allStats) { - counters.get(stat); - } - - exported = true; - } - - private void increment(String stat) { - assertFullyExported(); - counters.get(stat).incrementAndGet(); - } - - @VisibleForTesting - static String attemptsStatName(boolean production) { - return "preemptor_slot_search_attempts_for_" + prod(production); - } - - @VisibleForTesting - static String successStatName(boolean production) { - return "preemptor_tasks_preempted_" + prod(production); - } - - @VisibleForTesting - static String slotSearchStatName(boolean success, boolean production) { - return String.format("preemptor_slot_search_%s_for_%s", result(success), prod(production)); - } - - @VisibleForTesting - static String slotValidationStatName(boolean success) { - return "preemptor_slot_validation_" + result(success); - } - - void recordPreemptionAttemptFor(ITaskConfig task) { - increment(attemptsStatName(task.isProduction())); - } - - void recordTaskPreemption(PreemptionVictim victim) { - increment(successStatName(victim.isProduction())); - } - - void recordSlotSearchResult(Optional<?> result, ITaskConfig task) { - increment(slotSearchStatName(result.isPresent(), task.isProduction())); - } - - void recordSlotValidationResult(Optional<?> result) { - increment(slotValidationStatName(result.isPresent())); - } - - void recordMissingAttributes() { - increment(MISSING_ATTRIBUTES_NAME); - } - - void recordTaskProcessorRun() { - increment(TASK_PROCESSOR_RUN_NAME); - } -} 
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java deleted file mode 100644 index 3d9e27b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/preemptor/PreemptorModule.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.aurora.scheduler.async.preemptor; - -import java.util.logging.Logger; - -import javax.inject.Inject; -import javax.inject.Singleton; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.AbstractScheduledService; -import com.google.inject.AbstractModule; -import com.google.inject.PrivateModule; -import com.google.inject.TypeLiteral; -import com.twitter.common.args.Arg; -import com.twitter.common.args.CmdLine; -import com.twitter.common.quantity.Amount; -import com.twitter.common.quantity.Time; - -import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.async.preemptor.BiCache.BiCacheSettings; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.events.PubsubEventModule; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; - -import static java.util.Objects.requireNonNull; - -public class PreemptorModule extends AbstractModule { - - private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName()); - - @CmdLine(name = "enable_preemptor", - help = "Enable the preemptor and preemption") - private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true); - - @CmdLine(name = "preemption_delay", - help = "Time interval after which a pending task becomes eligible to preempt other tasks") - private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY = - Arg.create(Amount.of(3L, Time.MINUTES)); - - @CmdLine(name = "preemption_slot_hold_time", - help = "Time to hold a preemption slot found before it is discarded.") - private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_HOLD_TIME = - Arg.create(Amount.of(5L, Time.MINUTES)); - - @CmdLine(name = "preemption_slot_search_interval", - help = "Time interval between pending task 
preemption slot searches.") - private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_SEARCH_INTERVAL = - Arg.create(Amount.of(1L, Time.MINUTES)); - - private final boolean enablePreemptor; - private final Amount<Long, Time> preemptionDelay; - private final Amount<Long, Time> slotSearchInterval; - - @VisibleForTesting - public PreemptorModule( - boolean enablePreemptor, - Amount<Long, Time> preemptionDelay, - Amount<Long, Time> slotSearchInterval) { - - this.enablePreemptor = enablePreemptor; - this.preemptionDelay = requireNonNull(preemptionDelay); - this.slotSearchInterval = requireNonNull(slotSearchInterval); - } - - public PreemptorModule() { - this(ENABLE_PREEMPTOR.get(), PREEMPTION_DELAY.get(), PREEMPTION_SLOT_SEARCH_INTERVAL.get()); - } - - @Override - protected void configure() { - install(new PrivateModule() { - @Override - protected void configure() { - if (enablePreemptor) { - LOG.info("Preemptor Enabled."); - bind(PreemptorMetrics.class).in(Singleton.class); - bind(PreemptionVictimFilter.class) - .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class); - bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class); - bind(Preemptor.class).to(Preemptor.PreemptorImpl.class); - bind(Preemptor.PreemptorImpl.class).in(Singleton.class); - bind(new TypeLiteral<Amount<Long, Time>>() { }) - .annotatedWith(PendingTaskProcessor.PreemptionDelay.class) - .toInstance(preemptionDelay); - bind(BiCacheSettings.class).toInstance( - new BiCacheSettings(PREEMPTION_SLOT_HOLD_TIME.get(), "preemption_slot_cache_size")); - bind(new TypeLiteral<BiCache<PreemptionProposal, TaskGroupKey>>() { }) - .in(Singleton.class); - bind(PendingTaskProcessor.class).in(Singleton.class); - bind(ClusterState.class).to(ClusterStateImpl.class); - bind(ClusterStateImpl.class).in(Singleton.class); - expose(ClusterStateImpl.class); - - bind(PreemptorService.class).in(Singleton.class); - bind(AbstractScheduledService.Scheduler.class).toInstance( - 
AbstractScheduledService.Scheduler.newFixedRateSchedule( - 0L, - slotSearchInterval.getValue(), - slotSearchInterval.getUnit().getTimeUnit())); - - expose(PreemptorService.class); - expose(PendingTaskProcessor.class); - } else { - bind(Preemptor.class).toInstance(NULL_PREEMPTOR); - LOG.warning("Preemptor Disabled."); - } - expose(Preemptor.class); - } - }); - - // We can't do this in the private module due to the known conflict between multibindings - // and private modules due to multiple injectors. We accept the added complexity here to keep - // the other bindings private. - PubsubEventModule.bindSubscriber(binder(), ClusterStateImpl.class); - if (enablePreemptor) { - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()) - .to(PreemptorService.class); - } - } - - static class PreemptorService extends AbstractScheduledService { - private final PendingTaskProcessor slotFinder; - private final Scheduler schedule; - - @Inject - PreemptorService(PendingTaskProcessor slotFinder, Scheduler schedule) { - this.slotFinder = requireNonNull(slotFinder); - this.schedule = requireNonNull(schedule); - } - - @Override - protected void runOneIteration() { - slotFinder.run(); - } - - @Override - protected Scheduler scheduler() { - return schedule; - } - } - - private static final Preemptor NULL_PREEMPTOR = new Preemptor() { - @Override - public Optional<String> attemptPreemptionFor( - IAssignedTask task, - AttributeAggregate jobState, - Storage.MutableStoreProvider storeProvider) { - - return Optional.absent(); - } - }; -} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/http/Offers.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/Offers.java b/src/main/java/org/apache/aurora/scheduler/http/Offers.java index b991616..4329ce1 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/Offers.java +++ 
b/src/main/java/org/apache/aurora/scheduler/http/Offers.java @@ -28,7 +28,7 @@ import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableMap; import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.async.OfferManager; +import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.mesos.Protos.Attribute; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.Resource; http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java b/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java index 82b6f50..c80e0c8 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java +++ b/src/main/java/org/apache/aurora/scheduler/http/PendingTasks.java @@ -22,7 +22,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.aurora.scheduler.async.TaskGroups; +import org.apache.aurora.scheduler.scheduling.TaskGroups; /** * Servlet that exposes detailed information about tasks that are pending. 
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java index 6f06693..7c8a008 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java @@ -33,12 +33,12 @@ import com.twitter.common.inject.TimedInterceptor.Timed; import org.apache.aurora.GuiceUtils.AllowUnchecked; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskStatusHandler; -import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived; +import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.stats.CachedCounters; import org.apache.aurora.scheduler.storage.AttributeStore; import org.apache.aurora.scheduler.storage.Storage;
