YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue. Contributed by Rohith Sharmaks.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0c944600 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0c944600 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0c944600 Branch: refs/heads/HDFS-EC Commit: 0c9446003fa9b462f75736d42c32925d931059c6 Parents: 10415a0 Author: Tsuyoshi Ozawa <oz...@apache.org> Authored: Mon Jan 5 00:08:31 2015 +0900 Committer: Zhe Zhang <z...@apache.org> Committed: Mon Jan 5 14:48:37 2015 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/LeafQueue.java | 4 +- .../scheduler/capacity/TestLeafQueue.java | 86 ++++++++++++++++++++ 3 files changed, 91 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c944600/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e6694f1..0d33b4a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -308,6 +308,9 @@ Release 2.7.0 - UNRELEASED YARN-2991. Fixed DrainDispatcher to reuse the draining code path in AsyncDispatcher. (Rohith Sharmaks via zjshen) + YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue. + (Rohith Sharmaks via ozawa) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c944600/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f129ff4..47679a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1878,7 +1878,7 @@ public class LeafQueue extends AbstractCSQueue { } // return a single Resource capturing the overal amount of pending resources - public Resource getTotalResourcePending() { + public synchronized Resource getTotalResourcePending() { Resource ret = BuilderUtils.newResource(0, 0); for (FiCaSchedulerApp f : activeApplications) { Resources.addTo(ret, f.getTotalPendingRequests()); @@ -1887,7 +1887,7 @@ public class LeafQueue extends AbstractCSQueue { } @Override - public void collectSchedulerApplications( + public synchronized void collectSchedulerApplications( Collection<ApplicationAttemptId> apps) { for (FiCaSchedulerApp pendingApp : pendingApplications) { apps.add(pendingApp.getApplicationAttemptId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0c944600/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 642363e..fb7bb2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -37,11 +37,13 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -61,6 +63,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -2353,6 +2356,89 @@ public class TestLeafQueue { } } + @Test + public void testConcurrentAccess() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm = new MockRM(); + rm.init(conf); + rm.start(); + + final String queue = "default"; + final String user = "user"; + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + final LeafQueue defaultQueue = (LeafQueue) cs.getQueue(queue); + + final List<FiCaSchedulerApp> listOfApps = + createListOfApps(10000, user, defaultQueue); + + final CyclicBarrier cb = new CyclicBarrier(2); + final List<ConcurrentModificationException> conException = + new ArrayList<ConcurrentModificationException>(); + + Thread submitAndRemove = new Thread(new Runnable() { + + @Override + public void run() { + + for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) { + defaultQueue.submitApplicationAttempt(fiCaSchedulerApp, user); + } + try { + cb.await(); + } catch (Exception e) { + // Ignore + } + for (FiCaSchedulerApp fiCaSchedulerApp : listOfApps) { + defaultQueue.finishApplicationAttempt(fiCaSchedulerApp, queue); + } + } + }, "SubmitAndRemoveApplicationAttempt Thread"); + + Thread getAppsInQueue = new Thread(new Runnable() { + List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(); + + @Override + public void run() { + try { + try { + cb.await(); + } catch (Exception e) { + // Ignore + } + defaultQueue.collectSchedulerApplications(apps); + } catch (ConcurrentModificationException e) { + conException.add(e); + } + } + + }, "GetAppsInQueue Thread"); + + submitAndRemove.start(); + getAppsInQueue.start(); + + submitAndRemove.join(); + getAppsInQueue.join(); + + assertTrue("ConcurrentModificationException is thrown", + conException.isEmpty()); + rm.stop(); + + } + + private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user, + LeafQueue defaultQueue) { + List<FiCaSchedulerApp> appsLists = new ArrayList<FiCaSchedulerApp>(); + for (int i = 0; i < noOfApps; i++) { + ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(i, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user, defaultQueue, + mock(ActiveUsersManager.class), spyRMContext); + appsLists.add(app_0); + } + return appsLists; + } + private CapacitySchedulerContext mockCSContext( CapacitySchedulerConfiguration csConf, Resource clusterResource) { CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);