Author: matei
Date: Thu Jan 22 20:55:17 2009
New Revision: 736916
URL: http://svn.apache.org/viewvc?rev=736916&view=rev
Log:
HADOOP-5075. Potential infinite loop in updateMinSlots.
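This fixes two related bugs in the fair scheduler's weight handling. First, the
pool's map weight sum was stored under the reduce key (first hunk below). Second,
a pool whose jobs have no map (or reduce) tasks has a weight sum of 0, and
normalizing by it turns the jobs' weights into NaN or Infinity; such weights then
keep updateMinSlots from ever decreasing slotsLeft, so its redistribution loop
never terminates (second and third hunks). A minimal sketch of the underlying
double arithmetic, with illustrative values that are not part of the patch:

    // Sketch only: why a zero pool weight sum yields NaN weights and
    // zero shares. Values are hypothetical, not taken from the patch.
    public class WeightNaNSketch {
      public static void main(String[] args) {
        double poolWeight = 2.0;
        double mapWeightSum = 0.0; // pool whose jobs have no map tasks
        double mapWeight = 0.0;    // raw map weight of such a job

        // Pre-fix normalization: 0.0 * (2.0 / 0.0) = 0.0 * Infinity = NaN
        mapWeight *= (poolWeight / mapWeightSum);
        System.out.println(mapWeight); // prints NaN

        // In a share computation like updateMinSlots', Math.ceil of a NaN
        // expression is NaN and the (int) cast yields 0, so no slots are
        // ever handed out and slotsLeft cannot reach 0.
        int oldSlots = 4;
        int share = (int) Math.ceil(oldSlots * mapWeight / 4.0);
        System.out.println(share); // prints 0
      }
    }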
Modified:
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=736916&r1=736915&r2=736916&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Jan 22 20:55:17 2009
@@ -480,7 +480,7 @@
         }
       }
       mapWeightSums.put(pool.getName(), mapWeightSum);
-      reduceWeightSums.put(pool.getName(), mapWeightSum);
+      reduceWeightSums.put(pool.getName(), reduceWeightSum);
     }
     // And normalize the weights based on pool sums and pool weights
     // to share fairly across pools (proportional to their weights)
@@ -489,8 +489,16 @@
       JobInProgress job = entry.getKey();
       JobInfo info = entry.getValue();
       String pool = poolMgr.getPoolName(job);
       double poolWeight = poolMgr.getPoolWeight(pool);
-      info.mapWeight *= (poolWeight / mapWeightSums.get(pool));
-      info.reduceWeight *= (poolWeight / reduceWeightSums.get(pool));
+      double mapWeightSum = mapWeightSums.get(pool);
+      double reduceWeightSum = reduceWeightSums.get(pool);
+      if (mapWeightSum == 0)
+        info.mapWeight = 0;
+      else
+        info.mapWeight *= (poolWeight / mapWeightSum);
+      if (reduceWeightSum == 0)
+        info.reduceWeight = 0;
+      else
+        info.reduceWeight *= (poolWeight / reduceWeightSum);
     }
   }
@@ -555,6 +563,12 @@
           int share = (int) Math.ceil(oldSlots * weight / totalWeight);
           slotsLeft = giveMinSlots(job, type, slotsLeft, share);
         }
+        if (slotsLeft > 0) {
+          LOG.warn("Had slotsLeft = " + slotsLeft + " after the final "
+              + "loop in updateMinSlots. This probably means some fair "
+              + "scheduler weights are being set to NaN or Infinity.");
+        }
+        break;
       }
     }
   }
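The unconditional break above is what actually bounds the loop: even if NaN or
Infinity weights keep slotsLeft positive, updateMinSlots now completes its final
redistribution pass once, logs the warning, and returns instead of spinning
forever inside the JobTracker.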
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=736916&r1=736915&r2=736916&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Thu Jan 22 20:55:17 2009
@@ -1152,6 +1152,51 @@
   }
 
   /**
+   * This test submits jobs in two pools, poolA and poolB. None of the
+   * jobs in poolA have maps, but this should not affect their reduce
+   * share.
+   */
+  public void testPoolWeightsWhenNoMaps() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<weight>2.0</weight>");
+    out.println("</pool>");
+    out.println("<pool name=\"poolB\">");
+    out.println("<weight>1.0</weight>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
+    JobInfo info1 = scheduler.infos.get(job1);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+
+    assertEquals(0, info1.mapWeight, 0.01);
+    assertEquals(1.0, info1.reduceWeight, 0.01);
+    assertEquals(0, info2.mapWeight, 0.01);
+    assertEquals(1.0, info2.reduceWeight, 0.01);
+    assertEquals(1.0, info3.mapWeight, 0.01);
+    assertEquals(1.0, info3.reduceWeight, 0.01);
+
+    assertEquals(0, info1.mapFairShare, 0.01);
+    assertEquals(1.33, info1.reduceFairShare, 0.01);
+    assertEquals(0, info2.mapFairShare, 0.01);
+    assertEquals(1.33, info2.reduceFairShare, 0.01);
+    assertEquals(4, info3.mapFairShare, 0.01);
+    assertEquals(1.33, info3.reduceFairShare, 0.01);
+  }
+
+  /**
    * Tests that max-running-tasks per node are set by assigning load
    * equally across the cluster in CapBasedLoadManager.
    */
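For reference, the expected values in testPoolWeightsWhenNoMaps follow from the
pool weights, assuming the 4-map-slot / 4-reduce-slot test cluster that this
suite's setUp builds (two task trackers with two slots of each type): poolA
(weight 2.0) is entitled to 2/3 * 4 = 2.67 reduce slots, split evenly across
job1 and job2 (1.33 each), and poolB (weight 1.0) gets 1/3 * 4 = 1.33 for job3.
Since job1 and job2 have no map tasks, the fix gives them mapWeight 0 and job3
receives all 4 map slots.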