Repository: aurora
Updated Branches:
  refs/heads/master 364eac5d1 -> 3ec0430b8


Fix concurrency issues around banned offers in HostOffers

Currently, `getOffers` and `getWeaklyConsistentOffers` return lazy iterables whose `filter` lambdas read collections that are not thread-safe (`globallyBannedOffers` and `staticallyBannedOffers`). The lambdas run when a caller iterates, after the `synchronized` method has already returned, so a read can race with a concurrent modification of those collections, with unpredictable results.

To fix this, `getOffers` reverts to the behavior from before https://reviews.apache.org/r/61804/: it creates a copy of the current offers as well as a copy of the banned offers, so the result is consistent.

For `getWeaklyConsistentOffers`, `globallyBannedOffers` and `staticallyBannedOffers` are switched to thread-safe collections, so lookups made during iteration no longer race with modifications that could produce unpredictable side effects.

Reviewed at https://reviews.apache.org/r/61918/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3ec0430b
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3ec0430b
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3ec0430b

Branch: refs/heads/master
Commit: 3ec0430b848301d7a42d8fd89eee5df55e9d6603
Parents: 364eac5
Author: Jordan Ly
Authored: Mon Sep 4 13:46:16 2017 +0200
Committer: Stephan Erb
Committed: Mon Sep 4 13:46:16 2017 +0200

----------------------------------------------------------------------
 .../aurora/scheduler/offers/OfferManager.java | 26 +++++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/3ec0430b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 5697d2e..e833431 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
@@ -301,10 +302,11 @@ public interface OfferManager extends EventSubscriber {
     // TODO(maxim): Expose via a debug endpoint. AURORA-1136.
     // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
     // scheduling attempts. See VetoGroup for more details on static ban.
-    private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create();
+    private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers =
+        Multimaps.synchronizedMultimap(HashMultimap.create());
 
     // Keep track of globally banned offers that will never be matched to anything.
-    private final Set<OfferID> globallyBannedOffers = Sets.newHashSet();
+    private final Set<OfferID> globallyBannedOffers = Sets.newConcurrentHashSet();
 
     HostOffers(StatsProvider statsProvider, Ordering<HostOffer> offerOrder) {
       offers = new ConcurrentSkipListSet<>(offerOrder);
@@ -352,12 +354,28 @@ public interface OfferManager extends EventSubscriber {
       }
     }
 
+    /**
+     * Returns an iterable giving the state of the offers at the time the method is called. Unlike
+     * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and
+     * will not be modified outside of the returned iterable.
+     *
+     * @return The offers currently known by the scheduler.
+     */
     synchronized Iterable<HostOffer> getOffers() {
-      return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(
+      return FluentIterable.from(offers).filter(
           e -> !globallyBannedOffers.contains(e.getOffer().getId())
-      ));
+      ).toSet();
     }
 
+    /**
+     * Returns a weakly-consistent iterable giving the available offers to a given
+     * {@code groupKey}. This iterable can handle concurrent operations on its underlying
+     * collection, and may reflect changes that happen after the construction of the iterable.
+     * This property is mainly used in {@code launchTask}.
+     *
+     * @param groupKey The task group to get offers for.
+     * @return The offers a given task group can use.
+     */
     synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
       return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(
           e -> !staticallyBannedOffers.containsEntry(e.getOffer().getId(), groupKey)
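
The snapshot-versus-view distinction this commit relies on can be illustrated with a small JDK-only sketch. It is not Aurora code: `OfferSnapshotDemo`, its methods, and the use of plain strings for offers are hypothetical. A stream-backed `Iterable` stands in for Guava's lazy `FluentIterable.filter(...)`, and a copied list stands in for `.toSet()`. Run on one thread, it shows only the semantic difference: the lazy view evaluates its predicate when iterated, after the lock has been released, whereas the snapshot is fixed when the method returns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;

// Hypothetical stand-in for HostOffers: offers are plain strings, not HostOffer objects.
public class OfferSnapshotDemo {
  private final Set<String> offers = new ConcurrentSkipListSet<>();
  private final Set<String> globallyBanned = ConcurrentHashMap.newKeySet();

  // Lazy view (like the old getOffers): the predicate runs each time a caller
  // iterates, i.e. after this synchronized method has returned and released the lock.
  synchronized Iterable<String> lazyOffers() {
    return () -> offers.stream().filter(o -> !globallyBanned.contains(o)).iterator();
  }

  // Snapshot (like the new getOffers with toSet()): the predicate runs now,
  // under the lock, and the filtered result is copied into an unmodifiable list.
  synchronized List<String> snapshotOffers() {
    return Collections.unmodifiableList(
        offers.stream().filter(o -> !globallyBanned.contains(o)).collect(Collectors.toList()));
  }

  synchronized void add(String offer) {
    offers.add(offer);
  }

  synchronized void ban(String offer) {
    globallyBanned.add(offer);
  }

  // Drains an Iterable into a list so its current contents can be printed or compared.
  static List<String> toList(Iterable<String> iterable) {
    List<String> out = new ArrayList<>();
    iterable.forEach(out::add);
    return out;
  }

  public static void main(String[] args) {
    OfferSnapshotDemo demo = new OfferSnapshotDemo();
    demo.add("offer-a");
    demo.add("offer-b");

    Iterable<String> view = demo.lazyOffers();
    List<String> snapshot = demo.snapshotOffers();

    // A ban that happens after both results were handed out.
    demo.ban("offer-a");

    System.out.println("lazy view: " + toList(view)); // prints "lazy view: [offer-b]"
    System.out.println("snapshot:  " + snapshot);     // prints "snapshot:  [offer-a, offer-b]"
  }
}
```

Both methods are `synchronized`, but only the snapshot does its filtering while holding the lock. With non-thread-safe banned collections, the lazy view's later reads are exactly where the race described above can occur.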

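The other half of the fix keeps `getWeaklyConsistentOffers` lazy but makes the collections its predicate reads thread-safe. The following is a hypothetical JDK-only sketch, not Aurora code. `Collections.newSetFromMap(new ConcurrentHashMap<>())` plays the role of Guava's `Sets.newConcurrentHashSet()`, and a `ConcurrentHashMap` of concurrent sets stands in for `Multimaps.synchronizedMultimap(HashMultimap.create())`. The class, method names, and string offers are illustrative. A writer thread bans offers while the main thread iterates the view. The number of offers observed depends on timing, but iteration completes safely and every offer that is never banned appears.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical sketch of the weakly consistent path after the fix; offers and
// task group keys are plain strings.
public class WeaklyConsistentOffersDemo {
  // Sorted, concurrent offer set; its iterators are weakly consistent.
  final Set<String> offers = new ConcurrentSkipListSet<>();
  // JDK analog of Guava's Sets.newConcurrentHashSet().
  final Set<String> globallyBanned = Collections.newSetFromMap(new ConcurrentHashMap<>());
  // Thread-safe offer -> group keys mapping, standing in for a synchronized multimap.
  final Map<String, Set<String>> staticallyBanned = new ConcurrentHashMap<>();

  void globalBan(String offer) {
    globallyBanned.add(offer);
  }

  void staticBan(String offer, String groupKey) {
    staticallyBanned.computeIfAbsent(offer, k -> ConcurrentHashMap.newKeySet()).add(groupKey);
  }

  boolean isStaticallyBanned(String offer, String groupKey) {
    Set<String> keys = staticallyBanned.get(offer);
    return keys != null && keys.contains(groupKey);
  }

  // Lazy view: the predicate runs during iteration and may or may not observe bans
  // made after the view was created, but every lookup hits a thread-safe collection.
  Iterable<String> weaklyConsistentOffers(String groupKey) {
    return () -> offers.stream()
        .filter(o -> !globallyBanned.contains(o) && !isStaticallyBanned(o, groupKey))
        .iterator();
  }

  static String offerId(int i) {
    return String.format("offer-%04d", i);
  }

  public static void main(String[] args) throws InterruptedException {
    WeaklyConsistentOffersDemo demo = new WeaklyConsistentOffersDemo();
    for (int i = 0; i < 1000; i++) {
      demo.offers.add(offerId(i));
    }
    // Writer bans every even-numbered offer for "group-1" while the main thread iterates.
    Thread writer = new Thread(() -> {
      for (int i = 0; i < 1000; i += 2) {
        demo.staticBan(offerId(i), "group-1");
      }
    });
    writer.start();
    List<String> seen = new ArrayList<>();
    demo.weaklyConsistentOffers("group-1").forEach(seen::add);
    writer.join();
    // Timing-dependent: anywhere from 500 to 1000 offers.
    System.out.println("offers seen while banning: " + seen.size());
  }
}
```

One caveat applies to the real Guava type. `Multimaps.synchronizedMultimap` makes single calls such as `containsEntry` thread-safe, but its documentation requires callers to synchronize manually on the multimap when iterating any of its collection views. The filter visible in this diff only calls `containsEntry`, so that caveat does not arise there.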