Author: michiel
Date: 2009-11-12 18:18:02 +0100 (Thu, 12 Nov 2009)
New Revision: 39676

Modified:
   
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
   
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilRecognizedFunction.java
   
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilTranscodingFunction.java
   
mmbase/trunk/applications/streams/src/main/resources/org/mmbase/streams/resources/createcaches.xsd
   
mmbase/trunk/applications/streams/src/test/java/org/mmbase/streams/CreateCachesTest.java
   
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/crazycreatecaches.xml
   
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_0.xml
   
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_1.xml
   
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_2.xml
   
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_3.xml
   
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_4.xml
   
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_5.xml
Log:
  MMB-1873

Modified: 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
===================================================================
--- 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
       2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/CreateCachesProcessor.java
       2009-11-12 17:18:02 UTC (rev 39676)
@@ -87,20 +87,36 @@
     }
 
     private static int transSeq = 0;
-    public final ThreadPoolExecutor transcoderExecutor = new 
ThreadPoolExecutor(3, 3, 5 * 60 , TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+
+    public final Map<Stage, ThreadPoolExecutor> threadPools = new 
EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
+    private final List<CommandExecutor.Method> executors = new 
CopyOnWriteArrayList<CommandExecutor.Method>();
+    {
+        threadPools.put(Stage.TRANSCODER, new ThreadPoolExecutor(3, 3, 5 * 60 
, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+                public Thread newThread(Runnable r) {
+                    return ThreadPools.newThread(r, "TranscoderThread-" + 
Stage.TRANSCODER + "-" + (transSeq++));
+                }
+            }));
+        threadPools.put(Stage.RECOGNIZER, new ThreadPoolExecutor(3, 3, 5 * 60 
, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable r) {
-                return ThreadPools.newThread(r, "TranscoderThread-" + 
(transSeq++));
+                return ThreadPools.newThread(r, "TranscoderThread-" + 
Stage.RECOGNIZER + "-" + (transSeq++));
             }
-        });
-    {
-        ThreadPools.getThreadPools().put(CreateCachesProcessor.class.getName() 
+ ".transcoders", transcoderExecutor);
-    }
+            }));
+        // fill the complete map, so we don't have to think about it any more 
later on.
+        for (Stage s : Stage.values()) {
+            if (s.ordinal() < Stage.TRANSCODER.ordinal()) {
+                threadPools.put(s, threadPools.get(Stage.TRANSCODER));
+            }
+            if (s.ordinal() > Stage.RECOGNIZER.ordinal()) {
+                threadPools.put(s, threadPools.get(Stage.RECOGNIZER));
+            }
+        }
+        for (Map.Entry<Stage, ThreadPoolExecutor> e : threadPools.entrySet()) {
+            
ThreadPools.getThreadPools().put(CreateCachesProcessor.class.getName() + "." + 
e.getKey().toString(), e.getValue());
+        }
 
-    private final List<CommandExecutor.Method> executors = new 
CopyOnWriteArrayList<CommandExecutor.Method>();
-    {
-        executors.add(new CommandExecutor.Method());
-        executors.add(new CommandExecutor.Method());
-        executors.add(new CommandExecutor.Method());
+        for (int i = 0; i < 6; i++) {
+            executors.add(new CommandExecutor.Method());
+        }
     }
 
     private String configFile = "streams/createcaches.xml";
@@ -112,7 +128,7 @@
                     Map<String, JobDefinition> newList = new 
LinkedHashMap<String, JobDefinition>();
                     List<CommandExecutor.Method> newExecutors = new 
ArrayList<CommandExecutor.Method>();
                     Document document = 
getResourceLoader().getDocument(resource);
-                    int totalTranscoders = 0;
+                    Map<Stage, Integer> totals = new EnumMap<Stage, 
Integer>(Stage.class);
 
                     if (document != null) {
                         org.w3c.dom.NodeList ellist = 
document.getDocumentElement().getChildNodes();
@@ -159,26 +175,36 @@
                                         LOG.warn("" + newList + " already 
contains an entry with id " + id);
                                     }
                                     newList.put(id, def);
-                                
+
                                 } else if 
