Author: challngr
Date: Tue Mar 19 14:31:02 2013
New Revision: 1458311
URL: http://svn.apache.org/r1458311
Log:
UIMA-2667
Rewrite the 'takeFromTheRich' routine for defragmentation.
Also fixes a one-line bug in the counting method apportion_qshares: a loop
terminated incorrectly, causing a divisor to go negative, and hence
class/user/job counts to go negative, under high load.
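
For context, a minimal sketch of that failure mode, assuming a simplified
apportionment loop. Entity, apportion, and every name below are hypothetical
stand-ins for the scheduler's real bookkeeping; only the final guard line
corresponds to the committed change.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class ApportionSketch
    {
        // Hypothetical stand-in for the scheduler's per-class/per-user records.
        static class Entity
        {
            final int weight;
            int wanted;                               // shares still wanted
            int given = 0;                            // shares apportioned so far
            Entity(int weight, int wanted) { this.weight = weight; this.wanted = wanted; }
        }

        // Apportion 'total' shares among entities in proportion to weight.
        // A satisfied entity drops out and its weight leaves the divisor;
        // without the final guard, the outer loop can keep running after all
        // weight is gone and the counts go wrong under load.
        static void apportion(int total, List<Entity> entities)
        {
            int allweights = 0;
            for ( Entity e : entities ) allweights += e.weight;

            while ( total > 0 ) {
                for ( Iterator<Entity> it = entities.iterator(); it.hasNext(); ) {
                    Entity e = it.next();
                    int slice = Math.max(1, (total * e.weight) / allweights);
                    slice = Math.min(slice, Math.min(e.wanted, total));
                    e.given += slice;
                    e.wanted -= slice;
                    total -= slice;
                    if ( e.wanted == 0 ) {            // satisfied: out of the pool
                        it.remove();
                        allweights -= e.weight;
                    }
                }
                if ( allweights <= 0 ) break;         // the one-line fix
            }
        }

        public static void main(String[] args)
        {
            List<Entity> entities = new ArrayList<Entity>();
            entities.add(new Entity(2, 3));
            entities.add(new Entity(1, 1));
            apportion(10, entities);   // terminates: the guard fires once both are satisfied
        }
    }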
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java?rev=1458311&r1=1458310&r2=1458311&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java Tue Mar 19 14:31:02 2013
@@ -459,6 +459,7 @@ public class NodepoolScheduler
allweights -= e.getShareWeight();
}
}
+ if ( allweights <= 0 ) break; // all weight consumed; keep the divisor from going negative
}
// Remove entities that have everything they want
@@ -1529,113 +1530,137 @@ public class NodepoolScheduler
* ALL the user's jobs, we will have culled everything that makes no sense to
* take from in the caller.
*
- * @return THe number of processes recovered.
+ * @return The number of processes we found space for. Note this could be different from the number
+ * of processes evicted, if it took more than one eviction to make space. Also, we may have
+ * evicted a process smaller than is needed, because there was already some free space on the machine.
*/
int takeFromTheRich(IRmJob nj, int needed,
TreeMap<User, User> users_by_wealth,
HashMap<User, TreeMap<IRmJob, IRmJob>> jobs_by_user)
{
String methodName = "takeFromTheRich";
- int given = 0;
-
- logger.debug(methodName, nj.getId(), "needed[", needed, "]");
+ // 1. Collect all machines that have shares which, if evicted, would make enough space
+ //    - in compatible NP
+ //    - g + sum(shares belonging to rich users on the machine);
+ // 2. Order the machines by
+ // a) richest user
+ // b) largest machine
+ // 3. Pick next machine,
+ // - clear enough shares
+ // - remove machine from list
+ // - update wealth
+ // 4. Repeat at 2 until
+ // a) have given what is needed
+ // b) nothing left to give
+
+ // Map<Share, Share> exemptShares = new HashMap<Share, Share>(); // not eligible for various reasons
+ Map<IRmJob, IRmJob> candidateJobs = new HashMap<IRmJob, IRmJob>();
+ Map<Machine, Machine> eligibleMachines = new TreeMap<Machine, Machine>(new EligibleMachineSorter());
- TreeMap<IRmJob, IRmJob> job_set = new TreeMap<IRmJob, IRmJob>(new JobByShareSorter()); // the collection of rich users jobs to take from
- Map<Share, Share> shares = new TreeMap<Share, Share>(new FinalEvictionSorter());
- Map<Share, Share> removed = new HashMap<Share, Share>();
-
- List<User> allUsers = new ArrayList<User>(); // for debug
-
- for ( User next_user : users_by_wealth.keySet() ) {
- job_set.putAll(jobs_by_user.get(next_user)); // on each iter, the set of jobs with candidates grows
-
- allUsers.add(next_user);
- logger.debug(methodName, nj.getId(), "Donating users:", allUsers);
- logger.debug(methodName, nj.getId(), "Donating jobs:",
listJobSet(job_set));
-
- Map<IRmJob, IRmJob> donorJobs = new HashMap<IRmJob, IRmJob>();
- List<Share> donorShares = new ArrayList<Share>();
- boolean shares_found = false;
- do {
- shares_found = false;
- IRmJob rich_j = job_set.firstKey(); // de rrrichest kind
-
- logger.debug(methodName, nj.getId(), "Inspecting job",
rich_j.getId());
- //
- // First lets see if something is pending and we can just reassign it. Nobody knows
- // about this share but RM yet, so it's safe to do this.
- //
- shares.putAll(rich_j.getPendingShares()); // each new job makes the candidate set richer
- shares.putAll(rich_j.getAssignedShares());
- removed.putAll(rich_j.getPendingRemoves());
-
- for ( Share s : shares.keySet() ) {
+ for ( TreeMap<IRmJob, IRmJob> jobs : jobs_by_user.values() ) {
+ candidateJobs.putAll(jobs);
+ }
- if ( removed.containsKey(s) ) continue; // already gone, ignore it
-
- if ( ! compatibleNodepools(s, nj) ) {
- logger.trace(methodName, nj.getId(), "Bypassing
pending share", s.toString(), "becaose of incompatible nodepool");
- continue;
- }
- logger.trace(methodName, nj.getId(), "Pending share",
s.toString(), "is compatible with class", nj.getResourceClass().getName());
+ int given = 0;
+ int orderNeeded = nj.getShareOrder();
+
+ ResourceClass cl = nj.getResourceClass();
+ String npname = cl.getNodepoolName();
+ NodePool np = globalNodepool.getSubpool(npname);
+ Map<Node, Machine> machines = np.getAllMachines(); // everything here is a candidate, nothing else is
+
+ for ( Machine m : machines.values() ) {
+ if ( m.getShareOrder() < orderNeeded ) {
+ // logger.debug(methodName, nj.getId(), "Bypass ", m.getId(), ": too small for request of order", orderNeeded);
+ continue;
+ }
+ Map<Share, Share> as = m.getActiveShares();
+ int g = m.getVirtualShareOrder();
+ for ( Share s : as.values() ) {
+ IRmJob j = s.getJob();
+ if ( s.isForceable() && candidateJobs.containsKey(j) ) {
+ g += j.getShareOrder();
+ }
+ }
+ if ( g >= orderNeeded ) {
+ logger.trace(methodName, nj.getId(), "Candidate machine:",
m.getId());
+ eligibleMachines.put(m, m);
+ } else {
+ // (a) the share is not forceable (non-preemptable, or already being removed), or
+ // (b) the share is not owned by a rich job
+ logger.trace(methodName, nj.getId(), "Not a candidate, insufficient rich jobs:", m.getId());
+ }
+ }
+ logger.debug(methodName, nj.getId(), "Found", eligibleMachines.size(),
"machines to be searched in this order:");
+ for ( Machine m : eligibleMachines.keySet() ) {
+ logger.debug(methodName, nj.getId(), "Eligible machine:",
m.getId());
+ }
+ // first part done
+
+ // Now just bop through the machines until either we can't find anything, or we find everything.
+ int given_per_round = 0;
+ do {
+ int g = 0;
+ for ( Machine m : eligibleMachines.keySet() ) {
+ HashMap<Share, Share> sh = m.getActiveShares();
+ g = m.getVirtualShareOrder();
+ List<Share> potentialShares = new ArrayList<Share>();
+ for ( Share s : sh.values() ) {
+ IRmJob j = s.getJob();
+ User u = j.getUser();
- Machine m = s.getMachine();
- // clear everything from this machine from jobs in the current job set
- if ( m.getShareOrder() < nj.getShareOrder() ) {
- logger.debug(methodName, nj.getId(), "Bypassing
pending share", s.toString(), "because machine size[", m.getShareOrder(), "<
job order[", nj.getShareOrder());
- continue; // job will never fit here
- }
- Map<Share, Share> activeShares = m.getActiveShares(); // shares on this machine
- List<Share> candidates = new ArrayList<Share>();
- int total_available = m.getVirtualShareOrder(); // start with free space
-
- logger.debug(methodName, nj.getId(), "Machine", m.getId(),
"activeShares", activeShares.keySet(), "total_available", total_available);
- for ( Share as : activeShares.values() ) {
- IRmJob tentative = as.getJob();
- if ( job_set.containsKey(tentative) ) { // share belong to a rich j?
- donorJobs.put(tentative, tentative);
- candidates.add(as); // yes it's a candidate
- total_available += as.getShareOrder(); // add in shares that might work
- //logger.debug(methodName, nj.getId(), "share", as.getId(), "in job", tentative.getId(), "is a candidate");
- } else {
- //logger.debug(methodName, nj.getId(), "share", as.getId(), "in job", tentative.getId(), "is not a candidate");
- }
- }
-
- int nj_shares = total_available / nj.getShareOrder(); // figure how many nj-sized shares we can get
- nj_shares = Math.min(needed, nj_shares); // cap on actual needed
- logger.debug(methodName, nj.getId(), "Machine", m.getId(), "total_available", total_available, "nj_shares", nj_shares);
- if ( nj_shares > 0 ) { // if it works, pull candidates until we have enough
- int g = m.getVirtualShareOrder();
- for ( Share cs : candidates ) {
- if ( clearShare(cs, nj) ) removed.put(cs, cs); // return TRUE if evicted, and FALSE if reassigned to nj
- User u = rich_j.getUser();
- u.subtractWealth(s.getShareOrder());
- donorShares.add(cs);
- g += cs.getShareOrder();
- shares_found = true;
- if ( (g / nj.getShareOrder() ) >= nj_shares ) {
- break;
+ if ( s.isForceable() ) {
+ TreeMap<IRmJob, IRmJob> potentialJobs = jobs_by_user.get(u);
+ if ( (potentialJobs != null) && ( potentialJobs.containsKey(j) ) ) {
+ g += s.getShareOrder();
+ if ( s.getShareOrder() == orderNeeded ) {
+ potentialShares.add(0, s); // exact matches first
+ } else {
+ potentialShares.add(s);
}
}
}
- given += nj_shares;
- needed -= nj_shares;
- if ( needed == 0 ) return given;
+ if ( g >= orderNeeded ) break;
}
+
+ if ( g >= orderNeeded ) {
+ // found enough on this machine for 1 share!
+ logger.debug(methodName, nj.getId(), "Clearing shares:
g[", g, "], orderNeeded[", orderNeeded, "]");
+ g = m.getVirtualShareOrder(); // reset
+ for ( Share s : potentialShares ) {
+ IRmJob j = s.getJob();
+ User u = j.getUser();
+
+ g += s.getShareOrder();
+ given_per_round++;
+ clearShare(s, nj);
+ u.subtractWealth(s.getShareOrder());
+ logger.debug(methodName, nj.getId(), "Clearing share",
s, "order[", s.getShareOrder(),
+ "]: g[", g, "], orderNeeded[",
orderNeeded, "]");
+ if ( g >= orderNeeded) break; // inner loop, could
break on exact match without giving everything away
+ }
+ break; // outer loop, if anything was found
+ }
+ }
- // TODO: This can't happen inside the loop
- for ( IRmJob j : donorJobs.values() ) { // rebalance tree on job wealth after possible removal
- job_set.remove(j);
- job_set.put(j, j);
+ if ( given_per_round > 0 ) {
+ // Must reorder the eligible list to get the "next" best candidate. We could try to remove
+ // machines that were exhausted above ...
+ Map<Machine, Machine> tmp = new HashMap<Machine, Machine>();
+ tmp.putAll(eligibleMachines);
+ eligibleMachines.clear();
+ for ( Machine m : tmp.keySet() ) {
+ eligibleMachines.put(m, m);
}
- for ( Share s : donorShares ) { // given away, and removed from consideration
- shares.remove(s);
- }
- } while (shares_found && ( given < needed) );
- }
+ // and also must track how many processes we made space for
+ given = given + (g / orderNeeded); // at least one, or else we have a bug
+ logger.debug(methodName, nj.getId(), "LOOPEND: given[", given,
"] g[", g, "] orderNeeded[", orderNeeded, "]");
+ }
+ } while ( (given_per_round > 0) && ( given < needed ));
+
return given;
}
@@ -2281,31 +2306,70 @@ public class NodepoolScheduler
// This is a sorter for a tree map so we have to be sure not to return equality unless the objects
// are the same objects.
//
- static private class FinalEvictionSorter
- implements Comparator<Share>
+// static private class FinalEvictionSorter
+// implements Comparator<Share>
+// {
+//
+// public int compare(Share s1, Share s2)
+// {
+// if ( s1 == s2 ) return 0;
+//
+// // pending shares first, no point expanding them if we don't have to
+// if ( s1.isPending() && s2.isPending() ) return -1;
+// if ( s1.isPending() ) return -1;
+// if (s2.isPending() ) return 1;
+//
+// // Shares on machines with more space first, deal with defrag, which is why we're here
+// int vso1 = s1.getMachine().countFreedUpShares();
+// int vso2 = s2.getMachine().countFreedUpShares();
+//
+// if ( vso1 != vso2 ) {
+// return vso2 - vso1; // (more space first)
+// }
+//
+// // All else being equal, use investment
+// int inv = (int) (s1.getInvestment() - s2.getInvestment());
+// if ( inv == 0 ) return -1; // careful not to return 0
+// return inv;
+// }
+// }
+
+ //
+ // Sort machines for defrag.
+ // a) machines with richest users first
+ // b) largest machine second
+ //
+ static private class EligibleMachineSorter
+ implements Comparator<Machine>
{
- public int compare(Share s1, Share s2)
+ public int compare(Machine m1, Machine m2)
{
- if ( s1 == s2 ) return 0;
+ if ( m1 == m2 ) return 0;
- // pending shares first, no point expanding them if we don't have to
- if ( s1.isPending() && s2.isPending() ) return -1;
- if ( s1.isPending() ) return -1;
- if (s2.isPending() ) return 1;
+ int m1wealth = 0;
+ int m2wealth = 0;
+ Map<Share, Share> sh1 = m1.getActiveShares();
+ for ( Share s : sh1.values() ) {
+ IRmJob j = s.getJob();
+ User u = j.getUser();
+ m1wealth = Math.max(m1wealth, u.getShareWealth());
+ }
+
+ Map<Share, Share> sh2 = m2.getActiveShares();
+ for ( Share s : sh2.values() ) {
+ IRmJob j = s.getJob();
+ User u = j.getUser();
+ m2wealth = Math.max(m2wealth, u.getShareWealth());
+ }
- // Shares on machines with more space first, deal with defrag, which is why we're here
- int vso1 = s1.getMachine().countFreedUpShares();
- int vso2 = s2.getMachine().countFreedUpShares();
+ if ( m1wealth != m2wealth ) return m2wealth - m1wealth; // richest user first
- if ( vso1 != vso2 ) {
- return vso2 - vso1; // (more space first)
- }
+ long m1mem = m1.getMemory();
+ long m2mem = m2.getMemory();
- // All else being equal, use investment
- int inv = (int) (s1.getInvestment() - s2.getInvestment());
- if ( inv == 0 ) return -1; // careful not to return 0
- return inv;
+ if ( m1mem == m2mem ) return -1; // for tree map, must not return 0 unless same object
+ return (int) (m2mem - m1mem); // largest machine first.
}
}
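
The "careful not to return 0" and "must not return 0" comments above reflect a
standard TreeMap pitfall: TreeMap treats compare() == 0 as key equality, so a
comparator that can return 0 for two distinct machines would silently collapse
them into a single map entry. A self-contained illustration, with a
hypothetical Item type standing in for Machine (not part of this commit):

    import java.util.TreeMap;

    public class TreeMapPitfall
    {
        static class Item
        {
            final String id;
            final int size;
            Item(String id, int size) { this.id = id; this.size = size; }
        }

        public static void main(String[] args)
        {
            Item m1 = new Item("m1", 48);
            Item m2 = new Item("m2", 48);

            // Ordering by size alone: distinct items of equal size compare as
            // 0, so the second put() overwrites the first entry.
            TreeMap<Item, Item> lossy = new TreeMap<Item, Item>((a, b) -> b.size - a.size);
            lossy.put(m1, m1);
            lossy.put(m2, m2);
            System.out.println(lossy.size());         // prints 1: the two items collapsed

            // Breaking ties with -1 for distinct objects, as
            // EligibleMachineSorter does, keeps both entries. The cost is an
            // inconsistent ordering, so key lookups become unreliable; the
            // scheduler only iterates the map, which is safe.
            TreeMap<Item, Item> keepsAll = new TreeMap<Item, Item>((a, b) -> {
                if ( a == b ) return 0;
                int bySize = b.size - a.size;
                return (bySize != 0) ? bySize : -1;
            });
            keepsAll.put(m1, m1);
            keepsAll.put(m2, m2);
            System.out.println(keepsAll.size());      // prints 2
        }
    }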
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1458311&r1=1458310&r2=1458311&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Tue Mar 19 14:31:02 2013
@@ -303,16 +303,20 @@ public class Scheduler
* We only get one such name, so we give up the search if we find
* it.
*/
+ static String cached_domain = null;
private String getDomainName()
{
String methodName = "getDomainName";
+
+ if ( cached_domain != null ) return cached_domain;
try {
NodeIdentity ni = new NodeIdentity();
for ( IIdentity id : ni.getNodeIdentities()) {
String n = id.getName();
int ndx = n.indexOf(".");
if ( ndx > 0 ) {
- return n.substring(ndx + 1);
+ cached_domain = n.substring(ndx + 1);
+ return cached_domain;
}
}
} catch (Exception e) {
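
The Scheduler change above is plain memoization: the host's DNS domain cannot
change for the life of the process, so it is resolved once and kept in a
static field. A standalone sketch of the same pattern; resolveHostName() is a
hypothetical stand-in for the NodeIdentity scan, and the synchronized keyword
is an addition the original does not use:

    public class DomainCache
    {
        private static String cachedDomain = null;

        // Return the host's DNS domain, resolving it only on the first
        // successful call. On failure nothing is cached, so later calls
        // retry; this matches the original, which caches only when a
        // domain is actually found.
        static synchronized String getDomainName()
        {
            if ( cachedDomain != null ) return cachedDomain;

            String n = resolveHostName();
            if ( n != null ) {
                int ndx = n.indexOf('.');
                if ( ndx > 0 ) cachedDomain = n.substring(ndx + 1);
            }
            return cachedDomain;
        }

        // Hypothetical stand-in for the NodeIdentity scan in Scheduler.java.
        private static String resolveHostName()
        {
            try {
                return java.net.InetAddress.getLocalHost().getCanonicalHostName();
            } catch ( Exception e ) {
                return null;
            }
        }
    }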