Author: challngr
Date: Sat Jul 13 13:28:39 2013
New Revision: 1502786

URL: http://svn.apache.org/r1502786
Log:
UIMA-3076 Insure map differences are retained in RM state updates.

Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1502786&r1=1502785&r2=1502786&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
 Sat Jul 13 13:28:39 2013
@@ -723,10 +723,65 @@ public class JobManagerConverter
     }
 
     /**
+     * This is an ugly kludge because we discovered OR isn't doing map diffs!  So in the case
+     * of lost messages, OR may not be able to discover that jobs actually have shares assigned.
+     *
+     * Here we look into the OR map, dig out the "work", and if the indicated share is not
+     * there, forcibly add it to the expanded shares list.
+     *
+     * @return the assigned shares the "work" has no entry for and that are not already in
+     *         expanded (may be empty); null if shares is null or no "work" is found for the job.
+     */
+    Map<Share, Share> sanityCheckForOrchestrator(IRmJob j, Map<Share, Share> shares, Map<Share, Share> expanded)
+    {
+        String methodName = "sanityCheckForOrchestrator";
+        IDuccWork w = localMap.findDuccWork(j.getId());
+        if ( w == null ) return null;                                         // no work found for this job, nothing to compare against (and w.getDuccType() would NPE)
+        if ( shares == null ) return null;                                    // no shares for whatever reason, we couldn't care less ...
+        Map<Share, Share> ret = new HashMap<Share, Share>();                  // allocated only once we know there is something to check
+        switch ( w.getDuccType() ) {
+            case Job:
+            case Service:
+                {
+                    IDuccWorkExecutable de = (IDuccWorkExecutable) w;
+                    IDuccProcessMap pm = de.getProcessMap();
+                    for ( Share s : shares.values() ) {
+                        IDuccProcess p = pm.get(s.getId());
+                        if ( p == null ) {                                    // the work has no process recorded for this share
+                            if ( (expanded == null) || (!expanded.containsKey(s)) ) {   // skip shares already being sent as expansions
+                                logger.warn(methodName, j.getId(), "Redrive share assignment: ", s);
+                                ret.put(s, s);
+                            }
+                        }
+                    }
+                }
+                break;
+
+            case Reservation:
+                {
+                    IDuccWorkReservation de = (IDuccWorkReservation) w;
+                    IDuccReservationMap  rm = de.getReservationMap();
+                    for ( Share s : shares.values() ) {
+                        IDuccReservation r = rm.get(s.getId());
+                        if ( r == null ) {                                    // the work has no reservation recorded for this share
+                            if ( (expanded == null) || (!expanded.containsKey(s)) ) {   // skip shares already being sent as expansions
+                                logger.warn(methodName, j.getId(), "Redrive share assignment:", s);
+                                ret.put(s, s);
+                            }
+                        }
+                    }
+                }
+                break;
+        }
+        return ret;
+    }
+
+    /**
      * If no state has changed, we just resend that last one.
      */
     Map<DuccId, IRmJobState> previousJobState = new HashMap<DuccId, 
IRmJobState>();
 
+
     /**
      * Here's where we make a IRmStateEvent from the JobManagerUpdate so the 
caller can publish it.
      */    
@@ -784,15 +839,16 @@ public class JobManagerConverter
                 Map<DuccId, IResource> all_shares      = new HashMap<DuccId, 
IResource>();
                 Map<DuccId, IResource> shrunken_shares = new HashMap<DuccId, 
IResource>();
                 Map<DuccId, IResource> expanded_shares = new HashMap<DuccId, 
IResource>();
-                Map<Share, Share> shares;
-
+                Map<Share, Share> shares = null;
+                Map<Share, Share> redrive = null;
+                
                 shares = j.getAssignedShares();
                 if ( shares != null ) {
                     for ( Share s : shares.values() ) {
                         Resource r = new Resource(s.getId(), s.getNode(), 
s.isPurged(), s.getShareOrder());
                         all_shares.put(s.getId(), r);
-                        //logger.debug(methodName, j.getId(), "Assigned:", 
s.toString());
                     }
+                    redrive = sanityCheckForOrchestrator(j, shares, 
expanded.get(j.getId()));
                 }
 
                 shares = shrunken.get(j.getId());
@@ -800,7 +856,6 @@ public class JobManagerConverter
                     for ( Share s : shares.values() ) {
                         Resource r = new Resource(s.getId(), s.getNode(), 
s.isPurged(), s.getShareOrder());
                         shrunken_shares.put(s.getId(), r);
-                        //logger.debug(methodName, j.getId(), "Shrunken:", 
s.toString());
                     }
                 }
 
@@ -809,10 +864,16 @@ public class JobManagerConverter
                     for ( Share s : shares.values() ) {
                         Resource r = new Resource(s.getId(), s.getNode(), 
s.isPurged(), s.getShareOrder());
                         expanded_shares.put(s.getId(), r);
-                        //logger.debug(methodName, j.getId(), "Expanded:", 
s.toString());
                     }
                 }
-                
+
+                if ( redrive != null ) {
+                    for ( Share s : redrive.values() ) {
+                        Resource r = new Resource(s.getId(), s.getNode(), 
s.isPurged(), s.getShareOrder());
+                        expanded_shares.put(s.getId(), r);
+                    }
+                }
+
                 RmJobState rjs = new RmJobState(j.getId(), all_shares, 
shrunken_shares, expanded_shares);
                 rjs.setDuccType(j.getDuccType());
                 rmJobState.put(j.getId(), rjs);


Reply via email to