(el.getTagName().equals("localhost")) {
                                     int max = 
Integer.parseInt(el.getAttribute("max_simultaneous_transcoders"));
-                                    totalTranscoders += max;
+                                    Stage s = 
Stage.valueOf(el.getAttribute("stage").toUpperCase());
+                                    Integer t = totals.get(s);
+                                    if (t == null) t = 0;
+                                    t += max;
+                                    totals.put(s, t);
                                     for (int j = 1; j <= max; j++) {
                                         newExecutors.add(new 
CommandExecutor.Method());
                                     }
                                 } else if (el.getTagName().equals("server")) {
                                     int max = 
Integer.parseInt(el.getAttribute("max_simultaneous_transcoders"));
-                                    totalTranscoders += max;
-                                        String host = el.getAttribute("host");
-                                        int    port = 
Integer.parseInt(el.getAttribute("port"));
-                                        for (int j = 1; j <= max; j++) {
-                                            newExecutors.add(new 
CommandExecutor.Method(host, port));
-                                        }
+                                    Stage s = 
Stage.valueOf(el.getAttribute("stage").toUpperCase());
+                                    Integer t = totals.get(s);
+                                    if (t == null) t = 0;
+                                    t += max;
+                                    totals.put(s, t);
+                                    String host = el.getAttribute("host");
+                                    int    port = 
Integer.parseInt(el.getAttribute("port"));
+                                    for (int j = 1; j <= max; j++) {
+                                        newExecutors.add(new 
CommandExecutor.Method(host, port));
+                                    }
                                 }
                             }
                         }
-                        transcoderExecutor.setCorePoolSize(totalTranscoders);
-                        
transcoderExecutor.setMaximumPoolSize(totalTranscoders);
+                        for (Map.Entry<Stage, Integer> e : totals.entrySet()) {
+                            
threadPools.get(e.getKey()).setCorePoolSize(e.getValue());
+                            
threadPools.get(e.getKey()).setMaximumPoolSize(e.getValue());
+                        }
                     } else {
                         LOG.warn("No " + resource);
                     }
@@ -188,7 +214,7 @@
                         executors.clear();
                         executors.addAll(newExecutors);
                     }
-                    LOG.service("Reading of configuration file " + resource + 
" successfull. Transcoders now " + list + ". Executors " + executors + ". Max 
simultaneous transcoders: " + totalTranscoders);
+                    LOG.service("Reading of configuration file " + resource + 
" successfull. Transcoders now " + list + ". Executors " + executors + ". Max 
simultaneous transcoders: " + totals);
                 } catch (Exception e)  {
                     LOG.error(e.getClass() + " " + e.getMessage() + " In " + 
resource + " Transcoders now " + list + " (not changed)", e);
                 }
@@ -282,7 +308,7 @@
             final Job thisJob = new Job(ntCloud, logger);
             runningJobs.put(node, thisJob);
 
-            thisJob.setFuture(transcoderExecutor.submit(new 
JobCallable(thisJob, ntCloud, logger, node)));
+            thisJob.submit(ntCloud, node, logger);
 
             return thisJob;
         }
@@ -291,7 +317,7 @@
 
 
     /**
-     * Creates caches nodes when not existing by creating a transcoding Job 
+     * Creates caches nodes when not existing by creating a transcoding Job
      * @param ntCloud   a non transactional cloud
      * @param int       node number
      * @return Job recognizing and/or transcoding the source stream
@@ -334,7 +360,7 @@
         if (node.getNumber() > 0) {
             if (node.isChanged(field.getName())) {
                 LOG.service("For node " + node.getNumber() + ", the field '" + 
field.getName() + "' is changed " + node.getChanged() + ". That means that we 
must schedule create caches");
-                
+
                 final Cloud ntCloud = 
node.getCloud().getNonTransactionalCloud();
                 final int nodeNumber = node.getNumber();
                 createCaches(ntCloud, nodeNumber);
@@ -375,7 +401,7 @@
         final String label;
 
         final Stage stage;
-        
+
         /**
          * Creates an JobDefinition template (used in the configuration 
container).
          */
