Author: bfoster
Date: Thu Feb 17 18:53:36 2011
New Revision: 1071749

URL: http://svn.apache.org/viewvc?rev=1071749&view=rev
Log:

- update to LocalEngineRunner to allow a cache (with configurable size)

-----------------------------------

Modified:
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java?rev=1071749&r1=1071748&r2=1071749&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java Thu Feb 17 18:53:36 2011
@@ -17,6 +17,10 @@
 package org.apache.oodt.cas.workflow.engine.runner;
 
 //OODT imports
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.oodt.cas.workflow.instance.TaskInstance;
 
 /**
@@ -30,15 +34,43 @@ import org.apache.oodt.cas.workflow.inst
  */
 public class LocalEngineRunner extends EngineRunner {
        
+       private static final Logger LOG = Logger.getLogger(LocalEngineRunner.class.getName());
+       
+       private int cacheSize;
        private int numOfSlots;
        private int usedSlots = 0;
+       private List<TaskInstance> cache;
        
-       public LocalEngineRunner(int numOfSlots) {
+       public LocalEngineRunner(int numOfSlots, int cacheSize) {
                this.numOfSlots = numOfSlots;
+               this.cacheSize = cacheSize;
+               if (this.cacheSize > 0) {
+                       new Thread(new Runnable() {
+                               public void run() {
+                                       while (true) {
+                                               try {
+                                                       if (LocalEngineRunner.this.numOfSlots > LocalEngineRunner.this.usedSlots && LocalEngineRunner.this.cache.size() > 0)
+                                                               LocalEngineRunner.this.execute(LocalEngineRunner.this.cache.remove(0));
+                                               }catch (Exception e) {
+                                                       LOG.log(Level.SEVERE, "Failed to submit job from cache : " + e.getMessage(), e);
+                                               }
+                                               try {
+                                                       synchronized(this) {
+                                                               this.wait(2000);
+                                                       }
+                                               }catch (Exception e) {
+                                                       LOG.log(Level.WARNING, "Local Runner cache submitter thread wait terminated : " + e.getMessage(), e);
+                                               }
+                                       }
+                               }
+                       }).start();
+               }
        }
        
        public void execute(final TaskInstance workflowInstance) throws Exception {
                incrSlots();
+               if (this.cache.size() > 0)
+                       cache.add(workflowInstance);
                new Thread(new Runnable() {
                        public void run() {
                                try {
@@ -52,7 +84,7 @@ public class LocalEngineRunner extends E
 
        @Override
        public synchronized int getOpenSlots(TaskInstance workflowInstance) throws Exception {
-               return numOfSlots - usedSlots;
+               return (numOfSlots - usedSlots) + this.cacheSize;
        }
 
        @Override

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java?rev=1071749&r1=1071748&r2=1071749&view=diff
==============================================================================
--- oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java (original)
+++ oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java Thu Feb 17 18:53:36 2011
@@ -27,14 +27,19 @@ package org.apache.oodt.cas.workflow.eng
  */
 public class LocalEngineRunnerFactory implements EngineRunnerFactory {
 
+       private int cacheSize = 0;
        private int numOfSlots = 6;
        
        public LocalEngineRunner createRunner() {
-               return new LocalEngineRunner(this.numOfSlots);
+               return new LocalEngineRunner(this.numOfSlots, this.cacheSize);
        }
        
        public void setNumOfSlots(int numOfSlots) {
                this.numOfSlots = numOfSlots;
        }
+       
+       public void setCacheSize(int cacheSize) {
+               this.cacheSize = cacheSize;
+       }
 
 }


Reply via email to