http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java new file mode 100644 index 0000000..14bf265 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -0,0 +1,408 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.offers; + +import java.util.Comparator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; +import com.google.common.eventbus.Subscribe; +import com.twitter.common.inject.TimedInterceptor.Timed; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.Stats; + +import org.apache.aurora.gen.MaintenanceMode; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import org.apache.aurora.scheduler.mesos.Driver; +import org.apache.aurora.scheduler.state.TaskAssigner.Assignment; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.mesos.Protos.OfferID; +import org.apache.mesos.Protos.SlaveID; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.gen.MaintenanceMode.DRAINED; +import static org.apache.aurora.gen.MaintenanceMode.DRAINING; +import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED; +import 
static org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; + +/** + * Tracks the Offers currently known by the scheduler. + */ +public interface OfferManager extends EventSubscriber { + + /** + * Notifies the scheduler of a new resource offer. + * + * @param offer Newly-available resource offer. + */ + void addOffer(HostOffer offer); + + /** + * Invalidates an offer. This indicates that the scheduler should not attempt to match any + * tasks against the offer. + * + * @param offer Canceled offer. + */ + void cancelOffer(OfferID offer); + + /** + * Launches the first task that satisfies the {@code acceptor} by returning a {@link Assignment}. + * + * @param acceptor Function that determines if an offer is accepted. + * @param groupKey Task group key. + * @return {@code true} if the task was launched, {@code false} if no offers satisfied the + * {@code acceptor}. + * @throws LaunchException If the acceptor accepted an offer, but there was an error launching the + * task. + */ + boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey) + throws LaunchException; + + /** + * Notifies the offer queue that a host's attributes have changed. + * + * @param change State change notification. + */ + void hostAttributesChanged(HostAttributesChanged change); + + /** + * Gets the offers that the scheduler is holding. + * + * @return A snapshot of the offers that the scheduler is currently holding. + */ + Iterable<HostOffer> getOffers(); + + /** + * Gets an offer for the given slave ID. + * + * @param slaveId Slave ID to get offer for. + * @return An offer for the slave ID. + */ + Optional<HostOffer> getOffer(SlaveID slaveId); + + /** + * Calculates the amount of time before an offer should be 'returned' by declining it. + * The delay is calculated for each offer that is received, so the return delay may be + * fixed or variable. 
+ */ + interface OfferReturnDelay extends Supplier<Amount<Long, Time>> { + } + + /** + * Thrown when there was an unexpected failure trying to launch a task. + */ + class LaunchException extends Exception { + LaunchException(String msg) { + super(msg); + } + + LaunchException(String msg, Throwable cause) { + super(msg, cause); + } + } + + class OfferManagerImpl implements OfferManager { + @VisibleForTesting + static final Logger LOG = Logger.getLogger(OfferManagerImpl.class.getName()); + + private final HostOffers hostOffers = new HostOffers(); + private final AtomicLong offerRaces = Stats.exportLong("offer_accept_races"); + + private final Driver driver; + private final OfferReturnDelay returnDelay; + private final ScheduledExecutorService executor; + + @Inject + @VisibleForTesting + public OfferManagerImpl( + Driver driver, + OfferReturnDelay returnDelay, + @AsyncExecutor ScheduledExecutorService executor) { + + this.driver = requireNonNull(driver); + this.returnDelay = requireNonNull(returnDelay); + this.executor = requireNonNull(executor); + } + + @Override + public void addOffer(final HostOffer offer) { + // We run a slight risk of a race here, which is acceptable. The worst case is that we + // temporarily hold two offers for the same host, which should be corrected when we return + // them after the return delay. + // There's also a chance that we return an offer for compaction ~simultaneously with the + // same-host offer being canceled/returned. This is also fine. + Optional<HostOffer> sameSlave = hostOffers.get(offer.getOffer().getSlaveId()); + if (sameSlave.isPresent()) { + // If there are existing offers for the slave, decline all of them so the master can + // compact all of those offers into a single offer and send them back. 
+ LOG.info("Returning offers for " + offer.getOffer().getSlaveId().getValue() + + " for compaction."); + decline(offer.getOffer().getId()); + removeAndDecline(sameSlave.get().getOffer().getId()); + } else { + hostOffers.add(offer); + executor.schedule( + new Runnable() { + @Override + public void run() { + removeAndDecline(offer.getOffer().getId()); + } + }, + returnDelay.get().as(Time.MILLISECONDS), + TimeUnit.MILLISECONDS); + } + } + + void removeAndDecline(OfferID id) { + if (removeFromHostOffers(id)) { + decline(id); + } + } + + void decline(OfferID id) { + LOG.fine("Declining offer " + id); + driver.declineOffer(id); + } + + @Override + public void cancelOffer(final OfferID offerId) { + removeFromHostOffers(offerId); + } + + private boolean removeFromHostOffers(final OfferID offerId) { + requireNonNull(offerId); + + // The small risk of inconsistency is acceptable here - if we have an accept/remove race + // on an offer, the master will mark the task as LOST and it will be retried. + return hostOffers.remove(offerId); + } + + @Override + public Iterable<HostOffer> getOffers() { + return hostOffers.getWeaklyConsistentOffers(); + } + + @Override + public Optional<HostOffer> getOffer(SlaveID slaveId) { + return hostOffers.get(slaveId); + } + + /** + * Updates the preference of a host's offers. + * + * @param change Host change notification. + */ + @Subscribe + public void hostAttributesChanged(HostAttributesChanged change) { + hostOffers.updateHostAttributes(change.getAttributes()); + } + + /** + * Notifies the queue that the driver is disconnected, and all the stored offers are now + * invalid. + * <p> + * The queue takes this as a signal to flush its queue. + * + * @param event Disconnected event. 
+ */ + @Subscribe + public void driverDisconnected(DriverDisconnected event) { + LOG.info("Clearing stale offers since the driver is disconnected."); + hostOffers.clear(); + } + + /** + * A container for the data structures used by this class, to make it easier to reason about + * the different indices used and their consistency. + */ + private static class HostOffers { + private static final Comparator<HostOffer> PREFERENCE_COMPARATOR = + // Currently, the only preference is based on host maintenance status. + Ordering.explicit(NONE, SCHEDULED, DRAINING, DRAINED) + .onResultOf(new Function<HostOffer, MaintenanceMode>() { + @Override + public MaintenanceMode apply(HostOffer offer) { + return offer.getAttributes().getMode(); + } + }) + .compound(Ordering.arbitrary()); + + private final Set<HostOffer> offers = new ConcurrentSkipListSet<>(PREFERENCE_COMPARATOR); + private final Map<OfferID, HostOffer> offersById = Maps.newHashMap(); + private final Map<SlaveID, HostOffer> offersBySlave = Maps.newHashMap(); + private final Map<String, HostOffer> offersByHost = Maps.newHashMap(); + // TODO(maxim): Expose via a debug endpoint. AURORA-1136. + // Keep track of offer->groupKey mappings that will never be matched to avoid redundant + // scheduling attempts. See Assignment.Result for more details on static ban. + private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create(); + + HostOffers() { + // Potential gotcha - since this is a ConcurrentSkipListSet, size() is more expensive. + // Could track this separately if it turns out to pose problems. 
+ Stats.exportSize("outstanding_offers", offers); + } + + synchronized Optional<HostOffer> get(SlaveID slaveId) { + return Optional.fromNullable(offersBySlave.get(slaveId)); + } + + synchronized void add(HostOffer offer) { + offers.add(offer); + offersById.put(offer.getOffer().getId(), offer); + offersBySlave.put(offer.getOffer().getSlaveId(), offer); + offersByHost.put(offer.getOffer().getHostname(), offer); + } + + synchronized boolean remove(OfferID id) { + HostOffer removed = offersById.remove(id); + if (removed != null) { + offers.remove(removed); + offersBySlave.remove(removed.getOffer().getSlaveId()); + offersByHost.remove(removed.getOffer().getHostname()); + staticallyBannedOffers.removeAll(id); + } + return removed != null; + } + + synchronized void updateHostAttributes(IHostAttributes attributes) { + HostOffer offer = offersByHost.remove(attributes.getHost()); + if (offer != null) { + // Remove and re-add a host's offer to re-sort based on its new hostStatus + remove(offer.getOffer().getId()); + add(new HostOffer(offer.getOffer(), attributes)); + } + } + + synchronized Iterable<HostOffer> getWeaklyConsistentOffers() { + return Iterables.unmodifiableIterable(offers); + } + + synchronized boolean isStaticallyBanned(HostOffer offer, TaskGroupKey groupKey) { + boolean result = staticallyBannedOffers.containsEntry(offer.getOffer().getId(), groupKey); + if (LOG.isLoggable(Level.FINE)) { + LOG.fine(String.format( + "Host offer %s is statically banned for %s: %s", + offer, + groupKey, + result)); + } + return result; + } + + synchronized void addStaticGroupBan(HostOffer offer, TaskGroupKey groupKey) { + OfferID offerId = offer.getOffer().getId(); + if (offersById.containsKey(offerId)) { + staticallyBannedOffers.put(offerId, groupKey); + + if (LOG.isLoggable(Level.FINE)) { + LOG.fine( + String.format("Adding static ban for offer: %s, groupKey: %s", offer, groupKey)); + } + } + } + + synchronized void clear() { + offers.clear(); + offersById.clear(); + 
offersBySlave.clear(); + offersByHost.clear(); + staticallyBannedOffers.clear(); + } + } + + @Timed("offer_queue_launch_first") + @Override + public boolean launchFirst(Function<HostOffer, Assignment> acceptor, TaskGroupKey groupKey) + throws LaunchException { + + // It's important that this method is not called concurrently - doing so would open up the + // possibility of a race between the same offers being accepted by different threads. + + for (HostOffer offer : hostOffers.getWeaklyConsistentOffers()) { + if (!hostOffers.isStaticallyBanned(offer, groupKey) + && acceptOffer(offer, acceptor, groupKey)) { + return true; + } + } + + return false; + } + + @Timed("offer_queue_accept_offer") + protected boolean acceptOffer( + HostOffer offer, + Function<HostOffer, Assignment> acceptor, + TaskGroupKey groupKey) throws LaunchException { + + Assignment assignment = acceptor.apply(offer); + switch (assignment.getResult()) { + + case SUCCESS: + // Guard against an offer being removed after we grabbed it from the iterator. + // If that happens, the offer will not exist in hostOffers, and we can immediately + // send it back to LOST for quick reschedule. + // Removing while iterating counts on the use of a weakly-consistent iterator being used, + // which is a feature of ConcurrentSkipListSet. + if (hostOffers.remove(offer.getOffer().getId())) { + try { + driver.launchTask(offer.getOffer().getId(), assignment.getTaskInfo().get()); + return true; + } catch (IllegalStateException e) { + // TODO(William Farner): Catch only the checked exception produced by Driver + // once it changes from throwing IllegalStateException when the driver is not yet + // registered. 
+ throw new LaunchException("Failed to launch task.", e); + } + } else { + offerRaces.incrementAndGet(); + throw new LaunchException( + "Accepted offer no longer exists in offer queue, likely data race."); + } + + case FAILURE_STATIC_MISMATCH: + // Exclude an offer that results in a static mismatch from further attempts to match + // against all tasks from the same group. + hostOffers.addStaticGroupBan(offer, groupKey); + return false; + + default: + return false; + } + } + } +}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java new file mode 100644 index 0000000..6ab413a --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/offers/OffersModule.java @@ -0,0 +1,63 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.offers; + +import javax.inject.Singleton; + +import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; +import com.twitter.common.args.Arg; +import com.twitter.common.args.CmdLine; +import com.twitter.common.args.constraints.NotNegative; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Random; + +import org.apache.aurora.scheduler.events.PubsubEventModule; + +/** + * Binding module for resource offer management. 
+ */ +public class OffersModule extends AbstractModule { + + @CmdLine(name = "min_offer_hold_time", + help = "Minimum amount of time to hold a resource offer before declining.") + @NotNegative + private static final Arg<Amount<Integer, Time>> MIN_OFFER_HOLD_TIME = + Arg.create(Amount.of(5, Time.MINUTES)); + + @CmdLine(name = "offer_hold_jitter_window", + help = "Maximum amount of random jitter to add to the offer hold time window.") + @NotNegative + private static final Arg<Amount<Integer, Time>> OFFER_HOLD_JITTER_WINDOW = + Arg.create(Amount.of(1, Time.MINUTES)); + + @Override + protected void configure() { + install(new PrivateModule() { + @Override + protected void configure() { + bind(OfferManager.OfferReturnDelay.class).toInstance( + new RandomJitterReturnDelay( + MIN_OFFER_HOLD_TIME.get().as(Time.MILLISECONDS), + OFFER_HOLD_JITTER_WINDOW.get().as(Time.MILLISECONDS), + new Random.SystemRandom(new java.util.Random()))); + bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); + bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); + expose(OfferManager.class); + } + }); + PubsubEventModule.bindSubscriber(binder(), OfferManager.class); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelay.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelay.java b/src/main/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelay.java new file mode 100644 index 0000000..a3e63bf --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/offers/RandomJitterReturnDelay.java @@ -0,0 +1,49 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.offers; + +import java.util.Objects; + +import com.google.common.annotations.VisibleForTesting; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Random; + +import org.apache.aurora.scheduler.offers.OfferManager.OfferReturnDelay; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Returns offers after a random duration within a fixed window. + */ +@VisibleForTesting +class RandomJitterReturnDelay implements OfferReturnDelay { + private final int minHoldTimeMs; + private final int maxJitterWindowMs; + private final Random random; + + RandomJitterReturnDelay(int minHoldTimeMs, int maxJitterWindowMs, Random random) { + checkArgument(minHoldTimeMs >= 0); + checkArgument(maxJitterWindowMs >= 0); + + this.minHoldTimeMs = minHoldTimeMs; + this.maxJitterWindowMs = maxJitterWindowMs; + this.random = Objects.requireNonNull(random); + } + + @Override + public Amount<Long, Time> get() { + return Amount.of((long) minHoldTimeMs + random.nextInt(maxJitterWindowMs), Time.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java new file mode 100644 index 0000000..2551057 --- /dev/null +++ 
b/src/main/java/org/apache/aurora/scheduler/preemptor/BiCache.java @@ -0,0 +1,139 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; + +import com.google.common.base.Optional; +import com.google.common.base.Supplier; +import com.google.common.base.Ticker; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.stats.StatsProvider; +import com.twitter.common.util.Clock; + +import static java.util.Objects.requireNonNull; + +/** + * A bi-directional cache of items. Entries are purged from cache after + * {@link BiCacheSettings#expireAfter}. + * + * @param <K> Key type. + * @param <V> Value type. 
+ */ +public class BiCache<K, V> { + + public static class BiCacheSettings { + private final Amount<Long, Time> expireAfter; + private final String cacheSizeStatName; + + public BiCacheSettings(Amount<Long, Time> expireAfter, String cacheSizeStatName) { + this.expireAfter = requireNonNull(expireAfter); + this.cacheSizeStatName = requireNonNull(cacheSizeStatName); + } + } + + private final Cache<K, V> cache; + private final Multimap<V, K> inverse = HashMultimap.create(); + + @Inject + public BiCache( + StatsProvider statsProvider, + BiCacheSettings settings, + final Clock clock) { + + requireNonNull(clock); + this.cache = CacheBuilder.newBuilder() + .expireAfterWrite(settings.expireAfter.as(Time.MINUTES), TimeUnit.MINUTES) + .ticker(new Ticker() { + @Override + public long read() { + return clock.nowNanos(); + } + }) + .removalListener(new RemovalListener<K, V>() { + @Override + public void onRemoval(RemovalNotification<K, V> notification) { + inverse.remove(notification.getValue(), notification.getKey()); + } + }) + .build(); + + statsProvider.makeGauge( + settings.cacheSizeStatName, + new Supplier<Long>() { + @Override + public Long get() { + return cache.size(); + } + }); + } + + /** + * Puts a new key/value pair. + * + * @param key Key to add. + * @param value Value to add. + */ + public synchronized void put(K key, V value) { + requireNonNull(key); + requireNonNull(value); + cache.put(key, value); + inverse.put(value, key); + } + + /** + * Gets a cached value by key. + * + * @param key Key to get value for. + * @return Optional of value. + */ + public synchronized Optional<V> get(K key) { + return Optional.fromNullable(cache.getIfPresent(key)); + } + + /** + * Gets a set of keys for a given value. + * + * @param value Value to get all keys for. + * @return An {@link Iterable} of keys or empty if value does not exist. 
+ */ + public synchronized Set<K> getByValue(V value) { + // Cache items are lazily removed by routine maintenance operations during get/write access. + // Forcing cleanup here to ensure proper data integrity. + cache.cleanUp(); + return ImmutableSet.copyOf(inverse.get(value)); + } + + /** + * Removes a key/value pair from cache. + * + * @param key Key to remove. + * @param value Value to remove. + */ + public synchronized void remove(K key, V value) { + inverse.remove(value, key); + cache.invalidate(key); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java new file mode 100644 index 0000000..ce3bc7e --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterState.java @@ -0,0 +1,34 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Multimap; + +/** + * A facade for the preemptor to gain access to the state of scheduled tasks in the cluster. 
+ */ +@VisibleForTesting +public interface ClusterState { + + /** + * Gets a snapshot of the active tasks in the cluster, indexed by the slave IDs they are + * assigned to. + * <p> + * TODO(wfarner): Return a more minimal type than IAssignedTask here. + * + * @return Active tasks and their associated slave IDs. + */ + Multimap<String, PreemptionVictim> getSlavesToActiveTasks(); +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java new file mode 100644 index 0000000..42e2ca4 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/ClusterStateImpl.java @@ -0,0 +1,50 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.eventbus.Subscribe; + +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; + +/** + * A cached view of cluster state, kept up to date by pubsub notifications. 
+ */ +public class ClusterStateImpl implements ClusterState, PubsubEvent.EventSubscriber { + + private final Multimap<String, PreemptionVictim> victims = + Multimaps.synchronizedMultimap(HashMultimap.create()); + + @Override + public Multimap<String, PreemptionVictim> getSlavesToActiveTasks() { + return Multimaps.unmodifiableMultimap(victims); + } + + @Subscribe + public void taskChangedState(TaskStateChange stateChange) { + synchronized (victims) { + String slaveId = stateChange.getTask().getAssignedTask().getSlaveId(); + PreemptionVictim victim = PreemptionVictim.fromTask(stateChange.getTask().getAssignedTask()); + if (Tasks.SLAVE_ASSIGNED_STATES.contains(stateChange.getNewState())) { + victims.put(slaveId, victim); + } else { + victims.remove(slaveId, victim); + } + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java new file mode 100644 index 0000000..f1b075a --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java @@ -0,0 +1,258 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.preemptor; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.inject.Inject; +import javax.inject.Qualifier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; +import com.google.common.collect.Sets; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; + +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +import static 
java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.base.Tasks.SCHEDULED_TO_ASSIGNED; + +/** + * Attempts to find preemption slots for all PENDING tasks eligible for preemption. + */ +@VisibleForTesting +public class PendingTaskProcessor implements Runnable { + private final Storage storage; + private final OfferManager offerManager; + private final PreemptionVictimFilter preemptionVictimFilter; + private final PreemptorMetrics metrics; + private final Amount<Long, Time> preemptionCandidacyDelay; + private final BiCache<PreemptionProposal, TaskGroupKey> slotCache; + private final ClusterState clusterState; + private final Clock clock; + + /** + * Binding annotation for the time interval after which a pending task becomes eligible to + * preempt other tasks. To avoid excessive churn, the preemptor requires that a task is PENDING + * for a duration (dictated by {@link #preemptionCandidacyDelay}) before it becomes eligible + * to preempt other tasks. 
+ */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface PreemptionDelay { } + + @Inject + PendingTaskProcessor( + Storage storage, + OfferManager offerManager, + PreemptionVictimFilter preemptionVictimFilter, + PreemptorMetrics metrics, + @PreemptionDelay Amount<Long, Time> preemptionCandidacyDelay, + BiCache<PreemptionProposal, TaskGroupKey> slotCache, + ClusterState clusterState, + Clock clock) { + + this.storage = requireNonNull(storage); + this.offerManager = requireNonNull(offerManager); + this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter); + this.metrics = requireNonNull(metrics); + this.preemptionCandidacyDelay = requireNonNull(preemptionCandidacyDelay); + this.slotCache = requireNonNull(slotCache); + this.clusterState = requireNonNull(clusterState); + this.clock = requireNonNull(clock); + } + + @Override + public void run() { + metrics.recordTaskProcessorRun(); + storage.read(new Storage.Work.Quiet<Void>() { + @Override + public Void apply(StoreProvider store) { + Multimap<String, PreemptionVictim> slavesToActiveTasks = + clusterState.getSlavesToActiveTasks(); + + if (slavesToActiveTasks.isEmpty()) { + // No preemption victims to consider. + return null; + } + + // Group the offers by slave id so they can be paired with active tasks from the same slave. + Map<String, HostOffer> slavesToOffers = + Maps.uniqueIndex(offerManager.getOffers(), OFFER_TO_SLAVE_ID); + + Set<String> allSlaves = Sets.newHashSet(Iterables.concat( + slavesToOffers.keySet(), + slavesToActiveTasks.keySet())); + + // The algorithm below attempts to find a reservation for every task group by matching + // it against all available slaves until a preemption slot is found. Groups are evaluated + // in a round-robin fashion to ensure fairness (e.g.: G1, G2, G3, G1, G2). + // A slave is removed from further matching once a reservation is made. 
Similarly, all + // identical task group instances are removed from further iteration if none of the + // available slaves could yield a preemption proposal. A consuming iterator is used for + // task groups to ensure iteration order is preserved after a task group is removed. + LoadingCache<IJobKey, AttributeAggregate> jobStates = attributeCache(store); + List<TaskGroupKey> pendingGroups = fetchIdlePendingGroups(store); + Iterator<TaskGroupKey> groups = Iterators.consumingIterator(pendingGroups.iterator()); + while (!pendingGroups.isEmpty()) { + boolean matched = false; + TaskGroupKey group = groups.next(); + ITaskConfig task = group.getTask(); + + metrics.recordPreemptionAttemptFor(task); + Iterator<String> slaveIterator = allSlaves.iterator(); + while (slaveIterator.hasNext()) { + String slaveId = slaveIterator.next(); + Optional<ImmutableSet<PreemptionVictim>> candidates = + preemptionVictimFilter.filterPreemptionVictims( + task, + slavesToActiveTasks.get(slaveId), + jobStates.getUnchecked(task.getJob()), + Optional.fromNullable(slavesToOffers.get(slaveId)), + store); + + metrics.recordSlotSearchResult(candidates, task); + if (candidates.isPresent()) { + // Slot found -> remove slave to avoid multiple task reservations. + slaveIterator.remove(); + slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group); + matched = true; + break; + } + } + if (!matched) { + // No slot found for the group -> remove group and reset group iterator. 
+ pendingGroups.removeAll(ImmutableSet.of(group)); + groups = Iterators.consumingIterator(pendingGroups.iterator()); + } + } + return null; + } + }); + } + + private List<TaskGroupKey> fetchIdlePendingGroups(StoreProvider store) { + Multiset<TaskGroupKey> taskGroupCounts = HashMultiset.create( + FluentIterable.from(store.getTaskStore().fetchTasks(Query.statusScoped(PENDING))) + .filter(Predicates.and(isIdleTask, Predicates.not(hasCachedSlot))) + .transform(Functions.compose(ASSIGNED_TO_GROUP_KEY, SCHEDULED_TO_ASSIGNED))); + + return getPreemptionSequence(taskGroupCounts); + } + + /** + * Creates execution sequence for pending task groups by interleaving their unique occurrences. + * For example: {G1, G1, G1, G2, G2} will be converted into {G1, G2, G1, G2, G1}. + * + * @param groups Multiset of task groups. + * @return A task group execution sequence. + */ + private static List<TaskGroupKey> getPreemptionSequence(Multiset<TaskGroupKey> groups) { + Multiset<TaskGroupKey> mutableGroups = HashMultiset.create(groups); + List<TaskGroupKey> instructions = Lists.newLinkedList(); + Set<TaskGroupKey> keys = ImmutableSet.copyOf(groups.elementSet()); + while (!mutableGroups.isEmpty()) { + for (TaskGroupKey key : keys) { + if (mutableGroups.contains(key)) { + instructions.add(key); + mutableGroups.remove(key); + } + } + } + + return instructions; + } + + private LoadingCache<IJobKey, AttributeAggregate> attributeCache(final StoreProvider store) { + return CacheBuilder.newBuilder().build(CacheLoader.from( + new Function<IJobKey, AttributeAggregate>() { + @Override + public AttributeAggregate apply(IJobKey job) { + return AttributeAggregate.getJobActiveState(store, job); + } + })); + } + + private static final Function<IAssignedTask, TaskGroupKey> ASSIGNED_TO_GROUP_KEY = + new Function<IAssignedTask, TaskGroupKey>() { + @Override + public TaskGroupKey apply(IAssignedTask task) { + return TaskGroupKey.from(task.getTask()); + } + }; + + private final Predicate<IScheduledTask> 
hasCachedSlot = new Predicate<IScheduledTask>() { + @Override + public boolean apply(IScheduledTask task) { + return !slotCache.getByValue(TaskGroupKey.from(task.getAssignedTask().getTask())).isEmpty(); + } + }; + + private final Predicate<IScheduledTask> isIdleTask = new Predicate<IScheduledTask>() { + @Override + public boolean apply(IScheduledTask task) { + return (clock.nowMillis() - Tasks.getLatestEvent(task).getTimestamp()) + >= preemptionCandidacyDelay.as(Time.MILLISECONDS); + } + }; + + private static final Function<HostOffer, String> OFFER_TO_SLAVE_ID = + new Function<HostOffer, String>() { + @Override + public String apply(HostOffer offer) { + return offer.getOffer().getSlaveId().getValue(); + } + }; +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionProposal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionProposal.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionProposal.java new file mode 100644 index 0000000..d598b02 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionProposal.java @@ -0,0 +1,66 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.preemptor; + +import java.util.Objects; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +import static java.util.Objects.requireNonNull; + +/** + * A set of tasks proposed for preemption on a given slave. + */ +class PreemptionProposal { + private final Set<PreemptionVictim> victims; + private final String slaveId; + + PreemptionProposal(ImmutableSet<PreemptionVictim> victims, String slaveId) { + this.victims = requireNonNull(victims); + this.slaveId = requireNonNull(slaveId); + } + + Set<PreemptionVictim> getVictims() { + return victims; + } + + String getSlaveId() { + return slaveId; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof PreemptionProposal)) { + return false; + } + + PreemptionProposal other = (PreemptionProposal) o; + return Objects.equals(getVictims(), other.getVictims()) + && Objects.equals(getSlaveId(), other.getSlaveId()); + } + + @Override + public int hashCode() { + return Objects.hash(victims, slaveId); + } + + @Override + public String toString() { + return com.google.common.base.Objects.toStringHelper(this) + .add("victims", getVictims()) + .add("slaveId", getSlaveId()) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java new file mode 100644 index 0000000..a93fa22 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictim.java @@ -0,0 +1,115 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import java.util.Objects; + +import org.apache.aurora.scheduler.configuration.Resources; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +/** + * A victim to be considered as a candidate for preemption. + */ +public final class PreemptionVictim { + private final String slaveHost; + private final boolean production; + private final String role; + private final int priority; + private final Resources resources; + private final String taskId; + + private PreemptionVictim( + String slaveHost, + boolean production, + String role, + int priority, + Resources resources, + String taskId) { + + this.slaveHost = slaveHost; + this.production = production; + this.role = role; + this.priority = priority; + this.resources = resources; + this.taskId = taskId; + } + + public static PreemptionVictim fromTask(IAssignedTask task) { + ITaskConfig config = task.getTask(); + return new PreemptionVictim( + task.getSlaveHost(), + config.isProduction(), + config.getJob().getRole(), + config.getPriority(), + Resources.from(task.getTask()), + task.getTaskId()); + } + + public String getSlaveHost() { + return slaveHost; + } + + public boolean isProduction() { + return production; + } + + public String getRole() { + return role; + } + + public int getPriority() { + return priority; + } + + public Resources getResources() { + return resources; + } + + public String getTaskId() { + return taskId; + } + + @Override + public boolean 
equals(Object o) { + if (!(o instanceof PreemptionVictim)) { + return false; + } + + PreemptionVictim other = (PreemptionVictim) o; + return Objects.equals(getSlaveHost(), other.getSlaveHost()) + && Objects.equals(isProduction(), other.isProduction()) + && Objects.equals(getRole(), other.getRole()) + && Objects.equals(getPriority(), other.getPriority()) + && Objects.equals(getResources(), other.getResources()) + && Objects.equals(getTaskId(), other.getTaskId()); + } + + @Override + public int hashCode() { + return Objects.hash(slaveHost, production, role, priority, resources, taskId); + } + + @Override + public String toString() { + return com.google.common.base.Objects.toStringHelper(this) + .add("slaveHost", getSlaveHost()) + .add("production", isProduction()) + .add("role", getRole()) + .add("priority", getPriority()) + .add("resources", getResources()) + .add("taskId", getTaskId()) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java new file mode 100644 index 0000000..4293415 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -0,0 +1,214 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.preemptor; + +import java.util.Set; + +import javax.inject.Inject; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; + +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.ResourceSlot; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; +import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; +import org.apache.aurora.scheduler.mesos.ExecutorSettings; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +import static java.util.Objects.requireNonNull; + +/** + * Filters active tasks (victims) and available offer (slack) resources that can accommodate a + * given task (candidate), provided victims are preempted. + * <p> + * A task may preempt another task if the following conditions hold true: + * <ol> + * <li>The resources reserved for a victim (or a set of victims) are sufficient to satisfy + * the candidate. + * </li> + * <li>Both candidate and victim are owned by the same user and the + * {@link ITaskConfig#getPriority} of a victim is lower OR a victim is non-production and the + * candidate is production. 
+ * </li> + * </ol> + */ +public interface PreemptionVictimFilter { + /** + * Returns a set of {@link PreemptionVictim} that can accommodate a given task if preempted. + * + * @param pendingTask Task to search preemption slot for. + * @param victims Active tasks on a slave. + * @param attributeAggregate An {@link AttributeAggregate} instance for the task's job. + * @param offer A resource offer for a slave. + * @param storeProvider A store provider to access task data. + * @return A set of {@code PreemptionVictim} instances to preempt for a given task. + */ + Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims( + ITaskConfig pendingTask, + Iterable<PreemptionVictim> victims, + AttributeAggregate attributeAggregate, + Optional<HostOffer> offer, + StoreProvider storeProvider); + + class PreemptionVictimFilterImpl implements PreemptionVictimFilter { + private final SchedulingFilter schedulingFilter; + private final ExecutorSettings executorSettings; + private final PreemptorMetrics metrics; + + @Inject + PreemptionVictimFilterImpl( + SchedulingFilter schedulingFilter, + ExecutorSettings executorSettings, + PreemptorMetrics metrics) { + + this.schedulingFilter = requireNonNull(schedulingFilter); + this.executorSettings = requireNonNull(executorSettings); + this.metrics = requireNonNull(metrics); + } + + private static final Function<HostOffer, ResourceSlot> OFFER_TO_RESOURCE_SLOT = + new Function<HostOffer, ResourceSlot>() { + @Override + public ResourceSlot apply(HostOffer offer) { + return ResourceSlot.from(offer.getOffer()); + } + }; + + private static final Function<HostOffer, String> OFFER_TO_HOST = + new Function<HostOffer, String>() { + @Override + public String apply(HostOffer offer) { + return offer.getOffer().getHostname(); + } + }; + + private static final Function<PreemptionVictim, String> VICTIM_TO_HOST = + new Function<PreemptionVictim, String>() { + @Override + public String apply(PreemptionVictim victim) { + return victim.getSlaveHost(); + 
} + }; + + private final Function<PreemptionVictim, ResourceSlot> victimToResources = + new Function<PreemptionVictim, ResourceSlot>() { + @Override + public ResourceSlot apply(PreemptionVictim victim) { + return ResourceSlot.from(victim, executorSettings); + } + }; + + // TODO(zmanji) Consider using Dominant Resource Fairness for ordering instead of the vector + // ordering + private final Ordering<PreemptionVictim> resourceOrder = + ResourceSlot.ORDER.onResultOf(victimToResources).reverse(); + + @Override + public Optional<ImmutableSet<PreemptionVictim>> filterPreemptionVictims( + ITaskConfig pendingTask, + Iterable<PreemptionVictim> possibleVictims, + AttributeAggregate jobState, + Optional<HostOffer> offer, + StoreProvider storeProvider) { + + // This enforces the precondition that all of the resources are from the same host. We need to + // get the host for the schedulingFilter. + Set<String> hosts = ImmutableSet.<String>builder() + .addAll(Iterables.transform(possibleVictims, VICTIM_TO_HOST)) + .addAll(Iterables.transform(offer.asSet(), OFFER_TO_HOST)).build(); + + ResourceSlot slackResources = + ResourceSlot.sum(Iterables.transform(offer.asSet(), OFFER_TO_RESOURCE_SLOT)); + + FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims) + .filter(preemptionFilter(pendingTask)); + + if (preemptableTasks.isEmpty()) { + return Optional.absent(); + } + + Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet(); + + Iterable<PreemptionVictim> sortedVictims = + resourceOrder.immutableSortedCopy(preemptableTasks); + + Optional<IHostAttributes> attributes = + storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts)); + + if (!attributes.isPresent()) { + metrics.recordMissingAttributes(); + return Optional.absent(); + } + + for (PreemptionVictim victim : sortedVictims) { + toPreemptTasks.add(victim); + + ResourceSlot totalResource = ResourceSlot.sum( + ResourceSlot.sum(Iterables.transform(toPreemptTasks, 
victimToResources)), + slackResources); + + Set<Veto> vetoes = schedulingFilter.filter( + new UnusedResource(totalResource, attributes.get()), + new ResourceRequest(pendingTask, jobState)); + + if (vetoes.isEmpty()) { + return Optional.of(ImmutableSet.copyOf(toPreemptTasks)); + } + } + return Optional.absent(); + } + + /** + * Creates a filter that will find tasks that the provided {@code pendingTask} may preempt. + * + * @param pendingTask A task that is not scheduled to possibly preempt other tasks for. + * @return A filter that will compare the priorities and resources required by other tasks + * with {@code preemptableTask}. + */ + private static Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) { + return new Predicate<PreemptionVictim>() { + @Override + public boolean apply(PreemptionVictim possibleVictim) { + boolean pendingIsProduction = pendingTask.isProduction(); + boolean victimIsProduction = possibleVictim.isProduction(); + + if (pendingIsProduction && !victimIsProduction) { + return true; + } else if (pendingIsProduction == victimIsProduction) { + // If production flags are equal, preemption is based on priority within the same role. 
+ if (pendingTask.getJob().getRole().equals(possibleVictim.getRole())) { + return pendingTask.getPriority() > possibleVictim.getPriority(); + } else { + return false; + } + } else { + return false; + } + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java new file mode 100644 index 0000000..7d2903a --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java @@ -0,0 +1,121 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.preemptor; + +import java.util.Set; + +import javax.inject.Inject; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.mesos.Protos.SlaveID; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.gen.ScheduleStatus.PREEMPTING; + +/** + * Attempts to preempt active tasks in favor of the provided PENDING task in case a preemption + * slot has been previously found. + */ +public interface Preemptor { + /** + * Preempts victim tasks in case a valid preemption slot exists. + * + * @param task Preempting task. + * @param jobState Current job state aggregate. + * @param storeProvider Store provider to use for task preemption. + * @return ID of the slave where preemption occurred. 
+ */ + Optional<String> attemptPreemptionFor( + IAssignedTask task, + AttributeAggregate jobState, + MutableStoreProvider storeProvider); + + class PreemptorImpl implements Preemptor { + private final StateManager stateManager; + private final OfferManager offerManager; + private final PreemptionVictimFilter preemptionVictimFilter; + private final PreemptorMetrics metrics; + private final BiCache<PreemptionProposal, TaskGroupKey> slotCache; + + @Inject + PreemptorImpl( + StateManager stateManager, + OfferManager offerManager, + PreemptionVictimFilter preemptionVictimFilter, + PreemptorMetrics metrics, + BiCache<PreemptionProposal, TaskGroupKey> slotCache) { + + this.stateManager = requireNonNull(stateManager); + this.offerManager = requireNonNull(offerManager); + this.preemptionVictimFilter = requireNonNull(preemptionVictimFilter); + this.metrics = requireNonNull(metrics); + this.slotCache = requireNonNull(slotCache); + } + + @Override + public Optional<String> attemptPreemptionFor( + IAssignedTask pendingTask, + AttributeAggregate jobState, + MutableStoreProvider store) { + + TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask()); + Set<PreemptionProposal> preemptionProposals = slotCache.getByValue(groupKey); + + // A preemption slot is available -> attempt to preempt tasks. + if (!preemptionProposals.isEmpty()) { + // Get the next available preemption slot. + PreemptionProposal slot = preemptionProposals.iterator().next(); + slotCache.remove(slot, groupKey); + + // Validate PreemptionProposal is still valid for the given task. 
+ SlaveID slaveId = SlaveID.newBuilder().setValue(slot.getSlaveId()).build(); + Optional<ImmutableSet<PreemptionVictim>> validatedVictims = + preemptionVictimFilter.filterPreemptionVictims( + pendingTask.getTask(), + slot.getVictims(), + jobState, + offerManager.getOffer(slaveId), + store); + + metrics.recordSlotValidationResult(validatedVictims); + if (!validatedVictims.isPresent()) { + // Previously found victims are no longer valid -> let the next run find a new slot. + return Optional.absent(); + } + + for (PreemptionVictim toPreempt : validatedVictims.get()) { + metrics.recordTaskPreemption(toPreempt); + stateManager.changeState( + store, + toPreempt.getTaskId(), + Optional.absent(), + PREEMPTING, + Optional.of("Preempting in favor of " + pendingTask.getTaskId())); + } + return Optional.of(slot.getSlaveId()); + } + + return Optional.absent(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java new file mode 100644 index 0000000..30bb814 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java @@ -0,0 +1,131 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.aurora.scheduler.preemptor; + +import java.util.Set; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; + +import static java.util.Objects.requireNonNull; + +/** + * Defines methods to manage preemptor metrics. + */ +@VisibleForTesting +public class PreemptorMetrics { + @VisibleForTesting + static final String MISSING_ATTRIBUTES_NAME = "preemptor_missing_attributes"; + + @VisibleForTesting + static final String TASK_PROCESSOR_RUN_NAME = "preemptor_task_processor_runs"; + + private volatile boolean exported = false; + private final CachedCounters counters; + + @Inject + PreemptorMetrics(CachedCounters counters) { + this.counters = requireNonNull(counters); + assertFullyExported(); + } + + private static String prod(boolean production) { + return production ? "prod" : "non_prod"; + } + + private static String result(boolean success) { + return success ? "successful" : "failed"; + } + + private void assertFullyExported() { + if (exported) { + return; + } + + // Dummy-read all stats to ensure they are exported. 
+ Set<String> allStats = ImmutableSet.of( + attemptsStatName(false), + attemptsStatName(true), + successStatName(false), + successStatName(true), + slotSearchStatName(true, false), + slotSearchStatName(false, false), + slotSearchStatName(true, true), + slotSearchStatName(false, true), + slotValidationStatName(true), + slotValidationStatName(false), + MISSING_ATTRIBUTES_NAME, + TASK_PROCESSOR_RUN_NAME); + for (String stat : allStats) { + counters.get(stat); + } + + exported = true; + } + + private void increment(String stat) { + assertFullyExported(); + counters.get(stat).incrementAndGet(); + } + + @VisibleForTesting + static String attemptsStatName(boolean production) { + return "preemptor_slot_search_attempts_for_" + prod(production); + } + + @VisibleForTesting + static String successStatName(boolean production) { + return "preemptor_tasks_preempted_" + prod(production); + } + + @VisibleForTesting + static String slotSearchStatName(boolean success, boolean production) { + return String.format("preemptor_slot_search_%s_for_%s", result(success), prod(production)); + } + + @VisibleForTesting + static String slotValidationStatName(boolean success) { + return "preemptor_slot_validation_" + result(success); + } + + void recordPreemptionAttemptFor(ITaskConfig task) { + increment(attemptsStatName(task.isProduction())); + } + + void recordTaskPreemption(PreemptionVictim victim) { + increment(successStatName(victim.isProduction())); + } + + void recordSlotSearchResult(Optional<?> result, ITaskConfig task) { + increment(slotSearchStatName(result.isPresent(), task.isProduction())); + } + + void recordSlotValidationResult(Optional<?> result) { + increment(slotValidationStatName(result.isPresent())); + } + + void recordMissingAttributes() { + increment(MISSING_ATTRIBUTES_NAME); + } + + void recordTaskProcessorRun() { + increment(TASK_PROCESSOR_RUN_NAME); + } +} 
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java new file mode 100644 index 0000000..fc39a6d --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorModule.java @@ -0,0 +1,167 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.aurora.scheduler.preemptor;
+
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.inject.AbstractModule;
+import com.google.inject.PrivateModule;
+import com.google.inject.TypeLiteral;
+import com.twitter.common.args.Arg;
+import com.twitter.common.args.CmdLine;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.preemptor.BiCache.BiCacheSettings;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Binding module for the preemptor.
+ *
+ * <p>When the preemptor is enabled, the real {@link Preemptor} and its collaborators are bound
+ * inside a private module and a {@link PreemptorService} is registered to run the
+ * {@link PendingTaskProcessor} on a fixed-rate schedule. When it is disabled, a {@link Preemptor}
+ * that never preempts is bound instead.
+ */
+public class PreemptorModule extends AbstractModule {
+
+  private static final Logger LOG = Logger.getLogger(PreemptorModule.class.getName());
+
+  @CmdLine(name = "enable_preemptor",
+      help = "Enable the preemptor and preemption")
+  private static final Arg<Boolean> ENABLE_PREEMPTOR = Arg.create(true);
+
+  @CmdLine(name = "preemption_delay",
+      help = "Time interval after which a pending task becomes eligible to preempt other tasks")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_DELAY =
+      Arg.create(Amount.of(3L, Time.MINUTES));
+
+  @CmdLine(name = "preemption_slot_hold_time",
+      help = "Time to hold a preemption slot found before it is discarded.")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_HOLD_TIME =
+      Arg.create(Amount.of(5L, Time.MINUTES));
+
+  @CmdLine(name = "preemption_slot_search_interval",
+      help = "Time interval between pending task preemption slot searches.")
+  private static final Arg<Amount<Long, Time>> PREEMPTION_SLOT_SEARCH_INTERVAL =
+      Arg.create(Amount.of(1L, Time.MINUTES));
+
+  private final boolean enablePreemptor;
+  private final Amount<Long, Time> preemptionDelay;
+  private final Amount<Long, Time> slotSearchInterval;
+
+  /**
+   * Creates a module with explicit settings instead of the command line argument values.
+   *
+   * @param enablePreemptor Whether to bind the real preemptor ({@code true}) or the no-op one.
+   * @param preemptionDelay Value bound for the {@code PendingTaskProcessor.PreemptionDelay}
+   *     annotated amount.
+   * @param slotSearchInterval Period of the fixed-rate schedule given to {@link PreemptorService}.
+   */
+  @VisibleForTesting
+  public PreemptorModule(
+      boolean enablePreemptor,
+      Amount<Long, Time> preemptionDelay,
+      Amount<Long, Time> slotSearchInterval) {
+
+    this.enablePreemptor = enablePreemptor;
+    this.preemptionDelay = requireNonNull(preemptionDelay);
+    this.slotSearchInterval = requireNonNull(slotSearchInterval);
+  }
+
+  /**
+   * Creates a module configured from the command line arguments declared on this class.
+   */
+  public PreemptorModule() {
+    this(ENABLE_PREEMPTOR.get(), PREEMPTION_DELAY.get(), PREEMPTION_SLOT_SEARCH_INTERVAL.get());
+  }
+
+  @Override
+  protected void configure() {
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        if (enablePreemptor) {
+          LOG.info("Preemptor Enabled.");
+          bind(PreemptorMetrics.class).in(Singleton.class);
+          bind(PreemptionVictimFilter.class)
+              .to(PreemptionVictimFilter.PreemptionVictimFilterImpl.class);
+          bind(PreemptionVictimFilter.PreemptionVictimFilterImpl.class).in(Singleton.class);
+          bind(Preemptor.class).to(Preemptor.PreemptorImpl.class);
+          bind(Preemptor.PreemptorImpl.class).in(Singleton.class);
+          bind(new TypeLiteral<Amount<Long, Time>>() { })
+              .annotatedWith(PendingTaskProcessor.PreemptionDelay.class)
+              .toInstance(preemptionDelay);
+          // NOTE(review): the slot hold time is read straight from the command line argument
+          // rather than being passed through the constructor like the other settings.
+          bind(BiCacheSettings.class).toInstance(
+              new BiCacheSettings(PREEMPTION_SLOT_HOLD_TIME.get(), "preemption_slot_cache_size"));
+          bind(new TypeLiteral<BiCache<PreemptionProposal, TaskGroupKey>>() { })
+              .in(Singleton.class);
+          bind(PendingTaskProcessor.class).in(Singleton.class);
+          bind(ClusterState.class).to(ClusterStateImpl.class);
+          bind(ClusterStateImpl.class).in(Singleton.class);
+          expose(ClusterStateImpl.class);
+
+          bind(PreemptorService.class).in(Singleton.class);
+          // Fixed-rate schedule with no initial delay and a period of slotSearchInterval.
+          bind(AbstractScheduledService.Scheduler.class).toInstance(
+              AbstractScheduledService.Scheduler.newFixedRateSchedule(
+                  0L,
+                  slotSearchInterval.getValue(),
+                  slotSearchInterval.getUnit().getTimeUnit()));
+
+          expose(PreemptorService.class);
+          expose(PendingTaskProcessor.class);
+        } else {
+          bind(Preemptor.class).toInstance(NULL_PREEMPTOR);
+          LOG.warning("Preemptor Disabled.");
+        }
+        // Preemptor is exposed in both the enabled and the disabled configuration.
+        expose(Preemptor.class);
+      }
+    });
+
+    // We can't do this in the private module due to the known conflict between multibindings
+    // and private modules due to multiple injectors. We accept the added complexity here to keep
+    // the other bindings private.
+    PubsubEventModule.bindSubscriber(binder(), ClusterStateImpl.class);
+    if (enablePreemptor) {
+      SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+          .to(PreemptorService.class);
+    }
+  }
+
+  /**
+   * Scheduled service that runs the injected {@link PendingTaskProcessor} once per iteration,
+   * using the injected {@link AbstractScheduledService.Scheduler} as its schedule.
+   */
+  static class PreemptorService extends AbstractScheduledService {
+    private final PendingTaskProcessor slotFinder;
+    private final Scheduler schedule;
+
+    @Inject
+    PreemptorService(PendingTaskProcessor slotFinder, Scheduler schedule) {
+      this.slotFinder = requireNonNull(slotFinder);
+      this.schedule = requireNonNull(schedule);
+    }
+
+    @Override
+    protected void runOneIteration() {
+      slotFinder.run();
+    }
+
+    @Override
+    protected Scheduler scheduler() {
+      return schedule;
+    }
+  }
+
+  // Preemptor bound when preemption is disabled: every attempt returns Optional.absent().
+  private static final Preemptor NULL_PREEMPTOR = new Preemptor() {
+    @Override
+    public Optional<String> attemptPreemptionFor(
+        IAssignedTask task,
+        AttributeAggregate jobState,
+        Storage.MutableStoreProvider storeProvider) {
+
+      return Optional.absent();
+    }
+  };
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java
new file mode 100644
index 0000000..023e0cf
--- /dev/null
+++ 
b/src/main/java/org/apache/aurora/scheduler/pruning/JobUpdateHistoryPruner.java @@ -0,0 +1,105 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.pruning; + +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import com.google.common.base.Joiner; +import com.google.common.util.concurrent.AbstractIdleService; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; + +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import org.apache.aurora.scheduler.storage.Storage.MutateWork; +import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; + +import static java.util.Objects.requireNonNull; + +/** + * Prunes per-job update history on a periodic basis. 
+ */ +class JobUpdateHistoryPruner extends AbstractIdleService { + private static final Logger LOG = Logger.getLogger(JobUpdateHistoryPruner.class.getName()); + + private final Clock clock; + private final ScheduledExecutorService executor; + private final Storage storage; + private final HistoryPrunerSettings settings; + + static class HistoryPrunerSettings { + private final Amount<Long, Time> pruneInterval; + private final Amount<Long, Time> maxHistorySize; + private final int maxUpdatesPerJob; + + HistoryPrunerSettings( + Amount<Long, Time> pruneInterval, + Amount<Long, Time> maxHistorySize, + int maxUpdatesPerJob) { + + this.pruneInterval = requireNonNull(pruneInterval); + this.maxHistorySize = requireNonNull(maxHistorySize); + this.maxUpdatesPerJob = maxUpdatesPerJob; + } + } + + @Inject + JobUpdateHistoryPruner( + Clock clock, + ScheduledExecutorService executor, + Storage storage, + HistoryPrunerSettings settings) { + + this.clock = requireNonNull(clock); + this.executor = requireNonNull(executor); + this.storage = requireNonNull(storage); + this.settings = requireNonNull(settings); + } + + @Override + protected void startUp() { + executor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + storage.write(new MutateWork.NoResult.Quiet() { + @Override + public void execute(MutableStoreProvider storeProvider) { + Set<IJobUpdateKey> prunedUpdates = storeProvider.getJobUpdateStore().pruneHistory( + settings.maxUpdatesPerJob, + clock.nowMillis() - settings.maxHistorySize.as(Time.MILLISECONDS)); + + LOG.info(prunedUpdates.isEmpty() + ? "No job update history to prune." + : "Pruned job update history: " + Joiner.on(",").join(prunedUpdates)); + } + }); + } + }, + settings.pruneInterval.as(Time.MILLISECONDS), + settings.pruneInterval.as(Time.MILLISECONDS), + TimeUnit.MILLISECONDS); + } + + @Override + protected void shutDown() { + // Nothing to do - await VM shutdown. 
+ } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java new file mode 100644 index 0000000..373c833 --- /dev/null +++ b/src/main/java/org/apache/aurora/scheduler/pruning/PruningModule.java @@ -0,0 +1,106 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.pruning; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Logger; + +import javax.inject.Singleton; + +import com.google.inject.AbstractModule; +import com.google.inject.PrivateModule; +import com.twitter.common.args.Arg; +import com.twitter.common.args.CmdLine; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; + +import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings; + +/** + * Binding module for background storage pruning. 
+ */
+public class PruningModule extends AbstractModule {
+
+  private static final Logger LOG = Logger.getLogger(PruningModule.class.getName());
+
+  @CmdLine(name = "history_prune_threshold",
+      help = "Time after which the scheduler will prune terminated task history.")
+  private static final Arg<Amount<Long, Time>> HISTORY_PRUNE_THRESHOLD =
+      Arg.create(Amount.of(2L, Time.DAYS));
+
+  @CmdLine(name = "history_max_per_job_threshold",
+      help = "Maximum number of terminated tasks to retain in a job history.")
+  private static final Arg<Integer> HISTORY_MAX_PER_JOB_THRESHOLD = Arg.create(100);
+
+  @CmdLine(name = "history_min_retention_threshold",
+      help = "Minimum guaranteed time for task history retention before any pruning is attempted.")
+  private static final Arg<Amount<Long, Time>> HISTORY_MIN_RETENTION_THRESHOLD =
+      Arg.create(Amount.of(1L, Time.HOURS));
+
+  @CmdLine(name = "job_update_history_per_job_threshold",
+      help = "Maximum number of completed job updates to retain in a job update history.")
+  private static final Arg<Integer> JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD = Arg.create(10);
+
+  @CmdLine(name = "job_update_history_pruning_interval",
+      help = "Job update history pruning interval.")
+  private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_INTERVAL =
+      Arg.create(Amount.of(15L, Time.MINUTES));
+
+  @CmdLine(name = "job_update_history_pruning_threshold",
+      help = "Time after which the scheduler will prune completed job update history.")
+  private static final Arg<Amount<Long, Time>> JOB_UPDATE_HISTORY_PRUNING_THRESHOLD =
+      Arg.create(Amount.of(30L, Time.DAYS));
+
+  @Override
+  protected void configure() {
+    // Task history pruning: settings come from the command line arguments above; only
+    // TaskHistoryPruner itself is exposed from the private module.
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        // TODO(ksweeney): Create a configuration validator module so this can be injected.
+        // TODO(William Farner): Revert this once large task counts are cheap, a la hierarchical
+        // store.
+        bind(HistoryPrunnerSettings.class).toInstance(new HistoryPrunnerSettings(
+            HISTORY_PRUNE_THRESHOLD.get(),
+            HISTORY_MIN_RETENTION_THRESHOLD.get(),
+            HISTORY_MAX_PER_JOB_THRESHOLD.get()
+        ));
+
+        bind(TaskHistoryPruner.class).in(Singleton.class);
+        expose(TaskHistoryPruner.class);
+      }
+    });
+    // Registered outside the private module so the pruner receives pubsub events.
+    PubsubEventModule.bindSubscriber(binder(), TaskHistoryPruner.class);
+
+    // Job update history pruning: the settings and the single-thread scheduled executor are
+    // bound inside this private module and are not exposed; only JobUpdateHistoryPruner is.
+    install(new PrivateModule() {
+      @Override
+      protected void configure() {
+        bind(JobUpdateHistoryPruner.HistoryPrunerSettings.class).toInstance(
+            new JobUpdateHistoryPruner.HistoryPrunerSettings(
+                JOB_UPDATE_HISTORY_PRUNING_INTERVAL.get(),
+                JOB_UPDATE_HISTORY_PRUNING_THRESHOLD.get(),
+                JOB_UPDATE_HISTORY_PER_JOB_THRESHOLD.get()));
+
+        bind(ScheduledExecutorService.class).toInstance(
+            AsyncUtil.singleThreadLoggingScheduledExecutor("JobUpdatePruner-%d", LOG));
+
+        bind(JobUpdateHistoryPruner.class).in(Singleton.class);
+        expose(JobUpdateHistoryPruner.class);
+      }
+    });
+    // Adds the job update pruner to the scheduler-active service bindings.
+    SchedulerServicesModule.addSchedulerActiveServiceBinding(binder())
+        .to(JobUpdateHistoryPruner.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/0070a5fd/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
new file mode 100644
index 0000000..ef88d98
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.pruning; + +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.eventbus.Subscribe; +import com.twitter.common.quantity.Amount; +import com.twitter.common.quantity.Time; +import com.twitter.common.util.Clock; + +import org.apache.aurora.gen.apiConstants; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.base.Query; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.Storage; +import org.apache.aurora.scheduler.storage.entities.IJobKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; + +import static java.util.Objects.requireNonNull; + +import static org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; +import static org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; + +/** + * Prunes tasks in a job based on per-job history and an inactive time threshold by observing tasks + * transitioning into one of the inactive states. 
+ */ +public class TaskHistoryPruner implements EventSubscriber { + private static final Logger LOG = Logger.getLogger(TaskHistoryPruner.class.getName()); + + private final ScheduledExecutorService executor; + private final StateManager stateManager; + private final Clock clock; + private final HistoryPrunnerSettings settings; + private final Storage storage; + + private final Predicate<IScheduledTask> safeToDelete = new Predicate<IScheduledTask>() { + @Override + public boolean apply(IScheduledTask task) { + return Tasks.getLatestEvent(task).getTimestamp() + <= clock.nowMillis() - settings.minRetentionThresholdMillis; + } + }; + + static class HistoryPrunnerSettings { + private final long pruneThresholdMillis; + private final long minRetentionThresholdMillis; + private final int perJobHistoryGoal; + + HistoryPrunnerSettings( + Amount<Long, Time> inactivePruneThreshold, + Amount<Long, Time> minRetentionThreshold, + int perJobHistoryGoal) { + + this.pruneThresholdMillis = inactivePruneThreshold.as(Time.MILLISECONDS); + this.minRetentionThresholdMillis = minRetentionThreshold.as(Time.MILLISECONDS); + this.perJobHistoryGoal = perJobHistoryGoal; + } + } + + @Inject + TaskHistoryPruner( + @AsyncExecutor ScheduledExecutorService executor, + StateManager stateManager, + Clock clock, + HistoryPrunnerSettings settings, + Storage storage) { + + this.executor = requireNonNull(executor); + this.stateManager = requireNonNull(stateManager); + this.clock = requireNonNull(clock); + this.settings = requireNonNull(settings); + this.storage = requireNonNull(storage); + } + + @VisibleForTesting + long calculateTimeout(long taskEventTimestampMillis) { + return Math.max( + settings.minRetentionThresholdMillis, + settings.pruneThresholdMillis - Math.max(0, clock.nowMillis() - taskEventTimestampMillis)); + } + + /** + * When triggered, records an inactive task state change. + * + * @param change Event when a task changes state. 
+ */ + @Subscribe + public void recordStateChange(TaskStateChange change) { + if (Tasks.isTerminated(change.getNewState())) { + long timeoutBasis = change.isTransition() + ? clock.nowMillis() + : Iterables.getLast(change.getTask().getTaskEvents()).getTimestamp(); + registerInactiveTask( + Tasks.SCHEDULED_TO_JOB_KEY.apply(change.getTask()), + change.getTaskId(), + calculateTimeout(timeoutBasis)); + } + } + + private void deleteTasks(final Set<String> taskIds) { + LOG.info("Pruning inactive tasks " + taskIds); + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + stateManager.deleteTasks(storeProvider, taskIds); + } + }); + } + + @VisibleForTesting + static Query.Builder jobHistoryQuery(IJobKey jobKey) { + return Query.jobScoped(jobKey).byStatus(apiConstants.TERMINAL_STATES); + } + + private void registerInactiveTask( + final IJobKey jobKey, + final String taskId, + long timeRemaining) { + + LOG.fine("Prune task " + taskId + " in " + timeRemaining + " ms."); + executor.schedule( + new Runnable() { + @Override + public void run() { + LOG.info("Pruning expired inactive task " + taskId); + deleteTasks(ImmutableSet.of(taskId)); + } + }, + timeRemaining, + TimeUnit.MILLISECONDS); + + executor.submit(new Runnable() { + @Override + public void run() { + Iterable<IScheduledTask> inactiveTasks = + Storage.Util.fetchTasks(storage, jobHistoryQuery(jobKey)); + int numInactiveTasks = Iterables.size(inactiveTasks); + int tasksToPrune = numInactiveTasks - settings.perJobHistoryGoal; + if (tasksToPrune > 0 && numInactiveTasks > settings.perJobHistoryGoal) { + Set<String> toPrune = FluentIterable + .from(Tasks.LATEST_ACTIVITY.sortedCopy(inactiveTasks)) + .filter(safeToDelete) + .limit(tasksToPrune) + .transform(Tasks.SCHEDULED_TO_ID) + .toSet(); + deleteTasks(toPrune); + } + } + }); + } +}
