Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java?rev=783059&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java (added) +++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java Tue Jun 9 16:16:33 2009 @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.mapred; + +import junit.framework.TestCase; + +import java.util.Collection; +import java.util.Set; +import java.util.HashSet; +import java.util.Arrays; +import java.util.Timer; +import java.util.TimerTask; +import java.io.IOException; +import java.io.File; + +import org.apache.hadoop.conf.Configuration; + +/** + * Base class for various scheduler tests + */ +public class BaseSchedulerTest extends TestCase { + final static String[] QUEUES = new String[] {"queue1","queue2"}; + protected FakeDynamicTimer timer = new FakeDynamicTimer(); + protected FakeTaskTrackerManager taskTracker = new FakeTaskTrackerManager(); + protected String budgetFile; + protected Configuration conf; + /** + * Create the test budget file + * @throws Exception + */ + @Override + protected void setUp() throws Exception { + super.setUp(); + String pathname = System.getProperty("test.build.data", + "build/contrib/dynamic-scheduler/test/data"); + String testDir = new File(pathname).getAbsolutePath(); + budgetFile = new File(testDir, "test-budget").getAbsolutePath(); + new File(testDir).mkdirs(); + new File(budgetFile).createNewFile(); + conf = new Configuration(); + conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_ALLOC_INTERVAL, "2"); + conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_BUDGET_FILE, budgetFile); + } + + /** + * deletes the test budget file + * @throws Exception + */ + @Override + protected void tearDown() throws Exception { + new File(budgetFile).delete(); + } + + static class FakeTaskTrackerManager implements TaskTrackerManager { + FakeQueueManager qm = new FakeQueueManager(); + public FakeTaskTrackerManager() { + } + public void addTaskTracker(String ttName) { + } + public ClusterStatus getClusterStatus() { + return null; + } + public int getNumberOfUniqueHosts() { + return 0; + } + public int getNextHeartbeatInterval() { + return 0; + } + public Collection<TaskTrackerStatus> taskTrackers() { + return null; + } + public void addJobInProgressListener(JobInProgressListener listener) { + } + public void removeJobInProgressListener(JobInProgressListener listener) { + } + public void submitJob(JobInProgress job) { + } + public TaskTrackerStatus getTaskTracker(String trackerID) { + return null; + } + public void killJob(JobID jobid) throws IOException { + } + public JobInProgress getJob(JobID jobid) { + return null; + } + public void startTask(String taskTrackerName, final Task t) { + } + void addQueues(String[] arr) { + Set<String> queues = new HashSet<String>(); + queues.addAll(Arrays.asList(arr)); + qm.setQueues(queues); + } + public QueueManager getQueueManager() { + return qm; + } + } + + + + + static class FakeDynamicTimer extends Timer { + private TimerTask task; + public void scheduleAtFixedRate(TimerTask task, long delay, long period) { + this.task = task; + } + public void runTask() { + task.run(); + } + } + + static class FakeQueueManager extends QueueManager { + private Set<String> queues = null; + FakeQueueManager() { + super(new Configuration()); + } + void setQueues(Set<String> queues) { + this.queues = queues; + } + public synchronized Set<String> getQueues() { + return queues; + } + } + +}
Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java?rev=783059&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java (added) +++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/FakeDynamicScheduler.java Tue Jun 9 16:16:33 2009 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.List; +import java.util.Collection; + +/** + * Mock queue scheduler for testing only + */ +public class FakeDynamicScheduler extends QueueTaskScheduler { + public void start() throws IOException { + } + public void terminate() throws IOException { + } + public List<Task> assignTasks(TaskTrackerStatus taskTracker) + throws IOException { + return null; + } + public Collection<JobInProgress> getJobs(String queueName) { + return null; + } + public void setAllocator(QueueAllocator allocator) { + } +} + Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java?rev=783059&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java (added) +++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestDynamicScheduler.java Tue Jun 9 16:16:33 2009 @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + + +/** + Test the dynamic scheduler. + Use the System Property test.build.data to drive the test run + */ +public class TestDynamicScheduler extends BaseSchedulerTest { + + private DynamicPriorityScheduler scheduler; + + + /** + * Create the test queues + * @throws Exception + */ + @Override + protected void setUp() throws Exception { + super.setUp(); + conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_SCHEDULER, + "org.apache.hadoop.mapred.FakeDynamicScheduler"); + scheduler = new DynamicPriorityScheduler(); + scheduler.setTimer(timer); + scheduler.setConf(conf); + scheduler.setTaskTrackerManager(taskTracker); + taskTracker.addQueues(QUEUES); + scheduler.start(); + } + + /** + * Remove the queues + * @throws Exception + */ + @Override + protected void tearDown() throws Exception { + super.tearDown(); + removeQueues(QUEUES); + } + + + private void setSpending(String queue, float spending) throws IOException { + scheduler.allocations.setSpending(queue, spending); + } + + private void setBudgets(String[] queue, float[] budget) throws IOException { + for (int i = 0; i < queue.length; i++) { + scheduler.allocations.addBudget(queue[i], budget[i]); + } + } + + private void addQueues(String[] queue) throws IOException { + for (String aQueue : queue) { + scheduler.allocations.addQueue(aQueue); + } + } + private void removeQueues(String[] queue) throws IOException { + for (String aQueue : queue) { + scheduler.allocations.removeQueue(aQueue); + } + } + + + public void testAllocation() throws IOException { + addQueues(QUEUES); + setSpending("queue1", 1.0f); + setSpending("queue2", 2.0f); + setBudgets(QUEUES, new float[] {100.0f, 100.0f}); + scheduler.allocations.setUsage("queue1",2,0); + scheduler.allocations.setUsage("queue2",3,0); + timer.runTask(); + assertNotNull(scheduler.allocations); + assertNotNull(scheduler.allocations.allocation); + assertNotNull(scheduler.allocations.allocation.get("queue1")); + assertNotNull(scheduler.allocations.allocation.get("queue2")); + Collection<BudgetQueue> budgetQueues = + scheduler.allocations.store.getQueues(); + assertNotNull(budgetQueues); + assertEquals(2, budgetQueues.size()); + BudgetQueue queue1Budget = null; + BudgetQueue queue2Budget = null; + for (BudgetQueue queue: budgetQueues) { + if (queue.name.equals("queue1")) { + queue1Budget = queue; + } else { + queue2Budget = queue; + } + } + assertNotNull(queue1Budget); + assertNotNull(queue2Budget); + + assertEquals(98.0f, queue1Budget.budget, 0.1f); + assertEquals(94.0f, queue2Budget.budget, 0.1f); + assertEquals(1.0f, queue1Budget.spending, 0.1f); + assertEquals(2.0f, queue2Budget.spending, 0.1f); + + Map<String,QueueAllocation> shares = scheduler.allocations.getAllocation(); + assertNotNull(shares); + assertEquals(2, shares.size()); + assertNotNull(shares.get("queue1")); + assertNotNull(shares.get("queue2")); + assertEquals(1.0f/3.0f, shares.get("queue1").getShare(), 0.1f); + assertEquals(2.0f/3.0f, shares.get("queue2").getShare(), 0.1f); + } + + public void testBudgetUpdate() throws IOException { + addQueues(QUEUES); + setSpending("queue1", 1.0f); + setSpending("queue2", 2.0f); + setBudgets(QUEUES, new float[] {100.0f, 200.0f}); + timer.runTask(); + Collection<BudgetQueue> budgetQueues = + scheduler.allocations.store.getQueues(); + BudgetQueue queue1Budget = null; + BudgetQueue queue2Budget = null; + for (BudgetQueue queue: budgetQueues) { + if (queue.name.equals("queue1")) { + queue1Budget = queue; + } else { + queue2Budget = queue; + } + } + assertNotNull(queue1Budget); + assertNotNull(queue2Budget); + assertEquals(100.0f, queue1Budget.budget, 0.1f); + assertEquals(200.0f, queue2Budget.budget, 0.1f); + setBudgets(QUEUES, new float[] {200.0f, 300.0f}); + timer.runTask(); + budgetQueues = scheduler.allocations.store.getQueues(); + for (BudgetQueue queue: budgetQueues) { + if (queue.name.equals("queue1")) { + queue1Budget = queue; + } else { + queue2Budget = queue; + } + } + assertEquals(300.0f, queue1Budget.budget, 0.1f); + assertEquals(500.0f, queue2Budget.budget, 0.1f); + removeQueues(QUEUES); + } + + public void testSpendingUpdate() throws IOException { + addQueues(QUEUES); + setSpending("queue1", 1.0f); + setSpending("queue2", 2.0f); + setBudgets(QUEUES, new float[] {100.0f, 100.0f}); + scheduler.allocations.setUsage("queue1", 1, 0); + scheduler.allocations.setUsage("queue2", 1, 0); + timer.runTask(); + Map<String,QueueAllocation> shares = + scheduler.allocations.getAllocation(); + assertEquals(1.0f/3.0f, shares.get("queue1").getShare(), 0.1f); + assertEquals(2.0f/3.0f, shares.get("queue2").getShare(), 0.1f); + setSpending("queue1", 5.0f); + setSpending("queue2", 1.0f); + timer.runTask(); + shares = scheduler.allocations.getAllocation(); + assertEquals(5.0f/6.0f, shares.get("queue1").getShare(), 0.1f); + assertEquals(1.0f/6.0f, shares.get("queue2").getShare(), 0.1f); + } +} Added: hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java?rev=783059&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java (added) +++ hadoop/core/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/TestPriorityScheduler.java Tue Jun 9 16:16:33 2009 @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +public class TestPriorityScheduler extends BaseSchedulerTest { + + private DynamicPriorityScheduler scheduler; + + @Override + protected void setUp() throws Exception { + super.setUp(); + conf.set(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_SCHEDULER, + "org.apache.hadoop.mapred.PriorityScheduler"); + scheduler = new DynamicPriorityScheduler(); + scheduler.setTimer(timer); + scheduler.setConf(conf); + scheduler.setTaskTrackerManager(taskTracker); + taskTracker.addQueues(QUEUES); + scheduler.start(); + } + + + /** + * Remove the queues + * @throws Exception + */ + @Override + protected void tearDown() throws Exception { + super.tearDown(); + removeQueues(QUEUES); + } + + private void setSpending(String queue, float spending) throws IOException { + scheduler.allocations.setSpending(queue, spending); + } + + private void setBudgets(String[] queue, float[] budget) throws IOException { + for (int i = 0; i < queue.length; i++) { + scheduler.allocations.addBudget(queue[i], budget[i]); + } + } + + private void addQueues(String[] queue) throws IOException { + for (String aQueue : queue) { + scheduler.allocations.addQueue(aQueue); + } + } + private void removeQueues(String[] queue) throws IOException { + for (String aQueue : queue) { + scheduler.allocations.removeQueue(aQueue); + } + } + + public void testQueueAllocation() throws IOException { + addQueues(QUEUES); + setSpending("queue1", 1.0f); + setSpending("queue2", 2.0f); + setBudgets(QUEUES, new float[] {100.0f, 100.0f}); + scheduler.allocations.setUsage("queue1", 2,0); + scheduler.allocations.setUsage("queue2", 3,0); + timer.runTask(); + Map<String,PriorityScheduler.QueueQuota> queueQuota = + ((PriorityScheduler)scheduler.scheduler). + getQueueQuota(100, 10, PriorityScheduler.MAP); + assertEquals(2, queueQuota.size()); + for (PriorityScheduler.QueueQuota quota: queueQuota.values()) { + if (quota.name.equals("queue1")) { + assertEquals(Math.round(100 * 1.0f/3.0f), quota.quota, 0.1f); + } else { + assertEquals(Math.round(100 * 2.0f/3.0f), quota.quota, 0.1f); + } + assertTrue(quota.mappers == quota.quota); + } + queueQuota = ((PriorityScheduler)scheduler.scheduler).getQueueQuota(100, 10, + PriorityScheduler.REDUCE); + assertEquals(2, queueQuota.size()); + for (PriorityScheduler.QueueQuota quota: queueQuota.values()) { + if (quota.name.equals("queue1")) { + assertEquals( Math.round(10 * 1.0f/3.0f), quota.quota, 0.1f); + } else { + assertEquals(Math.round(10 * 2.0f/3.0f), quota.quota, 0.1f); + } + assertTrue(quota.reducers == quota.quota); + } + } + + public void testUsage() throws IOException { + addQueues(QUEUES); + setSpending("queue1", 1.0f); + setSpending("queue2", 2.0f); + setBudgets(QUEUES, new float[] {1000.0f, 1000.0f}); + scheduler.allocations.setUsage("queue1", 0, 1); + scheduler.allocations.setUsage("queue2", 0, 1); + timer.runTask(); + Map<String,PriorityScheduler.QueueQuota> queueQuota = + ((PriorityScheduler)scheduler.scheduler).getQueueQuota(100, 10, + PriorityScheduler.MAP); + PriorityScheduler.QueueQuota quota1 = queueQuota.get("queue1"); + PriorityScheduler.QueueQuota quota2 = queueQuota.get("queue2"); + quota1.map_used = 10; + quota2.map_used = 90; + ((PriorityScheduler)scheduler.scheduler).markIdle(queueQuota); + timer.runTask(); + + Collection<BudgetQueue> budgetQueues = + scheduler.allocations.store.getQueues(); + assertNotNull(budgetQueues); + assertEquals(2, budgetQueues.size()); + BudgetQueue queue1Budget = null; + BudgetQueue queue2Budget = null; + for (BudgetQueue queue: budgetQueues) { + if (queue.name.equals("queue1")) { + queue1Budget = queue; + } else { + queue2Budget = queue; + } + } + assertNotNull(queue1Budget); + assertNotNull(queue2Budget); + assertEquals("Budget incorrect", 990.0f, queue1Budget.budget, 0.1f); + assertEquals("Budget incorrect", 866.0f, queue2Budget.budget, 0.1f); + } +}
