Author: challngr
Date: Mon Oct 27 17:47:37 2014
New Revision: 1634616

URL: http://svn.apache.org/r1634616
Log:
UIMA-4063 Run RM schedule on OR clock instead of RM clock.

Modified:
    uima/sandbox/uima-ducc/trunk/src/main/resources/ducc.properties
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java

Modified: uima/sandbox/uima-ducc/trunk/src/main/resources/ducc.properties
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/resources/ducc.properties?rev=1634616&r1=1634615&r2=1634616&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/resources/ducc.properties (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/resources/ducc.properties Mon Oct 27 
17:47:37 2014
@@ -328,9 +328,8 @@ ducc.rm.state.update.endpoint=ducc.rm.st
 # If enabled, RM tries to start as soon as it recoveres state from an OR 
publication,
 # instread of waiting for init.stability for nodes to check in. 
 ducc.rm.state.update.endpoint.type=topic
-# Frequency in milliseconds the RM publishes state.  Increase to 60000 for 
larger systems
-# to avoid thrashing.
-ducc.rm.state.publish.rate = 10000
+# the frequency, relative to or publicatations, at which RM runs a schedule
+ducc.rm.state.publish.ratio = 1
 # Base size of dram quantum in Gb
 ducc.rm.share.quantum = 1
 # Implementation class for actual scheduling algorithm

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java?rev=1634616&r1=1634615&r2=1634616&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
 Mon Oct 27 17:47:37 2014
@@ -57,14 +57,17 @@ public class ResourceManagerComponent 
     int nodeStability;                // number of heartbeats from agent 
metrics we are allowed to miss before purging node
     int initStability;                // number of heartbeats from agent 
metrics we must wait for during init befor starting
     int nodeMetricsUpdateRate;
+    int orPublishingRate;
     boolean schedulerReady = false;
 
     ISchedulerMain scheduler;
     JobManagerConverter converter;
 
     // These guys are used to manage my own epoch
-    int schedulingRatio = 6;
-    int schedulingEpoch = 60000;
+    int schedulingRatio = 1;
+    // int schedulingEpoch = 60000;
+
+    long lastSchedule = 0;
     DuccEventDispatcher eventDispatcher;
     String stateEndpoint;
 
@@ -184,7 +187,8 @@ public class ResourceManagerComponent 
         nodeStability         = 
SystemPropertyResolver.getIntProperty("ducc.rm.node.stability", 
DEFAULT_STABILITY_COUNT);
         nodeMetricsUpdateRate = 
SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", 
DEFAULT_NODE_METRICS_RATE);
         schedulingRatio       = 
SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", 
DEFAULT_SCHEDULING_RATIO);
-        schedulingEpoch       = 
SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.rate", 
DEFAULT_SCHEDULING_RATE);
+        orPublishingRate      = 
SystemPropertyResolver.getIntProperty("ducc.orchestrator.abbreviated.state.publish.rate",
 DEFAULT_OR_PUBLISH_RATE);
+        // schedulingEpoch       = 
SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.rate", 
DEFAULT_SCHEDULING_RATE);
         
         String adminEndpoint         = 
