Author: challngr
Date: Sat Jul 13 13:28:39 2013
New Revision: 1502786
URL: http://svn.apache.org/r1502786
Log:
UIMA-3076 Insure map differences are retained in RM state updates.
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL:
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1502786&r1=1502785&r2=1502786&view=diff
==============================================================================
---
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
(original)
+++
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
Sat Jul 13 13:28:39 2013
@@ -723,10 +723,65 @@ public class JobManagerConverter
}
/**
+ * This is an ugly kludge because we discovered OR isn't doing map diffs!
So in the case
+ * of lost messagees, OR may not be able to discover that jobs actually
have shares assigned.
+ *
+ * Here we look into the OR map, dig out the "work", and if the indicated
share is not
+ * there, forcibly add it to the expanded shares list.
+ */
+ Map<Share, Share> sanityCheckForOrchestrator(IRmJob j, Map<Share, Share>
shares, Map<Share, Share>expanded)
+ {
+ String methodName = "sanityCheckForOrchestrator";
+ IDuccWork w = localMap.findDuccWork(j.getId());
+
+ Map<Share, Share> ret = new HashMap<Share, Share>();
+ if ( shares == null ) return null;
// no shares for whatever reason, we couldn't care less ...
+
+ switch ( w.getDuccType() ) {
+ case Job:
+ case Service:
+ {
+ IDuccWorkExecutable de = (IDuccWorkExecutable) w;
+ IDuccProcessMap pm = de.getProcessMap();
+
+ for ( Share s : shares.values() ) {
+ IDuccProcess p = pm.get(s.getId());
+ if ( p == null ) {
+ if ( (expanded == null) ||
(!expanded.containsKey(s)) ) {
+ logger.warn(methodName, j.getId(), "Redrive
share assignment: ", s);
+ ret.put(s, s);
+ }
+ }
+ }
+ }
+ break;
+
+ case Reservation:
+ {
+ IDuccWorkReservation de = (IDuccWorkReservation) w;
+ IDuccReservationMap rm = de.getReservationMap();
+
+ for ( Share s : shares.values() ) {
+ IDuccReservation r = rm.get(s.getId());
+ if ( r == null ) {
+ if ( (expanded == null) ||
(!expanded.containsKey(s)) ) {
+ logger.warn(methodName, j.getId(), "Redrive
share assignment:", s);
+ ret.put(s, s);
+ }
+ }
+ }
+ }
+ break;
+ }
+ return ret;
+ }
+
+ /**
* If no state has changed, we just resend that last one.
*/
Map<DuccId, IRmJobState> previousJobState = new HashMap<DuccId,
IRmJobState>();
+
/**
* Here's where we make a IRmStateEvent from the JobManagerUpdate so the
caller can publish it.
*/
@@ -784,15 +839,16 @@ public class JobManagerConverter
Map<DuccId, IResource> all_shares = new HashMap<DuccId,
IResource>();
Map<DuccId, IResource> shrunken_shares = new HashMap<DuccId,
IResource>();
Map<DuccId, IResource> expanded_shares = new HashMap<DuccId,
IResource>();
- Map<Share, Share> shares;
-
+ Map<Share, Share> shares = null;
+ Map<Share, Share> redrive = null;
+
shares = j.getAssignedShares();
if ( shares != null ) {
for ( Share s : shares.values() ) {
Resource r = new Resource(s.getId(), s.getNode(),
s.isPurged(), s.getShareOrder());
all_shares.put(s.getId(), r);
- //logger.debug(methodName, j.getId(), "Assigned:",
s.toString());
}
+ redrive = sanityCheckForOrchestrator(j, shares,
expanded.get(j.getId()));
}
shares = shrunken.get(j.getId());
@@ -800,7 +856,6 @@ public class JobManagerConverter
for ( Share s : shares.values() ) {
Resource r = new Resource(s.getId(), s.getNode(),
s.isPurged(), s.getShareOrder());
shrunken_shares.put(s.getId(), r);
- //logger.debug(methodName, j.getId(), "Shrunken:",
s.toString());
}
}
@@ -809,10 +864,16 @@ public class JobManagerConverter
for ( Share s : shares.values() ) {
Resource r = new Resource(s.getId(), s.getNode(),
s.isPurged(), s.getShareOrder());
expanded_shares.put(s.getId(), r);
- //logger.debug(methodName, j.getId(), "Expanded:",
s.toString());
}
}
-
+
+ if ( redrive != null ) {
+ for ( Share s : redrive.values() ) {
+ Resource r = new Resource(s.getId(), s.getNode(),
s.isPurged(), s.getShareOrder());
+ expanded_shares.put(s.getId(), r);
+ }
+ }
+
RmJobState rjs = new RmJobState(j.getId(), all_shares,
shrunken_shares, expanded_shares);
rjs.setDuccType(j.getDuccType());
rmJobState.put(j.getId(), rjs);