@@ -453,6 +479,10 @@
         }
         public abstract MimeType getMimeType();
 
+        public final Stage getStage() {
+            return getJobDefinition().getStage();
+        }
+
     }
 
     /**
@@ -483,7 +513,7 @@
         public void ready() {
             super.ready();
             if (dest != null) {
-                LOG.info("Setting " + dest.getNumber() + " to done");
+                LOG.service("Setting " + dest.getNumber() + " to done");
                 File outFile = new File(getDirectory(), 
dest.getStringValue("url").replace("/", File.separator));
                 dest.setLongValue("filesize", outFile.length());
                 if (outFile.length() > 1) {     // @TODO: there should maybe 
be other ways to detect if a transcoding failed
@@ -514,9 +544,9 @@
 
 
     }
-    
+
     /**
-     * Result of a recognizer JobDefinition, just recognizes the type of 
stream etc. 
+     * Result of a recognizer JobDefinition, just recognizes the type of 
stream etc.
      * Does not transcode. The result out is the same as in, same for mimetype.
      */
     public class RecognizerResult extends Result {
@@ -600,11 +630,14 @@
             logger.setMaxSize(100);
             logger.setMaxAge(60000);
             chain.addLogger(logger);
+            for (Map.Entry<String, JobDefinition> dum : 
CreateCachesProcessor.this.list.entrySet()) {
+                results.add(null);
+            }
         }
 
         /**
-         * Defines the several Results by reading the JobDefinitions in the 
list. 
-         * Creates streamsourcescaches for transcoders and asigns 
TranscoderResults to them or creates 
+         * Defines the several Results by reading the JobDefinitions in the 
list.
+         * Creates streamsourcescaches for transcoders and assigns 
TranscoderResults to them or creates
          * RecognizerResults for JobDefinitions of recognizers.
          */
         protected void findResults() {
@@ -615,7 +648,7 @@
                     JobDefinition jd = entry.getValue();
                     URI inURI;
                     Node inNode;
-                    
+
                     // inNode (input stream) to be used
                     if (jd.getInId() == null) {
                         String url = node.getStringValue("url");
@@ -698,7 +731,6 @@
                         dest.commit();
 
 
-                        System.out.println("Created " + dest);
                         String destFile = dest.getStringValue("url");
                         assert destFile != null;
                         assert destFile.length() > 0;
@@ -724,7 +756,7 @@
 
                         URI outURI = outFile.toURI();
                         Result result = new TranscoderResult(jd, dest, inURI, 
outURI);
-                        
+
                         LOG.info("Added result to results list with key: " + 
dest.getStringValue("key"));
                         results.set(i, result);
                         lookup.put(jd.getId(), result);
@@ -743,10 +775,11 @@
 
         public Iterator<Result> iterator() {
             return new Iterator<Result>() {
-                int i = -1;
+                int i = 0;
                 public boolean hasNext() {
-                    for (int j = i + 1 ; j < results.size(); j++) {
+                    for (int j = i  ; j < results.size(); j++) {
                         if (results.get(j) != null && ! 
results.get(j).isReady()) {
+                            LOG.debug("Found to do at " + j + " -> " + 
results.get(j));
                             return true;
                         }
                     }
@@ -757,12 +790,16 @@
                 }
                 public Result next() {
                     synchronized(Job.this) {
-                        while(current == null || current.isReady()) {
-                            i++;
-                            current = results.get(i);
-                            Job.this.notifyAll();
+                        for (; i < results.size(); i++) {
+                            if (results.get(i) != null && ! 
results.get(i).isReady()) {
+                                current = results.get(i);
+                                Job.this.notifyAll();
+                                i++;
+                                break;
+                            }
                         }
                     }
+
                     if (current.definition.transcoder instanceof 
CommandTranscoder) {
                         // Get free method
                         CommandExecutor.Method m = null;
@@ -784,7 +821,7 @@
                     Node destination = current.getDestination();
                     if (destination != null) {
                         try {
-                            LOG.info("Setting " + 
destination.getNodeManager().getName() + " " + destination.getNumber() + " to 
BUSY");
+                            LOG.debug("Setting " + 
destination.getNodeManager().getName() + " " + destination.getNumber() + " to 
BUSY " + i);
                             destination.setIntValue("state", 
State.BUSY.getValue());
                             destination.commit();
                         } catch (Exception e) {
@@ -792,15 +829,40 @@
                         }
                     }
                     busy++;
+                    LOG.debug(" Returning at " + i + " " + current + " (" 
+current.isReady());
                     return current;
                 }
 
             };
         }
 
-        public void setFuture(Future<Integer> f) {
-            future = f;
+
+        /**
+         * Starts actually executing this Job.
+         */
+        public void submit(Cloud cloud, int n, ChainedLogger chain) {
+            JobCallable callable = new JobCallable(this, cloud, chain, n);
+            submit(callable);
         }
+
+        /**
+         * Re-submit this job.
+         */
+
+        public synchronized void submit(JobCallable jc) {
+            LOG.info("Will submit " + jc);
+            findResults();
+            if (getCurrent() == null) {
+                iterator().next();
+            }
+            Stage s = getCurrent().getStage();
+            LOG.info("to " + s);
+            if (getStage() == Stage.READY) {
+            } else {
+                future =  
CreateCachesProcessor.this.threadPools.get(s).submit(jc);
+                notifyAll();
+            }
+        }
         public Logger getLogger() {
             return logger;
         }
@@ -888,12 +950,10 @@
             mediaprovider = node.getNodeValue("mediaprovider");
             assert mediafragment != null : "Mediafragment should not be null";
             //assert mediaprovider != null : "Mediaprovider should not be 
null";
-            for (Map.Entry<String, JobDefinition> dum : 
CreateCachesProcessor.this.list.entrySet()) {
-                results.add(null);
-            }
             findResults();
         }
         public synchronized void interrupt() {
+
             Node cacheNode = current.getDestination();
             if (cacheNode != null && 
node.getCloud().hasNode(cacheNode.getNumber())) {
                 cacheNode.setIntValue("state", State.INTERRUPTED.getValue());
@@ -910,47 +970,37 @@
         public boolean isInterrupted() {
             return interrupted;
         }
-        public boolean isReady() {
-            return ready;
+        public boolean reached(Stage s) {
+            LOG.info("Comparing for " + getStage() + ">=" + s);
+            return getStage().ordinal() >= s.ordinal();
         }
-        public synchronized void ready() {
-            ready = true;
-            notifyAll();
-        }
+        synchronized public void ready() {
+                ready = true;
+                notifyAll();
+            }
 
-        public synchronized void waitUntilReady() throws InterruptedException {
-            while (! isReady()) {
-                wait();
+        public synchronized void waitUntil(Stage stage)
+                                     throws InterruptedException {
+                LOG.info("Waiting for " + stage);
+                while (! reached(stage)) {
+                    wait();
+                }
+
             }
-        }
+
+        public synchronized void waitUntilAfter(Stage stage) throws 
InterruptedException {
+                while (getStage().ordinal() <= stage.ordinal()) {
+                    wait();
+                }
+            }
+
         public Stage getStage() {
             if (ready) return Stage.READY;
             Result res = getCurrent();
             return res == null ? Stage.UNSTARTED : 
res.getJobDefinition().getStage();
         }
 
-        public void waitUntilRecognized() throws InterruptedException {
-            Stage stage = getStage();
-            while (stage.ordinal() < Stage.RECOGNIZER.ordinal()) {
-                synchronized(this) {
-                    LOG.info("Not yet recognized, but " + stage + " " + 
getCurrent());
-                    wait(100000);
-                }
-                stage = getStage();
-            }
-        }
 
-        public void waitUntilTranscoding() throws InterruptedException {
-            Stage stage = getStage();
-            while (stage.ordinal() < Stage.TRANSCODER.ordinal()) {
-                synchronized(this) {
-                    LOG.info("Not yet transcoding, but " + stage + " " + 
getCurrent());
-                    wait(100000);
-                }
-                stage = getStage();
-            }
-        }
-
         public String getProgress() {
             return "" + busy + "/" + results.size();
         }
@@ -980,14 +1030,33 @@
         }
     }
 
-
+    /**
+     * This enum represents the 'stage' of the transcoding process.
+     */
     public static enum Stage {
+        /**
+         * Nothing happened yet.
+         */
         UNSTARTED,
+        /**
+         * Currently busy with a recognizer
+         */
         RECOGNIZER,
+        /**
+         * Currently busy with a transcoder
+         */
         TRANSCODER,
+        /**
+         * Everything's done!
+         */
         READY;
     }
 
+
+    /**
+     * This is the actual callable that can be submitted to the Executors.
+     * It can be submitted multiple times, until the job is ready.
+     */
     protected class JobCallable implements Callable<Integer> {
         private final Job thisJob;
         private final Cloud ntCloud;
@@ -1001,6 +1070,7 @@
             this.ntCloud = cloud;
             this.logger = l;
             this.node   = node;
+            init();
 
         }
         protected void init() {
@@ -1029,13 +1099,33 @@
         }
 
         public Integer call() {
-            init();
 
             int resultCount = 0;
             try {
-                LOG.info("Executing " + thisJob);
-                while (iterator.hasNext()) {
-                    final Result result = iterator.next();
+                Result result = thisJob.getCurrent();
+                while (true) {
+                    LOG.info("Executing " + thisJob);
+                    Result current = thisJob.getCurrent();
+                    if (current == null || current.isReady()) {
+                        while (iterator.hasNext()) {
+                            iterator.next();
+                        }
+                        current = thisJob.getCurrent();
+
+                        LOG.info("Found " + current);
+                        if (current.isReady()) {
+                            thisJob.ready();
+                            return resultCount;
+
+                        }
+
+                    }
+                    if (result != null && result.getStage() != 
current.getStage()) {
+                        LOG.info("Will do next stage " + current.getStage() + 
" now (was " + result + "), first returning");
+                        thisJob.submit(this);
+                        return resultCount;
+                    }
+                    result = current;
                     LOG.info("NOW doing " + result);
                     URI in  = result.getIn();
                     URI out = result.getOut();
@@ -1085,8 +1175,8 @@
                 throw e;
             } finally {
                 logger.info("FINALLY " + resultCount);
-                thisJob.ready(); // notify waiters
-                runningJobs.remove(thisJob.getNode().getNumber());
+                //thisJob.ready(); // notify waiters
+                //runningJobs.remove(thisJob.getNode().getNumber());
             }
             return resultCount;
         }

Modified: 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilRecognizedFunction.java
===================================================================
--- 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilRecognizedFunction.java
 2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilRecognizedFunction.java
 2009-11-12 17:18:02 UTC (rev 39676)
@@ -18,7 +18,7 @@
 
 /**
  *
- * @author André van Toly
+ * @author André van Toly
  * @version $Id: CreateCachesFunction.java 36715 2009-07-08 22:30:03Z michiel $
  */
 
@@ -35,7 +35,7 @@
         CreateCachesProcessor.Job job = CreateCachesProcessor.getJob(node);
         if (job != null) {
             try {
-                job.waitUntilRecognized();
+                job.waitUntilAfter(CreateCachesProcessor.Stage.RECOGNIZER);
             } catch (InterruptedException ie) {
                 return false;
             }

Modified: 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilTranscodingFunction.java
===================================================================
--- 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilTranscodingFunction.java
        2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/main/java/org/mmbase/streams/WaitUntilTranscodingFunction.java
        2009-11-12 17:18:02 UTC (rev 39676)
@@ -35,7 +35,7 @@
         CreateCachesProcessor.Job job = CreateCachesProcessor.getJob(node);
         if (job != null) {
             try {
-                job.waitUntilTranscoding();
+                job.waitUntil(CreateCachesProcessor.Stage.TRANSCODER);
             } catch (InterruptedException ie) {
                 return false;
             }

Modified: 
mmbase/trunk/applications/streams/src/main/resources/org/mmbase/streams/resources/createcaches.xsd
===================================================================
--- 
mmbase/trunk/applications/streams/src/main/resources/org/mmbase/streams/resources/createcaches.xsd
  2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/main/resources/org/mmbase/streams/resources/createcaches.xsd
  2009-11-12 17:18:02 UTC (rev 39676)
@@ -54,12 +54,14 @@
   <xsd:element name="localhost">
     <xsd:complexType>
       <xsd:attribute name="max_simultaneous_transcoders" type="xsd:integer" 
use="required" />
+      <xsd:attribute name="stage"                        type="type_stage"  
use="required" />
     </xsd:complexType>
   </xsd:element>
 
   <xsd:element name="server">
     <xsd:complexType>
       <xsd:attribute name="max_simultaneous_transcoders" type="xsd:integer"  
use="required" />
+      <xsd:attribute name="stage"                        type="type_stage"   
use="required" />
       <xsd:attribute name="host" type="xsd:string"  use="required" />
       <xsd:attribute name="port" type="xsd:integer" use="required" />
     </xsd:complexType>
@@ -93,4 +95,11 @@
     </xsd:complexType>
   </xsd:element>
 
+  <xsd:simpleType name="type_stage">
+    <xsd:restriction base="xsd:string">
+      <xsd:enumeration value="recognizer" />
+      <xsd:enumeration value="transcoder" />
+    </xsd:restriction>
+  </xsd:simpleType>
+
 </xsd:schema>

Modified: 
mmbase/trunk/applications/streams/src/test/java/org/mmbase/streams/CreateCachesTest.java
===================================================================
--- 
mmbase/trunk/applications/streams/src/test/java/org/mmbase/streams/CreateCachesTest.java
    2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/test/java/org/mmbase/streams/CreateCachesTest.java
    2009-11-12 17:18:02 UTC (rev 39676)
@@ -106,7 +106,7 @@
 
 
 
-    //@Test
+    @Test
     public void node() {
         Cloud cloud = getCloud();
         assumeNotNull(cloud);
@@ -191,7 +191,7 @@
             CreateCachesProcessor proc = get("crazycreatecaches.xml");
             Node source = getNode(proc.getDirectory());
             CreateCachesProcessor.Job job = 
proc.createCaches(source.getCloud(), source.getNumber());
-            job.waitUntilReady();
+            job.waitUntil(CreateCachesProcessor.Stage.READY);
             assertTrue("No node " + source.getNumber() + " in " + 
source.getCloud(), source.getCloud().hasNode(source.getNumber()));
             source = refresh(source);
         }
@@ -204,7 +204,7 @@
         Node source = getNode(proc.getDirectory());
         CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(), 
source.getNumber());
 
-        job.waitUntilReady();
+        job.waitUntil(CreateCachesProcessor.Stage.READY);
 
         assertTrue("No node " + source.getNumber() + " in " + 
source.getCloud(), source.getCloud().hasNode(source.getNumber()));
         source = refresh(source);
@@ -217,7 +217,7 @@
         Node source = getNode(proc.getDirectory());
         CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(), 
source.getNumber());
         source.commit();
-        job.waitUntilReady();
+        job.waitUntil(CreateCachesProcessor.Stage.READY);
         assertTrue(source.getCloud().hasNode(source.getNumber()));
         source = refresh(source);
         checkSource(source, 2);
@@ -230,7 +230,7 @@
         Node source = getNode(proc.getDirectory());
         CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(), 
source.getNumber());
         source.commit();
-        job.waitUntilReady();
+        job.waitUntil(CreateCachesProcessor.Stage.READY);
         source = refresh(source);
         checkSource(source, 3);
     }
@@ -241,7 +241,7 @@
         Node source = getNode(proc.getDirectory());
         CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(), 
source.getNumber());
         source.commit();
-        job.waitUntilReady();
+        job.waitUntil(CreateCachesProcessor.Stage.READY);
         source = refresh(source);
         checkSource(source, 4);
     }
@@ -252,7 +252,7 @@
         Node source = getNode(proc.getDirectory());
         CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(), 
source.getNumber());
         source.commit();
-        job.waitUntilReady();
+        job.waitUntil(CreateCachesProcessor.Stage.READY);
         source = refresh(source);
         checkSource(source, 1);
     }
@@ -263,7 +263,7 @@
         Node source = getNode(proc.getDirectory());
         CreateCachesProcessor.Job job = proc.createCaches(source.getCloud(), 
source.getNumber());
         source.commit();
-        job.waitUntilReady();
+        job.waitUntil(CreateCachesProcessor.Stage.READY);
         source = refresh(source);
         checkSource(source, 4);
     }

Modified: 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/crazycreatecaches.xml
===================================================================
--- 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/crazycreatecaches.xml
        2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/crazycreatecaches.xml
        2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,7 @@
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches 
http://www.mmbase.org/xmlns/createcaches.xsd";
     >
-  <localhost max_simultaneous_transcoders="1" />
+  <localhost max_simultaneous_transcoders="1" stage="recognizer"/>
 
   <!-- No caches at all, recognizing only -->
 

Modified: 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_0.xml
===================================================================
--- 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_0.xml
      2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_0.xml
      2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,7 @@
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches 
http://www.mmbase.org/xmlns/createcaches.xsd";
     >
-  <localhost max_simultaneous_transcoders="1" />
+  <localhost max_simultaneous_transcoders="1" stage="recognizer" />
 
   <!-- No caches at all, recognizing only -->
 

Modified: 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_1.xml
===================================================================
--- 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_1.xml
      2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_1.xml
      2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,8 @@
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches 
http://www.mmbase.org/xmlns/createcaches.xsd";
     >
-  <localhost max_simultaneous_transcoders="1" />
+  <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+  <localhost max_simultaneous_transcoders="1" stage="transcoder" />
 
 
   <recognizer id="recognizer">

Modified: 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_2.xml
===================================================================
--- 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_2.xml
      2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_2.xml
      2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,8 @@
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches 
http://www.mmbase.org/xmlns/createcaches.xsd";
     >
-  <localhost max_simultaneous_transcoders="1" />
+  <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+  <localhost max_simultaneous_transcoders="1" stage="transcoder" />
 
 
   <recognizer id="recognizer">

Modified: 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_3.xml
===================================================================
--- 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_3.xml
      2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_3.xml
      2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,9 +3,11 @@
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches 
http://www.mmbase.org/xmlns/createcaches.xsd";
     >
-  <localhost max_simultaneous_transcoders="1" />
 
+  <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+  <localhost max_simultaneous_transcoders="1" stage="transcoder" />
 
+
   <recognizer id="recognizer">
     <class name="org.mmbase.streams.transcoders.FFMpegRecognizer">
     </class>

Modified: 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_4.xml
===================================================================
--- 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_4.xml
      2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_4.xml
      2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,7 +3,8 @@
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches 
http://www.mmbase.org/xmlns/createcaches.xsd";
     >
-  <localhost max_simultaneous_transcoders="1" />
+  <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+  <localhost max_simultaneous_transcoders="1" stage="transcoder" />
 
 
   <recognizer id="recognizer">

Modified: 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_5.xml
===================================================================
--- 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_5.xml
      2009-11-12 16:36:50 UTC (rev 39675)
+++ 
mmbase/trunk/applications/streams/src/test/resources/org/mmbase/config/dummycreatecaches_5.xml
      2009-11-12 17:18:02 UTC (rev 39676)
@@ -3,9 +3,9 @@
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
     xsi:schemaLocation="http://www.mmbase.org/xmlns/createcaches 
http://www.mmbase.org/xmlns/createcaches.xsd";
     >
-  <localhost max_simultaneous_transcoders="1" />
+  <localhost max_simultaneous_transcoders="1" stage="recognizer" />
+  <localhost max_simultaneous_transcoders="1" stage="transcoder" />
 
-
   <recognizer id="recognizer">
     <class name="org.mmbase.streams.transcoders.FFMpegRecognizer">
     </class>

_______________________________________________
Cvs mailing list
[email protected]
http://lists.mmbase.org/mailman/listinfo/cvs

Reply via email to