System.getProperty("ducc.rm.admin.endpoint");
         if ( adminEndpoint == null ) {
@@ -251,26 +255,28 @@ public class ResourceManagerComponent 
 
         while ( true ) {
 
-            try {
-                Thread.sleep(schedulingEpoch);                               
// and linger a while
-                //wait();
-            } catch (InterruptedException e) {
-               logger.info(methodName, null, "Scheduling wait interrupted, 
executing out-of-band epoch.");
-            }
+            synchronized(this) {
+                try {
+                    //Thread.sleep(schedulingEpoch);                           
    // and linger a while
+                    wait();
+                } catch (InterruptedException e) {
+                    logger.info(methodName, null, "Scheduling wait 
interrupted, executing out-of-band epoch.");
+                }
             
-            try {
-                // logger.info(methodName, null, "Publishing RM state to", 
stateEndpoint);
-                logger.info(methodName, null, "--------", ++epoch_counter, 
"------- Entering scheduling loop --------------------");
-
-                jobManagerUpdate = scheduler.schedule();          
-                if ( jobManagerUpdate != null ) {             // returns null 
while waiting for node stability
-                    RmStateDuccEvent ev = 
converter.createState(jobManagerUpdate);
-                    eventDispatcher.dispatch(stateEndpoint, ev, "");  // tell 
the world what is scheduled (note empty string)
+                try {
+                    // logger.info(methodName, null, "Publishing RM state to", 
stateEndpoint);
+                    logger.info(methodName, null, "--------", epoch_counter, 
"------- Entering scheduling loop --------------------");
+                    
+                    jobManagerUpdate = scheduler.schedule();          
+                    if ( jobManagerUpdate != null ) {             // returns 
null while waiting for node stability
+                        RmStateDuccEvent ev = 
converter.createState(jobManagerUpdate);
+                        eventDispatcher.dispatch(stateEndpoint, ev, "");  // 
tell the world what is scheduled (note empty string)
+                }
+                    
+                    logger.info(methodName, null, "--------", epoch_counter, 
"------- Scheduling loop returns  --------------------");
+                } catch (Throwable e1) {
+                    logger.fatal(methodName, null, e1);
                 }
-
-                logger.info(methodName, null, "--------", epoch_counter, 
"------- Scheduling loop returns  --------------------");
-            } catch (Throwable e1) {
-               logger.fatal(methodName, null, e1);
             }
             
         }
@@ -319,12 +325,24 @@ public class ResourceManagerComponent 
     public void onOrchestratorStateUpdate(DuccWorkMap map)
     {
         String methodName = "onJobManagerStateUpdate";
+
         try {
             logger.info(methodName, null, "-------> OR state arrives");
-            converter.eventArrives(map);
-            //if ( ((epoch_counter++) % schedulingRatio) == 0 ) {
-            //    notify();
-            //}
+            synchronized(this) {
+                // If the OR publications come too fast just ignore them.
+                // We try to set the minSchedulingRate to be something 
reasonably less than
+                // the OR rate in order to be as responsive as possible.
+                long now = System.currentTimeMillis();
+                if ( now - lastSchedule >= orPublishingRate ) {
+                    converter.eventArrives(map);
+                    if ( ((++epoch_counter) % schedulingRatio) == 0 ) {
+                        notify();
+                    }
+                    lastSchedule = now;
+                } else {
+                    logger.warn(methodName, null, "-------> OR publication 
ignored, arrived too soon (less than", orPublishingRate, "delay). Delay was", 
(now-lastSchedule));
+                }
+            }
         } catch ( Throwable e ) {
             logger.error(methodName, null, "Excepton processing Orchestrator 
event:", e);
         }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java?rev=1634616&r1=1634615&r2=1634616&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/SchedConstants.java
 Mon Oct 27 17:47:37 2014
@@ -34,9 +34,10 @@ public interface SchedConstants
     public static final String COMPONENT_NAME            = "RM";
     public static final int DEFAULT_STABILITY_COUNT      = 5;
     public static final int DEFAULT_INIT_STABILITY_COUNT = 3;
-    public static final int DEFAULT_SCHEDULING_RATIO     = 4;
+    public static final int DEFAULT_SCHEDULING_RATIO     = 1;
     public static final int DEFAULT_SCHEDULING_RATE      = 60000;
     public static final int DEFAULT_NODE_METRICS_RATE    = 60000;
+    public static final int DEFAULT_OR_PUBLISH_RATE      = 30000;
 
     public static final int DEFAULT_PROCESSES            = 10;     // for 
jobs, number of processes if not specified
     public static final int DEFAULT_INSTANCES            = 1;     // for 
reservations, number of instances if not specified


Reply via email to