Repository: aurora
Updated Branches:
  refs/heads/master 364eac5d1 -> 3ec0430b8


Fix concurrency issues around banned offers in HostOffers

Currently, `getOffers` and `getWeaklyConsistentOffers` return iterables whose
filter lambdas read from data structures that are not safe for concurrent use
(`globallyBannedOffers` and `staticallyBannedOffers`). This can lead to a race
condition with undefined behavior when a read and a modification happen at the
same time.

To fix this, in `getOffers` we will revert to the previous behavior (before
patch https://reviews.apache.org/r/61804/) where we create a copy of the current
offers as well as a copy of the banned offers for consistency.
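
The copy-based approach can be sketched roughly as follows. This is only an
illustration using plain JDK collections, not the Guava-based code in the patch;
the class `SnapshotOffers`, its `String` offer IDs, and the `addOffer`/`ban`
helpers are hypothetical stand-ins for `HostOffers` and its fields.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for HostOffers (assumption: offers are plain String IDs).
class SnapshotOffers {
    private final List<String> offers = new ArrayList<>();
    private final Set<String> banned = new HashSet<>();

    synchronized void addOffer(String id) {
        offers.add(id);
    }

    synchronized void ban(String id) {
        banned.add(id);
    }

    // Copies the banned set and materializes the filtered offers while holding the lock,
    // so the returned list never reads shared state after this method returns.
    synchronized List<String> getOffers() {
        Set<String> bannedCopy = new HashSet<>(banned);
        return offers.stream()
            .filter(id -> !bannedCopy.contains(id))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SnapshotOffers state = new SnapshotOffers();
        state.addOffer("o1");
        state.addOffer("o2");
        state.ban("o2");

        List<String> snapshot = state.getOffers();
        state.ban("o1"); // happens after the snapshot was taken

        System.out.println(snapshot); // prints [o1]
    }
}
```

Because the filter runs eagerly inside the synchronized method, later bans do
not affect a snapshot that has already been handed out.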

For `getWeaklyConsistentOffers`, we will use concurrent data structures for
`globallyBannedOffers` and `staticallyBannedOffers` so we don't have a race that
may produce unknown side effects.
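
As a rough sketch of the weakly-consistent variant, the example below keeps the
filter lazy but backs it with concurrent collections, so evaluating the
predicate during iteration is safe. It uses JDK classes only
(`ConcurrentHashMap.newKeySet()` in place of Guava's
`Sets.newConcurrentHashSet()`); the class `WeaklyConsistentOffers`, the `String`
offer IDs, and the `drain` helper are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical, simplified stand-in for HostOffers (assumption: offers are plain String IDs).
class WeaklyConsistentOffers {
    private final Set<String> offers = new ConcurrentSkipListSet<>();
    private final Set<String> banned = ConcurrentHashMap.newKeySet();

    void addOffer(String id) {
        offers.add(id);
    }

    void ban(String id) {
        banned.add(id);
    }

    // Returns a lazy view: the predicate reads `banned` while the caller iterates,
    // which is safe only because both collections tolerate concurrent access.
    Iterable<String> getWeaklyConsistentOffers() {
        return () -> offers.stream().filter(id -> !banned.contains(id)).iterator();
    }

    // Helper that copies an iterable into a list so it can be printed or compared.
    static List<String> drain(Iterable<String> items) {
        List<String> result = new ArrayList<>();
        for (String item : items) {
            result.add(item);
        }
        return result;
    }

    public static void main(String[] args) {
        WeaklyConsistentOffers state = new WeaklyConsistentOffers();
        state.addOffer("o1");
        state.addOffer("o2");

        Iterable<String> view = state.getWeaklyConsistentOffers();
        state.ban("o2"); // happens after the view was created, before it is iterated

        System.out.println(drain(view)); // prints [o1]
    }
}
```

Unlike a snapshot, the view picks up the ban that happened after it was
created, which is the trade-off the weakly-consistent method accepts.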

Reviewed at https://reviews.apache.org/r/61918/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3ec0430b
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3ec0430b
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3ec0430b

Branch: refs/heads/master
Commit: 3ec0430b848301d7a42d8fd89eee5df55e9d6603
Parents: 364eac5
Author: Jordan Ly <[email protected]>
Authored: Mon Sep 4 13:46:16 2017 +0200
Committer: Stephan Erb <[email protected]>
Committed: Mon Sep 4 13:46:16 2017 +0200

----------------------------------------------------------------------
 .../aurora/scheduler/offers/OfferManager.java   | 26 +++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/3ec0430b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
index 5697d2e..e833431 100644
--- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.eventbus.Subscribe;
@@ -301,10 +302,11 @@ public interface OfferManager extends EventSubscriber {
       // TODO(maxim): Expose via a debug endpoint. AURORA-1136.
       // Keep track of offer->groupKey mappings that will never be matched to avoid redundant
       // scheduling attempts. See VetoGroup for more details on static ban.
-      private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers = HashMultimap.create();
+      private final Multimap<OfferID, TaskGroupKey> staticallyBannedOffers =
+          Multimaps.synchronizedMultimap(HashMultimap.create());
 
       // Keep track of globally banned offers that will never be matched to anything.
-      private final Set<OfferID> globallyBannedOffers = Sets.newHashSet();
+      private final Set<OfferID> globallyBannedOffers = Sets.newConcurrentHashSet();
 
       HostOffers(StatsProvider statsProvider, Ordering<HostOffer> offerOrder) {
         offers = new ConcurrentSkipListSet<>(offerOrder);
@@ -352,12 +354,28 @@ public interface OfferManager extends EventSubscriber {
         }
       }
 
+      /**
+       * Returns an iterable giving the state of the offers at the time the method is called. Unlike
+       * {@code getWeaklyConsistentOffers}, the underlying collection is a copy of the original and
+       * will not be modified outside of the returned iterable.
+       *
+       * @return The offers currently known by the scheduler.
+       */
       synchronized Iterable<HostOffer> getOffers() {
-        return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(
+        return FluentIterable.from(offers).filter(
             e -> !globallyBannedOffers.contains(e.getOffer().getId())
-        ));
+        ).toSet();
       }
 
+      /**
+       * Returns a weakly-consistent iterable giving the available offers to a given
+       * {@code groupKey}. This iterable can handle concurrent operations on its underlying
+       * collection, and may reflect changes that happen after the construction of the iterable.
+       * This property is mainly used in {@code launchTask}.
+       *
+       * @param groupKey The task group to get offers for.
+       * @return The offers a given task group can use.
+       */
       synchronized Iterable<HostOffer> getWeaklyConsistentOffers(TaskGroupKey groupKey) {
         return Iterables.unmodifiableIterable(FluentIterable.from(offers).filter(
             e -> !staticallyBannedOffers.containsEntry(e.getOffer().getId(), groupKey)
